0
\$\begingroup\$

I have created a helper service to send files from the client application to the API over HTTP using the POST method. Source project on Blazor Server Side, .NET 6, C # 10.

Service features:

  1. Sending files to the API.
  2. A limited number of parallel sending processes.
  3. Ability to freely add files to the queue without waiting for the completion of previous tasks.
  4. Receive the send status of each file subscribed to the event.

I'm wondering how can I improve parallel file sending? Optimize multithreading?

FileUploadManager.cs

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Handlers;
using System.Threading;
using System.Threading.Tasks;

namespace BlazorApp.Services
{
    public class FileUploadManager : IDisposable
    {
        private readonly ILogger<FileUploadManager> logger;
        private readonly IOptions<UploadOptions> options;

        // HttpClient
        private readonly IFileUploadApiService fileUploadService;

        private readonly TaskFactory factory;
        private readonly List<Task> tasks;

        private readonly ConcurrentQueue<FileUploadDto> fileUploadQueue;

        private readonly object fileUploadQueueLock = new object();
        private readonly object actionLock = new object();

        private readonly long maxFileSize;
         
        private readonly Semaphore semaphore;

        public FileUploadManager(
            ILogger<FileUploadManager> logger,
            IOptions<UploadOptions> uploadOptions,
            IFileUploadApiService fileUploadService)
        {
            this.logger = logger;
            this.options = uploadOptions;
            this.fileUploadService = fileUploadService;

            fileUploadQueue = new ConcurrentQueue<FileUploadDto>();
            factory = new TaskFactory();
            tasks = new List<Task>();

            maxFileSize = options.Value.MaxFileSize;

            var maxParallelUplaods = this.options.Value.MaxParallelUploads > 0 ? this.options.Value.MaxParallelUploads : 1;
            semaphore = new Semaphore(maxParallelUplaods, maxParallelUplaods);
        }

// ------------ Main Points ---------------------------------

        // OUTPUT
        public Action<FileUploadEventArgs> OnStateChange { get; set; }

        // INPUT
        public void AddFilesToUploadQueue(List<FileUploadDto> files)
        {
            lock (fileUploadQueueLock)
            {
                foreach (var item in files)
                {
                    fileUploadQueue.Enqueue(item);
                }
            }

            for (int i = 0; i < files.Count; i++)
            {
                Task t = factory.StartNew(() => UploadOneAsync());
                tasks.Add(t); 
            }
        }
         
          private async Task UploadOneAsync()
          {
            semaphore.WaitOne();

            if (fileUploadQueue.TryDequeue(out FileUploadDto item))
            {
                await UploadFileAsync(item);
            }

            semaphore.Release();
          }

// -------------- Helpers -------------------------

        private async Task UploadFileAsync(FileUploadDto item) 
        {
            var token = item.CancellationTokenSource.Token;

            using (var readStream = item.File.OpenReadStream(maxFileSize, token))
            { 
                RaiseStateChange(item.Guid, FileUploadState.Starting);

                if (readStream.Length <= 0)
                { 
                    RaiseStateChange(item.Guid, FileUploadState.Failed);

                    return;
                }

                MessageDto result = null;

                using (var streamContent = fileUploadService.CreateStreamContent(readStream, item.File.Name, item.File.ContentType))
                { 
                      // HttpClient
                      result = await fileUploadService.UploadFileAsync(
                            streamContent, item.ChatId, item.Guid, item.CreatedUtc, UploadProgressEventHandler, token); 
                }

                if (result != null && result.Id > 0)
                { 
                    RaiseStateChange(item.Guid, FileUploadState.Completed);
                }
                else
                { 
                    RaiseStateChange(item.Guid, FileUploadState.Failed);
                }
            }
        } 

        private void RaiseStateChange(string guid, FileUploadState state, long? bytesTransferred = null)
        {
            lock (actionLock) 
            {
                OnStateChange?.Invoke(new FileUploadEventArgs(guid, state, bytesTransferred));
            }
        }
           
        private void UploadProgressEventHandler(object sender, HttpProgressEventArgs e)
        {
            if (sender == null)
            {
                return;
            }

            var request = (HttpRequestMessage)sender;
            if (!request.Content.Headers.Contains("guid"))
            {
                return;
            }

            var guid = request.Content.Headers.GetValues("guid").First();

            RaiseStateChange( 
                guid: guid, 

                // It checks how many bytes have been sent before setting
                // the status to InProgress. The default value is 512.
                // 512 is an estimated HTTP header size. This is done
                // in order to understand whether the sending of the
                // body has begun or not.
                state: e.BytesTransferred > 512 ? FileUploadState.InProgress : FileUploadState.Starting,

                bytesTransferred: e.BytesTransferred
            );
        } 
    }
}

Other related files:

FileUploadEventArgs.cs

using BlazorApp.Enums;

namespace BlazorApp.Services
{
    public class FileUploadEventArgs
    {
        public FileUploadEventArgs()
        {

        }

        public FileUploadEventArgs(string guid, FileUploadState state, long? bytesTransferred)
        {
            Guid = guid;
            State = state;
            BytesTransferred = bytesTransferred;
        }

        public string Guid { get; set; }  

        public FileUploadState State { get; set; } = FileUploadState.NotStarted;
        
        public long? BytesTransferred { get; set; } 
    }
}

FileUploadApiService.cs HttpClient helper service

using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using System.Net.Http.Handlers;
using System.Net.Http.Headers;
using Microsoft.AspNetCore.Http;
using System.Text.Json;
using Microsoft.Extensions.Logging; 

