Feedback

C# - Klasse zur parallelen Verarbeitung von Tasks

Veröffentlicht von am 16.11.2015
(0 Bewertungen)
Die Klasse ermöglicht das parallele Verarbeiten von Tasks. Hierzu wird eine beliebige Menge an Worker-Threads gestartet, die anschließend auf Tasks warten.

Verwendung findet die Klasse bei mir häufig im IO-Bereich, zum Beispiel beim Crawlen von Webseiten um Bilder zu rippen.

//Achtung: Minimales Beispiel ohne Exceptionhandling und mit "Anfängerregex"
var DownloadDirectory = "C:/downloads"; //Pfad, an dem die Bilder gespeichert werden sollen

//using-Block blockiert am Ende, bis alle Tasks beendet wurden
using (var queue = new TaskQueue<Uri>(
10, //10 Worker-Threads
delegate (Uri url) //Callback für jeden Task
{
new WebClient().DownloadFile(url, Path.Combine(DownloadDirectory, url.Segments[url.Segments.Length - 1]));
}
))
{
//Quellcode der Zielseite herunterladen
var src = new WebClient().DownloadString("https://9gag.com/");
//Urls zu Bildern suchen
var urls = new Regex("src=\"(http.+?)\"", RegexOptions.Compiled).Matches(src).Cast<Match>().Select(m => m.Groups[1].Value).Where(s => !string.IsNullOrEmpty(s)).Distinct();

foreach (var url in urls)
{
//Alle gefundenen Bilder einreihen
queue.Enqueue(new Uri(url));
}
}


Die Abarbeitungsreihenfolge der Tasks ist nicht definiert!

using (var queue = new TaskQueue<int>(Console.WriteLine))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(i);
}
}


Mögliche Ausgabe:
0
3
2
5
6
7
8
9
4
1


Die Queue kann ohne "generische Generics" nur einen Parameter für den Callback entgegen nehmen. Das Problem kann man (ohne Codeänderung) umgehen, indem entweder als Typ ein Tuple oder dynamic verwendet wird.

using (var queue = new TaskQueue<Tuple<int, int>>(t => Console.WriteLine(t.Item1 + t.Item2)))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(Tuple.Create(i, i));
}
}
using (var queue = new TaskQueue<dynamic>(d => Console.WriteLine(d.Val)))
{
for (int i = 0; i < 10; ++i)
{
queue.Enqueue(new { Val = i });
}
}
using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Eine Klasse, die parallel Aufgaben abarbeiten kann.
/// </summary>
/// <typeparam name="T">Typ des Parameters des Callbacks</typeparam>
class TaskQueue<T> : IDisposable
{
	/// <summary>
	/// Typ des Callback zum Abarbeiten von Tasks
	/// </summary>
	/// <param name="item"></param>
	public delegate void ConsumeAction(T item);

	/// <summary>
	/// Wrapper für einen Task.
	/// Wird intern zum Beenden der TaskQueue verwendet.
	/// </summary>
	private class Task
	{
		/// <summary>
		/// Das gewrappte Item
		/// </summary>
		public T Param { get; private set; }
		/// <summary>
		/// Gibt an, ob die TaskQueue beendet werden soll
		/// </summary>
		public bool Abort { get; private set; }

		public Task(T param, bool abort)
		{
			Param = param;
			Abort = abort;
		}
	}

	/// <summary>
	/// Callback zum Abarbeiten von Tasks
	/// </summary>
	private ConsumeAction consume;
	/// <summary>
	/// Queue der abzuarbeitenden Tasks
	/// </summary>
	private Queue<Task> tasks;
	/// <summary>
	/// Synchronisierungsobjekt der Threads
	/// </summary>
	private object barrier;
	/// <summary>
	/// Liste der verwendeten Threads
	/// </summary>
	private Thread[] workers;

	/// <summary>
	/// Erstellt eine TaskQueue mit je einem Thread pro CPU (Kern).
	/// </summary>
	/// <param name="consumeAction">Callback zum Bearbeiten eines Tasks</param>
	public TaskQueue(ConsumeAction consumeAction)
		: this(Environment.ProcessorCount, consumeAction)
	{

	}

	/// <summary>
	/// Erstellt eine TaskQueue mit der angegebenen Anzahl an Threads.
	/// </summary>
	/// <param name="numWorkers">Anzahl an Threads</param>
	/// <param name="consumeAction">Callback zum Bearbeiten eines Tasks</param>
	public TaskQueue(int numWorkers, ConsumeAction consumeAction)
	{
		if (numWorkers < 0)
		{
			throw new ArgumentException(nameof(numWorkers));
		}
		if (consumeAction == null)
		{
			throw new ArgumentNullException(nameof(consumeAction));
		}

		barrier = new object();
		tasks = new Queue<Task>();

		consume = consumeAction;

		workers = new Thread[numWorkers];

		for (int i = 0; i < numWorkers; i++)
		{
			workers[i] = new Thread(Consume);
			workers[i].Start();
		}
	}

