I want to create a production-ready producer/consumer that helps me avoid thread-synchronization hell. Is this thread-safe? My main concern is staying safe when exceptions occur.
/// <summary>
/// Bounded producer/consumer queue. Items passed to <see cref="Send"/> are queued
/// (at most 50 at a time) and handed to the <see cref="NewItem"/> subscribers by a
/// background <c>Parallel.ForEach</c> loop. Exceptions raised while handling an item
/// are passed to the error callback supplied to the constructor.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class AsynchSimpleProducer<T> : IDisposable
{
    private readonly Action<Exception> error;
    private readonly BlockingCollection<T> blockingCollection = new BlockingCollection<T>(50);

    /// <summary>
    /// Raised for every queued item. Each subscriber is invoked and waited on
    /// individually, so a failure in one subscriber does not prevent the others
    /// from receiving the item.
    /// </summary>
    public event Func<T, Task> NewItem;

    /// <summary>
    /// Creates the producer and starts the background consumer loop.
    /// </summary>
    /// <param name="error">Callback that receives exceptions raised by <see cref="NewItem"/> subscribers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    public AsynchSimpleProducer(Action<Exception> error)
    {
        // Fail fast: a null callback would otherwise only surface later, inside the
        // catch block on a worker thread.
        if (error == null)
        {
            throw new ArgumentNullException("error");
        }

        this.error = error;

        // The loop blocks for the lifetime of the queue, so ask for a dedicated thread
        // (LongRunning) and pin it to the default scheduler instead of whatever
        // TaskScheduler.Current happens to be at construction time.
        Task.Factory.StartNew(
            () =>
            {
                // This loop ends only after blockingCollection.CompleteAdding has been
                // called and the remaining items have been consumed.
                Parallel.ForEach(blockingCollection.GetConsumingPartitioner(), SendItem);
            },
            System.Threading.CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Delivers one item to every current <see cref="NewItem"/> subscriber and reports
    /// each failure through the error callback.
    /// </summary>
    private void SendItem(T item)
    {
        // Snapshot the delegate: a subscriber may unsubscribe between the null check
        // and the invocation.
        Func<T, Task> handlers = NewItem;
        if (handlers == null)
        {
            return;
        }

        // Invoking a multicast Func<T, Task> directly yields only the LAST subscriber's
        // task; walk the invocation list so that every task is observed.
        foreach (Func<T, Task> handler in handlers.GetInvocationList())
        {
            try
            {
                handler(item).Wait();
            }
            catch (Exception ex)
            {
                ReportError(ex);
            }
        }
    }

    /// <summary>
    /// Forwards an exception to the error callback without letting a failing callback
    /// terminate the consumer loop.
    /// </summary>
    private void ReportError(Exception ex)
    {
        try
        {
            error(ex);
        }
        catch (Exception callbackFailure)
        {
            // If this escaped, Parallel.ForEach would stop consuming and producers would
            // eventually block forever in Send once the queue is full.
            System.Diagnostics.Trace.TraceError(
                "AsynchSimpleProducer error callback threw: {0} (while reporting: {1})",
                callbackFailure,
                ex);
        }
    }

    /// <summary>
    /// Queues an item for delivery. Blocks while the queue already holds 50 items.
    /// </summary>
    /// <param name="newValue">Item to queue.</param>
    /// <exception cref="InvalidOperationException">The producer has already been disposed.</exception>
    public void Send(T newValue)
    {
        blockingCollection.Add(newValue);
    }

    /// <summary>
    /// Marks the queue as complete for adding. Items already queued are still
    /// delivered; this method does not wait for them to finish.
    /// </summary>
    public void Dispose()
    {
        blockingCollection.CompleteAdding();
    }
}
Example of usage:
// Create the producer; Error is the callback that receives exceptions caught while handling items.
simpleProducer = new AsynchSimpleProducer<int>(Error);
// Subscribe the handler that is invoked for each queued item.
simpleProducer.NewItem += simpleProducer_NewItem;
// This part can run inside a task.
var next = random.Next(0, 1000);
simpleProducer.Send(next);