QueueAsyncProcessor
ikopylov edited this page Mar 21, 2015 · 7 revisions
One of the most common scenarios in parallel data processing is to put data into a thread-safe blocking queue and later take and process it in multiple threads. If you want to use this pattern, QueueAsyncProcessor is exactly what you need.
Sample source code:
using System;
using System.Threading;
// (plus the library namespace that declares QueueAsyncProcessor)

// Define a subclass of QueueAsyncProcessor parameterized with the element type
public class DataProcessor : QueueAsyncProcessor<int>
{
    public DataProcessor(int threadCount, int maxQueueSize)
        : base(threadCount: threadCount, maxQueueSize: maxQueueSize, name: "name")
    {
    }

    // Implement the main processing method; it is called for every queued element
    protected override void Process(int element, object state, CancellationToken token)
    {
        Console.WriteLine(element);
    }
}

// Usage
static void Main()
{
    // Create an instance of the processor
    DataProcessor processor = new DataProcessor(Environment.ProcessorCount, 64 * Environment.ProcessorCount);

    // Start the processor
    processor.Start();

    // Add elements for processing
    for (int i = 0; i < 100; i++)
        processor.Add(i);

    // Stop the processor, letting it finish the elements that are already queued
    processor.Stop(waitForStop: true, letFinishProcess: true, completeAdding: true);
}
If needed, there is also a version of the processor that receives the processing delegate as a constructor parameter (DelegateQueueAsyncProcessor).
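For comparison, the underlying pattern described at the top of this page (a bounded thread-safe queue drained by several worker threads) can be sketched with standard .NET types only. This is a minimal illustration and not how QueueAsyncProcessor is implemented: the class name `BlockingQueueSketch`, the method `Run`, and the capacity of 64 are hypothetical choices made for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration of the queue-plus-workers pattern;
// it does not use QueueAsyncProcessor at all.
public static class BlockingQueueSketch
{
    // Pushes 0..itemCount-1 through a bounded blocking queue and
    // returns the sum of the elements processed by the workers.
    public static long Run(int workerCount, int itemCount)
    {
        long sum = 0;
        using (var queue = new BlockingCollection<int>(boundedCapacity: 64))
        {
            var workers = new Thread[workerCount];
            for (int w = 0; w < workerCount; w++)
            {
                workers[w] = new Thread(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding was called and the queue is drained.
                    foreach (int element in queue.GetConsumingEnumerable())
                        Interlocked.Add(ref sum, element);
                });
                workers[w].Start();
            }

            // Add blocks the producer whenever the queue is full.
            for (int i = 0; i < itemCount; i++)
                queue.Add(i);

            // Signal that no more elements will arrive, then wait for the workers.
            queue.CompleteAdding();
            foreach (Thread worker in workers)
                worker.Join();
        }
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(Run(Environment.ProcessorCount, 100));
    }
}
```

The sketch covers the same steps as the sample above by hand: starting threads, adding elements, completing the queue, and waiting for the workers to finish. These are the steps that `Start`, `Add`, and `Stop` perform on the processor in the sample.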