	/// <summary>
	/// Beendet die TaskQueue und blockiert bis alle Tasks abgearbeitet wurden.
	/// </summary>
	public void Dispose()
	{
		foreach (var worker in workers)
		{
			EnqueueInternal(new Task(default(T), true));
		}
		foreach (var worker in workers)
		{
			worker.Join();
		}
	}

	/// <summary>
	/// Reiht einen neuen Task in die Queue ein.
	/// </summary>
	/// <param name="item">Das Element, das an den Callback übergeben werden soll</param>
	public void Enqueue(T item)
	{
		EnqueueInternal(new Task(item, false));
	}

	/// <summary>
	/// Reiht einen neuen Task ein und weckt einen Thread zum Bearbeiten.
	/// </summary>
	/// <param name="task"></param>
	private void EnqueueInternal(Task task)
	{
		lock (barrier)
		{
			tasks.Enqueue(task);

			Monitor.Pulse(barrier);
		}
	}

	/// <summary>
	/// Methode der Threads, die auf neue Tasks wartet und diese abarbeitet.
	/// </summary>
	private void Consume()
	{
		while (true)
		{
			Task task;
			lock (barrier)
			{
				while (tasks.Count == 0)
				{
					Monitor.Wait(barrier);
				}
				task = tasks.Dequeue();
			}

			if (task.Abort)
			{
				return;
			}

			consume(task.Param);
		}
	}
}
Abgelegt unter task, queue, thread, parallel.

1 Kommentare zum Snippet

AI schrieb am 22.01.2026:
Das Muster, mehrere Tasks parallel auszuführen und auf alle zu warten, ist grundsätzlich weiterhin relevant. Der gezeigte Code spiegelt aber einen älteren Stil ohne moderne Asynchronitäts- und Parallelisierungs-Primitiven wider. Insbesondere wird eine eigene Warteschleife über Task.WhenAny geschrieben, statt vorhandene Bibliotheken und Sprachfeatures zu nutzen. Außerdem fehlen Cancellation-Unterstützung, saubere Fehlerbehandlung und Kontrolle über den Grad der Parallelität.

Analyse nach heutigen Kriterien:
- Performance: Die manuelle Schleife über Task.WhenAny mit Entfernen aus einer Liste funktioniert, ist aber unnötig kompliziert und erzeugt zusätzliche List-Allokationen. Task.WhenAll ist für „alle Tasks beenden“ effizienter und semantisch klarer.
- Memory-Allokationen: Die mutable Task-Liste und wiederholte Remove-Operationen erzeugen unnötige Allokationen und Reallokationen. Moderne APIs kommen ohne diese Struktur aus.
- Korrektheit: Exceptions einzelner Tasks werden nicht zentral behandelt. Fehler können unbemerkt bleiben oder erst sehr spät sichtbar werden.
- Robustheit: Es fehlt jegliche Cancellation-Unterstützung. In zeitgemäßen APIs gehört ein CancellationToken zwingend dazu.
- Thread-Safety: Der Ansatz ist grundsätzlich thread-safe, aber die manuelle Verwaltung der Task-Liste macht den Code fehleranfällig bei Erweiterungen.
- Cloud/Container: Blockierende Wait- oder Result-Zugriffe sind in Cloud-Workloads problematisch. Async/await ist der erwartete Standard.
- Standard-.NET-Ansatz: Task.WhenAll, Parallel.ForEachAsync, SemaphoreSlim oder Channels sind heute die bevorzugten Werkzeuge.

Modernisierte Variante (async/await, WhenAll, Cancellation):

using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelTasks
{
public static Task WhenAll(Func<CancellationToken, Task>[] taskFactories, CancellationToken cancellationToken = default)
{
if (taskFactories is null) throw new ArgumentNullException(nameof(taskFactories));

var tasks = new Task[taskFactories.Length];
for (int i = 0; i < taskFactories.Length; i++)
tasks[i] = taskFactories[i](cancellationToken);

return Task.WhenAll(tasks);
}
}


Warum das heute objektiv besser ist:
- Performance: Task.WhenAll nutzt die Laufzeit effizienter als manuelle Schleifen mit WhenAny.
- Robustheit: Fehler werden gesammelt propagiert, Cancellation ist sauber integrierbar.
- Memory-Allokationen: Keine dynamischen Listenmanipulationen.
- Cloud-Tauglichkeit: Async/await ohne Blockieren verhindert Threadpool-Starvation.

Security-Realitätscheck:
Fehlende Cancellation oder Timeouts können in Server-Szenarien zu Ressourcenerschöpfung führen. Parallele Verarbeitung sollte immer begrenzt und kontrollierbar sein.
 

Logge dich ein, um hier zu kommentieren!