I'm building a producer-consumer queue in C#, and I have been researching the best approach in terms of robustness and performance.
For years I have always used BlockingCollection, but I recently discovered TPL Dataflow and Channels.
I have done some benchmarking, and I have seen that both TPL Dataflow and Channels are much faster at dequeuing elements.
My requirements are:
- Queue behaviour (maintain item ordering)
- Multiple threads can Enqueue elements
- One thread reading elements (to maintain order)
IProducerConsumer interface
public interface IProducerConsumer
{
    void Enqueue(Action item);
    void Stop();
    void StartDequeing();
}
Blocking Collection Implementation
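using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingCollectionQueue : IProducerConsumer
{
    private readonly BlockingCollection<Action> _defaultQueue;
    private Task _dequeTask;

    public BlockingCollectionQueue()
    {
        _defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
    }

    public void Enqueue(Action item)
    {
        // Note: Add can still throw InvalidOperationException if CompleteAdding
        // is called between this check and the Add call.
        if (!_defaultQueue.IsAddingCompleted)
        {
            _defaultQueue.Add(item);
        }
    }

    public void Stop()
    {
        _defaultQueue.CompleteAdding();
    }

    public void StartDequeing()
    {
        // Keep a reference to the consumer task so it can be observed later.
        _dequeTask = Task.Run(DequeueTask);
    }

    private void DequeueTask()
    {
        // Blocks until items arrive; ends once adding is completed and the queue is empty.
        foreach (var item in _defaultQueue.GetConsumingEnumerable())
        {
            item?.Invoke();
        }
    }
}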
Channel Implementation
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ChannelQueue : IProducerConsumer
{
    private readonly Channel<Action> _channel;
    private readonly ChannelWriter<Action> _channelWriter;
    private readonly ChannelReader<Action> _channelReader;

    public ChannelQueue()
    {
        _channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
        _channelWriter = _channel.Writer;
        _channelReader = _channel.Reader;
    }

    public void Enqueue(Action item)
    {
        _channelWriter.TryWrite(item);
    }

    public void StartDequeing()
    {
        Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        // Wait asynchronously for data, then drain everything currently available.
        while (await _channelReader.WaitToReadAsync())
        {
            while (_channelReader.TryRead(out var job))
            {
                job?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _channelWriter.Complete();
    }
}
TPLDataFlow using BufferBlock implementation
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DataFlowQueue : IProducerConsumer
{
    private readonly BufferBlock<Action> _bufferBlock;
    private Task _dequeTask;

    public DataFlowQueue()
    {
        var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
        _bufferBlock = new BufferBlock<Action>(dataflowOptions);
    }

    public void Enqueue(Action item)
    {
        _bufferBlock.Post(item);
    }

    public void StartDequeing()
    {
        _dequeTask = Task.Run(DequeueTask);
    }

    private async Task DequeueTask()
    {
        // Wait asynchronously for output, then drain everything currently buffered.
        while (await _bufferBlock.OutputAvailableAsync())
        {
            while (_bufferBlock.TryReceive(out var item))
            {
                item?.Invoke();
            }
        }
    }

    public void Stop()
    {
        _bufferBlock.Complete();
    }
}
TPLDataFlow using ActionBlock
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class ActionBlockQueue : IProducerConsumer
{
    private readonly ActionBlock<Action> _actionBlock;
    private Task _dequeTask;

    public ActionBlockQueue()
    {
        var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
        _actionBlock = new ActionBlock<Action>(item => item?.Invoke(), dataflowOptions);
    }

    // Signature matches IProducerConsumer.Enqueue(Action) so the class implements the interface.
    public void Enqueue(Action item)
    {
        _actionBlock.Post(item);
    }

    public void StartDequeing()
    {
        // Nothing to do: the ActionBlock processes items as soon as they are posted.
    }

    public void Stop()
    {
        _actionBlock.Complete();
    }
}
Benchmark using BenchmarkDotNet
As you can see, all of the implementations store Action delegates in their queues, and I use an AutoResetEvent to signal when the last element has been dequeued.
using System.Threading;
using BenchmarkDotNet.Attributes;

public class MultipleJobBenchMark
{
    private readonly AutoResetEvent _autoResetEvent;

    public MultipleJobBenchMark()
    {
        _autoResetEvent = new AutoResetEvent(false);
    }

    [Benchmark]
    public void BlockingCollectionQueue()
    {
        DoMultipleJobs(new BlockingCollectionQueue());
    }

    [Benchmark]
    public void DataFlowQueue()
    {
        DoMultipleJobs(new DataFlowQueue());
    }

    [Benchmark]
    public void ActionBlockQueue()
    {
        DoMultipleJobs(new ActionBlockQueue());
    }

    [Benchmark]
    public void ChannelQueue()
    {
        DoMultipleJobs(new ChannelQueue());
    }

    private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
    {
        producerConsumerQueue.StartDequeing();
        int jobs = 100000;
        for (int i = 0; i < jobs - 1; i++)
        {
            producerConsumerQueue.Enqueue(() => { });
        }
        // The last job signals that everything before it has been dequeued.
        producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
        _autoResetEvent.WaitOne();
        producerConsumerQueue.Stop();
    }
}
Results
- BlockingCollection: Mean 21.5ms
- BufferBlock Queue: Mean 14.937ms
- ActionBlock Queue: 6.007ms
- Channel: 4.781ms
Questions and Conclusions
By doing this exercise I have seen that, at this point, BlockingCollection may no longer be the best option.
I don't understand why there is such a big difference between BufferBlock and ActionBlock. I wrote both implementations because my interface defines a StartDequeing() method, which cannot be honored with ActionBlock, since it starts consuming as soon as it is constructed.
Is my BufferBlock implementation the best it can be?
I wanted to post my results here to find out which producer-consumer queue is the most widely accepted at the moment, and why I see such a big difference between ActionBlock and BufferBlock.
As your benchmarks reveal, the Channel<T> is a relatively more performant producer-consumer queue than the BlockingCollection<T>. This is reasonable, since the Channel<T> is a newer component (2019) and takes advantage of the ValueTask<T> technology that did not exist when the BlockingCollection<T> was introduced (2010). For this to have any measurable effect, you must be passing an extremely large number of items per second through the queue, in which case it might be a good idea to consider processing the items in batches/chunks instead of passing each item individually through the queue.
In general I think that the BlockingCollection<T> is still a good option when your producer-consumer system is synchronous, i.e. when the producer and the consumer are running on dedicated threads. The Channel<T> is a natural choice when you want to build an asynchronous system, i.e. you are calling asynchronous APIs and you want to make efficient use of threads. As for the components found in the TPL Dataflow library, they are a valid option when you want to build an asynchronous system that can run on older versions of .NET. There are very few reasons to prefer the older BufferBlock<T> over the newer Channel<T> when both are available. The Channel<T> has a cleaner and more expressive API and offers more options, like the ability to drop old items when new items are added and the maximum capacity has been reached.
A rare scenario where you may want to avoid the Channel<T> is when your producer, or the consumer, or both, use cancellation tokens in each and every asynchronous write/read operation, and those tokens are routinely canceled. This usage can trigger a memory leak in the Channel<T>, but not in the BufferBlock<T>. See this question for details.
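To illustrate the "drop old items" option mentioned above, here is a minimal sketch using a bounded channel with BoundedChannelFullMode.DropOldest. The class name DropOldestDemo and the method WriteThenDrain are hypothetical names made up for this example; only Channel.CreateBounded, BoundedChannelOptions and BoundedChannelFullMode come from System.Threading.Channels.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper, for illustration only.
public static class DropOldestDemo
{
    // Writes the integers 0..count-1 into a bounded channel that evicts the
    // oldest buffered item when it is full, then drains whatever is left.
    public static List<int> WriteThenDrain(int capacity, int count)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true
        });

        for (int i = 0; i < count; i++)
        {
            // In DropOldest mode a write to a full channel removes the oldest
            // buffered item to make room instead of waiting or failing.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // Items already buffered can still be read after completion.
        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining;
    }
}
```

Calling DropOldestDemo.WriteThenDrain(3, 10) should leave only the three newest items (7, 8, 9) in the channel. This kind of eviction policy is not something BlockingCollection<T> or BufferBlock<T> offer out of the box, as far as I know.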