Skip to main content
removed langauge tag from the title + edited title
Link
t3chb0t
  • 44.7k
  • 9
  • 85
  • 191

C# Async Queue Implementation Thread Safetyimplementation without locking

Source Link
9ee1
  • 163
  • 1
  • 4

C# Async Queue Implementation Thread Safety

I wrote a quick implementation of an async queue that utilizes a backing ConcurrentQueue. It was pretty much based on an implementation given in this Stack Overflow answer. The only difference is that I got rid of the internal locks that implementation uses.

Since a ConcurrentQueue is already thread safe, I can't figure out why they chose to use a lock in their implementation. Normally I would ask them on Stack Overflow but the question is 4 years old and I would rather post my implementation here and have someone do a peer review to see if I have a potential threading issue.

/// <summary>
///     Asynchronous Queue.
/// </summary>
/// <typeparam name="T">
///     The queue's elements' type.
/// </typeparam>
public sealed class AsyncQueue<T> {
    /// <summary>
    ///     Items.
    /// </summary>
    private readonly ConcurrentQueue<T> _items;

    /// <summary>
    ///     Promises.
    /// </summary>
    private readonly ConcurrentQueue<TaskCompletionSource<T>> _promises;

    /// <summary>
    ///     Create an Asynchronous Queue.
    /// </summary>
    public AsyncQueue() {
        this._items = new ConcurrentQueue<T>();
        this._promises = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    ///     Dequeue an Item Asynchronously.
    /// </summary>
    /// <returns>
    ///     A task representing the asynchronous operation.
    /// </returns>
    public Task<T> DequeueAsync() {
        var dequeueTask = this.DequeueAsync(CancellationToken.None);
        return dequeueTask;
    }

    /// <summary>
    ///     Dequeue an Item Asynchronously.
    /// </summary>
    /// <param name="cancellationToken">
    ///     A cancellation token to cancel the asynchronous operation with.
    /// </param>
    /// <returns>
    ///     A task representing the asynchronous operation.
    /// </returns>
    public async Task<T> DequeueAsync(CancellationToken cancellationToken) {
        CancellationTokenRegistration? cancellationTokenRegistration = null;
        var promise = new TaskCompletionSource<T>();
        var itemFound = this._items.TryDequeue(out var item);
        if (!itemFound) {
            cancellationTokenRegistration = cancellationToken.Register(OnCancellationTokenCanceled, promise);
            this._promises.Enqueue(promise);
        }
        else {
            promise.TrySetResult(item);
        }

        try {
            item = await promise.Task;
            return item;
        }
        finally {
            cancellationTokenRegistration?.Dispose();
        }

        // <summary>
        //      On Cancellation Token Canceled.
        // </summary>
        void OnCancellationTokenCanceled(object cState) {
            var cPromise = (TaskCompletionSource<T>) cState;
            cPromise.TrySetCanceled();
        }
    }

    /// <summary>
    ///     Enqueue an Item.
    /// </summary>
    /// <param name="item">
    ///     An item to enqueue.
    /// </param>
    public void Enqueue(T item) {
        while (true) {
            var promiseFound = this._promises.TryDequeue(out var promise);
            if (!promiseFound) {
                this._items.Enqueue(item);
                break;
            }

            var promiseSet = promise.TrySetResult(item);
            if (promiseSet) {
                break;
            }
        }
    }
}

I wrote unit tests and it seems to work fine. I think there might be a rare race condition that happens in the following scenario, but I have not been able to trigger it and I want a second opinion:

  1. Thread 1 enqueues an item
  2. Thread 1 does not find an existing promise
  3. Thread 1 is preempted
  4. Thread 2 attempts to dequeue an item
  5. Thread 2 does not find an existing item
  6. Thread 2 is preempted
  7. Thread 1 enqueues item
  8. Thread 2 creates and enqueues a promise
  9. Idle time until another enqueue or dequeue operation occurs