AsParallel Extensions für das parallele Verarbeiten eines Observable Datenstroms.
Die
AsParallel Funktion dient zur Aufteilung eine Observable Datenstroms auf unterschiedliche Threads.
Sobald ein Element berechnet wird, fordert der Thread ein neues Element zur Bearbeitung an.
Bespiel:
var o=Observable.Range(1,20)
.AsParallel(4)
.Select(RunJob);
var sub=o.Subscribe(x=> Console.WriteLine("{0}-{1}",x,Thread.CurrentThread.ManagedThreadId));
public static class ObservableExtensions
{
/// <summary>
/// Observable AsParallel
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence</typeparam>
/// <param name="source">Source sequence</param>
/// <returns>The source sequence with on different threads</returns>
public static IObservable<T> AsParallel<T>(this IObservable<T> source)
{
return source.AsParallel(2);
}
/// <summary>
/// Observable AsParallel
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence</typeparam>
/// <param name="source">Source sequence</param>
/// <param name="count">Number of Threads</param>
/// <returns>The source sequence with on different threads</returns>
public static IObservable<T> AsParallel<T>(this IObservable<T> source, int count)
{
BlockingCollection<Action> task = new BlockingCollection<Action>();
Task[] tasks=new Task[count];
for (int i = 0; i < count; i++)
tasks[i]= System.Threading.Tasks.Task.Factory.StartNew(() =>
{
foreach (Action action in task.GetConsumingEnumerable())
action();
});
var observer = Observable.Create<T>(o =>
{
source.Subscribe(
x => task.Add(() => o.OnNext(x)),
ex=>o.OnError(ex),
()=>{
task.CompleteAdding();
Task.WaitAll(tasks);
o.OnCompleted();
});
return () => { task.Dispose();};
});
return observer;
}
}
1 Kommentare zum Snippet