namespace BlazorApp.Services
{
    public class FileUploadApiService : IFileUploadApiService
    {
        private readonly ILogger<FileUploadApiService> logger;
        private readonly HttpClient httpClient;
        private readonly ProgressMessageHandler httpProgressHandler;

        private readonly object httpProgressHandlerLock = new object();

        public FileUploadApiService(
            ILogger<FileUploadApiService> logger,
            IConfiguration configuration, 
            IHttpContextAccessor httpContextAccessor)
        {
            this.logger = logger;

            var authHttpClientHandler = new AuthHttpClientHandler(httpContextAccessor);
            httpProgressHandler = new ProgressMessageHandler(authHttpClientHandler);

            httpClient = new HttpClient(httpProgressHandler);
            httpClient.BaseAddress = new Uri(configuration["ApiUrl"]);
             
            httpClient.Timeout = TimeSpan.FromMinutes(32);
        } 

        public async Task<MessageDto> UploadFileAsync(
            StreamContent file,  
            string guid,
            DateTime messageCreatedUtc,
            EventHandler<HttpProgressEventArgs> progressHandler, 
            CancellationToken cancellationToken)
        {
            MessageDto response = null;

            var url = "/api/upload/file";
            var formData = new MultipartFormDataContent();

            image.Headers.ContentDisposition.Name = "file";
            formData.Add(file, "file"); 
            formData.Add(new StringContent(guid), "guid");

            response = await PostAndReportProgressAsync(url, formData, guid, progressHandler, cancellationToken);

            return response;
        }
         
        private async Task<MessageDto> PostAndReportProgressAsync(
           string url,
           MultipartFormDataContent formData,
           string guid,
           EventHandler<HttpProgressEventArgs> progressHandler,
           CancellationToken cancellationToken)
        {
            MessageDto returnValue = null;

            lock (httpProgressHandlerLock)
            {
                httpProgressHandler.HttpSendProgress += progressHandler;
            }

            formData.Headers.ContentType.MediaType = "multipart/form-data";
            formData.Headers.Add("guid", guid);

            try
            {
                var result = await httpClient.PostAsync(url, formData, cancellationToken);

                if (result.IsSuccessStatusCode)
                {
                    var jsonString = await result.Content.ReadAsStringAsync();
                    var options = new JsonSerializerOptions
                    {
                        PropertyNameCaseInsensitive = true
                    };
                    returnValue = JsonSerializer.Deserialize<MessageDto>(jsonString, options);
                }
            }
            catch (Exception ex)
            {
                logger.LogWarning(ex, "[FileUploadApiService][PostAndReportProgress]: Exception");

                throw new FileUploadApiException("Unhandled exception while post file", ex);
            }
            finally
            {
                lock (httpProgressHandlerLock)
                {
                    httpProgressHandler.HttpSendProgress -= progressHandler;
                }
            } 

            return returnValue;
        }
         
        public StreamContent CreateStreamContent(Stream stream, string fileName, string contentType)
        {
            var fileContent = new StreamContent(stream);
            fileContent.Headers.ContentDisposition = new ContentDispositionHeaderValue("form-data")
            {
                Name = "\"file\"",
                FileName = "\"" + fileName + "\""
            };
            fileContent.Headers.TryAddWithoutValidation("Content-Type", contentType);
            return fileContent;
        } 
    }
}

FileUploadDto.cs

using BlazorApp.Enums;
using Microsoft.AspNetCore.Components.Forms;
using System;
using System.Threading;

namespace BlazorApp.Data.Dto
{
    public class FileUploadDto
    {
        public string Guid { get; set; }
   
        public IBrowserFile File { get; set; } 

        public DateTime CreatedUtc{ get; set; } 

        public CancellationTokenSource CancellationTokenSource {get;set;}
    } 
}

Startup.cs

        public void ConfigureServices(IServiceCollection services)
        {
            //.................
            services.AddSingleton<IFileUploadApiService, FileUploadApiService>();
            services.AddSingleton<FileUploadManager>();
            //.................
        }
\$\endgroup\$
10
  • 1
    \$\begingroup\$ Welcome to CodeReview. Why did you use TaskFactory.StartNew? UploadOne is already an async I/O task. You don't need to pass that to a CPU thread. (By the way you should prefer Task.Run over StartNew.) \$\endgroup\$ Commented Dec 10, 2021 at 8:20
  • 2
    \$\begingroup\$ Using a "new" CPU thread for an I/O bound operation does not make too much sense. Network drives can handle way more parallelism than the processors. You don't need to explicitly "offload" the I/O bound operation to a new thread \$\endgroup\$ Commented Dec 10, 2021 at 11:25
  • 2
    \$\begingroup\$ Let me try to put it into another way. Task.Run should be called for CPU-bound operations. File upload is not a processor intensive operation. If you would calculate for instance the MD5 hash / add a watermark onto images / etc before you upload them then you a mixture of CPU and I/O bound operations. Does it give you clarity? \$\endgroup\$ Commented Dec 10, 2021 at 13:01
  • 2
    \$\begingroup\$ You should simply call the method like this UploadOneAsync(), so without await, then you are firing it off without blocking. You can save the returned Task into a task collection, like: List<Task> uploads = new List<Task>(); uploads.Add(UploadOneAsync()). And after you have iterated through the to be uploaded files then you can call await Task.WhenAll(uploads); \$\endgroup\$ Commented Dec 10, 2021 at 13:46
  • 1
    \$\begingroup\$ @PeterCsala Ok, thanks for helping me with the explanation. \$\endgroup\$ Commented Dec 10, 2021 at 14:46

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.