Feedback

C# - AsParallel for Observables

Veröffentlicht von am 05.04.2013
(2 Bewertungen)
AsParallel Extensions für das parallele Verarbeiten eines Observable Datenstroms.

Die AsParallel Funktion dient zur Aufteilung eine Observable Datenstroms auf unterschiedliche Threads.
Sobald ein Element berechnet wird, fordert der Thread ein neues Element zur Bearbeitung an.
Bespiel:


var o=Observable.Range(1,20)
.AsParallel(4)
.Select(RunJob);
var sub=o.Subscribe(x=> Console.WriteLine("{0}-{1}",x,Thread.CurrentThread.ManagedThreadId));

public  static  class ObservableExtensions
{     
    /// <summary>
    ///  Observable AsParallel 
    /// </summary>
    /// <typeparam name="T">The type of the elements in the source sequence</typeparam>
    /// <param name="source">Source sequence</param>
    /// <returns>The source sequence with on different threads</returns>
    public static IObservable<T> AsParallel<T>(this IObservable<T> source)
    {
        return source.AsParallel(2);
    }


    /// <summary>
    /// Observable AsParallel 
    /// </summary>
    /// <typeparam name="T">The type of the elements in the source sequence</typeparam>
    /// <param name="source">Source sequence</param>
    /// <param name="count">Number of Threads</param>
    /// <returns>The source sequence with on different threads</returns>
public static IObservable<T> AsParallel<T>(this IObservable<T> source, int count)
        {
            BlockingCollection<Action> task = new BlockingCollection<Action>();
			Task[] tasks=new Task[count];

            for (int i = 0; i < count; i++)
                tasks[i]= System.Threading.Tasks.Task.Factory.StartNew(() =>
                {
                    foreach (Action action in task.GetConsumingEnumerable())
                        action();
                });

            var observer = Observable.Create<T>(o =>
            {
                source.Subscribe(
				   x =>  task.Add(() => o.OnNext(x)),
				   ex=>o.OnError(ex),
				   ()=>{ 
				      task.CompleteAdding();
					  Task.WaitAll(tasks);
				      o.OnCompleted();
					  
				});

                return () => {  task.Dispose();};
            });
			

            return observer;

        }
}
Abgelegt unter rx, ReactiveExtensions, Parallel.

1 Kommentare zum Snippet

kkleeberger schrieb am 26.08.2013:
Habe am Snippet noch eine Korrektur vorgenommen.
Die OnCompleted Funktion wurde zu früh aufgerufen, deshalb wird jetzt
mit Task.WaitAll überprüft ob alle Task die Berechnung beendet haben und erst danach die OnComplete() Funktion aufgerufen.
 

Logge dich ein, um hier zu kommentieren!