Feedback

C# - AsParallel for Observables

Veröffentlicht von am 05.04.2013
(2 Bewertungen)
AsParallel Extensions für das parallele Verarbeiten eines Observable Datenstroms.

Die AsParallel Funktion dient zur Aufteilung eine Observable Datenstroms auf unterschiedliche Threads.
Sobald ein Element berechnet wird, fordert der Thread ein neues Element zur Bearbeitung an.
Bespiel:


var o=Observable.Range(1,20)
.AsParallel(4)
.Select(RunJob);
var sub=o.Subscribe(x=> Console.WriteLine("{0}-{1}",x,Thread.CurrentThread.ManagedThreadId));

GFU-Schulungen  [Anzeige]

Visual Studio Team Foundation Server 2017/2015 (TFS) für Administratoren - Kompakt

Nach dieser Schulung beherrschen Sie die Grundlagen des TFS. Sie erledigen administrative Aufgaben schnell und sicher.

Angular mit ASP.NET Core für .NET-Entwickler

.NET ist Ihnen vertraut, als Entwickler verfügen Sie über einschlägige Kenntnisse. In diesem Kurs lernen Sie nun, Angular in .NET-Umgebungen einzusetzen. Sie verstehen das Konzept von Angular und integrieren das clientseitige JS-Framework sicher in.NET-Anwendungen.

public  static  class ObservableExtensions
{     
    /// <summary>
    ///  Observable AsParallel 
    /// </summary>
    /// <typeparam name="T">The type of the elements in the source sequence</typeparam>
    /// <param name="source">Source sequence</param>
    /// <returns>The source sequence with on different threads</returns>
    public static IObservable<T> AsParallel<T>(this IObservable<T> source)
    {
        return source.AsParallel(2);
    }


    /// <summary>
    /// Observable AsParallel 
    /// </summary>
    /// <typeparam name="T">The type of the elements in the source sequence</typeparam>
    /// <param name="source">Source sequence</param>
    /// <param name="count">Number of Threads</param>
    /// <returns>The source sequence with on different threads</returns>
public static IObservable<T> AsParallel<T>(this IObservable<T> source, int count)
        {
            BlockingCollection<Action> task = new BlockingCollection<Action>();
			Task[] tasks=new Task[count];

            for (int i = 0; i < count; i++)
                tasks[i]= System.Threading.Tasks.Task.Factory.StartNew(() =>
                {
                    foreach (Action action in task.GetConsumingEnumerable())
                        action();
                });

            var observer = Observable.Create<T>(o =>
            {
                source.Subscribe(
				   x =>  task.Add(() => o.OnNext(x)),
				   ex=>o.OnError(ex),
				   ()=>{ 
				      task.CompleteAdding();
					  Task.WaitAll(tasks);
				      o.OnCompleted();
					  
				});

                return () => {  task.Dispose();};
            });
			

            return observer;

        }
}
Abgelegt unter rx, ReactiveExtensions, Parallel.

1 Kommentare zum Snippet

kkleeberger schrieb am 26.08.2013:
Habe am Snippet noch eine Korrektur vorgenommen.
Die OnCompleted Funktion wurde zu früh aufgerufen, deshalb wird jetzt
mit Task.WaitAll überprüft ob alle Task die Berechnung beendet haben und erst danach die OnComplete() Funktion aufgerufen.
 

Logge dich ein, um hier zu kommentieren!