Die Klasse ermöglicht das parallele Verarbeiten von Tasks. Hierzu wird eine beliebige Menge an Worker-Threads gestartet, die anschließend auf Tasks warten.
Verwendung findet die Klasse bei mir häufig im IO-Bereich, zum Beispiel beim Crawlen von Webseiten um Bilder zu rippen.
//Achtung: Minimales Beispiel ohne Exceptionhandling und mit "Anfängerregex"
var DownloadDirectory = "C:/downloads"; //Pfad, an dem die Bilder gespeichert werden sollen
//using-Block blockiert am Ende, bis alle Tasks beendet wurden
using (var queue = new TaskQueue<Uri>(
10, //10 Worker-Threads
delegate (Uri url) //Callback für jeden Task
{
new WebClient().DownloadFile(url, Path.Combine(DownloadDirectory, url.Segments[url.Segments.Length - 1]));
}
))
{
//Quellcode der Zielseite herunterladen
var src = new WebClient().DownloadString("https://9gag.com/");
//Urls zu Bildern suchen
var urls = new Regex("src=\"(http.+?)\"", RegexOptions.Compiled).Matches(src).Cast<Match>().Select(m => m.Groups[1].Value).Where(s => !string.IsNullOrEmpty(s)).Distinct();
foreach (var url in urls)
{
//Alle gefundenen Bilder einreihen
queue.Enqueue(new Uri(url));
}
}
Die Abarbeitungsreihenfolge der Tasks ist
nicht definiert!
using (var queue = new TaskQueue<int>(Console.WriteLine))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(i);
}
}
Mögliche Ausgabe:
Die Queue kann ohne "generische Generics" nur einen Parameter für den Callback entgegen nehmen. Das Problem kann man (ohne Codeänderung) umgehen, indem entweder als Typ ein Tuple oder dynamic verwendet wird.
using (var queue = new TaskQueue<Tuple<int, int>>(t => Console.WriteLine(t.Item1 + t.Item2)))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(Tuple.Create(i, i));
}
}
using (var queue = new TaskQueue<dynamic>(d => Console.WriteLine(d.Val)))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(new { Val = i });
}
}
using System;
using System.Collections.Generic;
using System.Threading;
/// <summary>
/// Eine Klasse, die parallel Aufgaben abarbeiten kann.
/// </summary>
/// <typeparam name="T">Typ des Parameters des Callbacks</typeparam>
class TaskQueue<T> : IDisposable
{
/// <summary>
/// Typ des Callback zum Abarbeiten von Tasks
/// </summary>
/// <param name="item"></param>
public delegate void ConsumeAction(T item);
/// <summary>
/// Wrapper für einen Task.
/// Wird intern zum Beenden der TaskQueue verwendet.
/// </summary>
private class Task
{
/// <summary>
/// Das gewrappte Item
/// </summary>
public T Param { get; private set; }
/// <summary>
/// Gibt an, ob die TaskQueue beendet werden soll
/// </summary>
public bool Abort { get; private set; }
public Task(T param, bool abort)
{
Param = param;
Abort = abort;
}
}
/// <summary>
/// Callback zum Abarbeiten von Tasks
/// </summary>
private ConsumeAction consume;
/// <summary>
/// Queue der abzuarbeitenden Tasks
/// </summary>
private Queue<Task> tasks;
/// <summary>
/// Synchronisierungsobjekt der Threads
/// </summary>
private object barrier;
/// <summary>
/// Liste der verwendeten Threads
/// </summary>
private Thread[] workers;
/// <summary>
/// Erstellt eine TaskQueue mit je einem Thread pro CPU (Kern).
/// </summary>
/// <param name="consumeAction">Callback zum Bearbeiten eines Tasks</param>
public TaskQueue(ConsumeAction consumeAction)
: this(Environment.ProcessorCount, consumeAction)
{
}
/// <summary>
/// Erstellt eine TaskQueue mit der angegebenen Anzahl an Threads.
/// </summary>
/// <param name="numWorkers">Anzahl an Threads</param>
/// <param name="consumeAction">Callback zum Bearbeiten eines Tasks</param>
public TaskQueue(int numWorkers, ConsumeAction consumeAction)
{
if (numWorkers < 0)
{
throw new ArgumentException(nameof(numWorkers));
}
if (consumeAction == null)
{
throw new ArgumentNullException(nameof(consumeAction));
}
barrier = new object();
tasks = new Queue<Task>();
consume = consumeAction;
workers = new Thread[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workers[i] = new Thread(Consume);
workers[i].Start();
}
}
/// <summary>
/// Beendet die TaskQueue und blockiert bis alle Tasks abgearbeitet wurden.
/// </summary>
public void Dispose()
{
foreach (var worker in workers)
{
EnqueueInternal(new Task(default(T), true));
}
foreach (var worker in workers)
{
worker.Join();
}
}
/// <summary>
/// Reiht einen neuen Task in die Queue ein.
/// </summary>
/// <param name="item">Das Element, das an den Callback übergeben werden soll</param>
public void Enqueue(T item)
{
EnqueueInternal(new Task(item, false));
}
/// <summary>
/// Reiht einen neuen Task ein und weckt einen Thread zum Bearbeiten.
/// </summary>
/// <param name="task"></param>
private void EnqueueInternal(Task task)
{
lock (barrier)
{
tasks.Enqueue(task);
Monitor.Pulse(barrier);
}
}
/// <summary>
/// Methode der Threads, die auf neue Tasks wartet und diese abarbeitet.
/// </summary>
private void Consume()
{
while (true)
{
Task task;
lock (barrier)
{
while (tasks.Count == 0)
{
Monitor.Wait(barrier);
}
task = tasks.Dequeue();
}
if (task.Abort)
{
return;
}
consume(task.Param);
}
}
}
Kommentare zum Snippet