I have created a helper service to send files from the client application to the API over HTTP using the POST method. Source project on Blazor Server Side, .NET 6, C # 10.
Service features:
- Sending files to the API.
- A limited number of parallel sending processes.
- Ability to freely add files to the queue without waiting for the completion of previous tasks.
- Receive the send status of each file subscribed to the event.
I'm wondering how can I improve parallel file sending? Optimize multithreading?
FileUploadManager.cs
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Handlers;
using System.Threading;
using System.Threading.Tasks;
namespace BlazorApp.Services
{
public class FileUploadManager : IDisposable
{
private readonly ILogger<FileUploadManager> logger;
private readonly IOptions<UploadOptions> options;
// HttpClient
private readonly IFileUploadApiService fileUploadService;
private readonly TaskFactory factory;
private readonly List<Task> tasks;
private readonly ConcurrentQueue<FileUploadDto> fileUploadQueue;
private readonly object fileUploadQueueLock = new object();
private readonly object actionLock = new object();
private readonly long maxFileSize;
private readonly Semaphore semaphore;
public FileUploadManager(
ILogger<FileUploadManager> logger,
IOptions<UploadOptions> uploadOptions,
IFileUploadApiService fileUploadService)
{
this.logger = logger;
this.options = uploadOptions;
this.fileUploadService = fileUploadService;
fileUploadQueue = new ConcurrentQueue<FileUploadDto>();
factory = new TaskFactory();
tasks = new List<Task>();
maxFileSize = options.Value.MaxFileSize;
var maxParallelUplaods = this.options.Value.MaxParallelUploads > 0 ? this.options.Value.MaxParallelUploads : 1;
semaphore = new Semaphore(maxParallelUplaods, maxParallelUplaods);
}
// ------------ Main Points ---------------------------------
// OUTPUT
public Action<FileUploadEventArgs> OnStateChange { get; set; }
// INPUT
public void AddFilesToUploadQueue(List<FileUploadDto> files)
{
lock (fileUploadQueueLock)
{
foreach (var item in files)
{
fileUploadQueue.Enqueue(item);
}
}
for (int i = 0; i < files.Count; i++)
{
Task t = factory.StartNew(() => UploadOneAsync());
tasks.Add(t);
}
}
private async Task UploadOneAsync()
{
semaphore.WaitOne();
if (fileUploadQueue.TryDequeue(out FileUploadDto item))
{
await UploadFileAsync(item);
}
semaphore.Release();
}
// -------------- Helpers -------------------------
private async Task UploadFileAsync(FileUploadDto item)
{
var token = item.CancellationTokenSource.Token;
using (var readStream = item.File.OpenReadStream(maxFileSize, token))
{
RaiseStateChange(item.Guid, FileUploadState.Starting);
if (readStream.Length <= 0)
{
RaiseStateChange(item.Guid, FileUploadState.Failed);
return;
}
MessageDto result = null;
using (var streamContent = fileUploadService.CreateStreamContent(readStream, item.File.Name, item.File.ContentType))
{
// HttpClient
result = await fileUploadService.UploadFileAsync(
streamContent, item.ChatId, item.Guid, item.CreatedUtc, UploadProgressEventHandler, token);
}
if (result != null && result.Id > 0)
{
RaiseStateChange(item.Guid, FileUploadState.Completed);
}
else
{
RaiseStateChange(item.Guid, FileUploadState.Failed);
}
}
}
private void RaiseStateChange(string guid, FileUploadState state, long? bytesTransferred = null)
{
lock (actionLock)
{
OnStateChange?.Invoke(new FileUploadEventArgs(guid, state, bytesTransferred));
}
}
private void UploadProgressEventHandler(object sender, HttpProgressEventArgs e)
{
if (sender == null)
{
return;
}
var request = (HttpRequestMessage)sender;
if (!request.Content.Headers.Contains("guid"))
{
return;
}
var guid = request.Content.Headers.GetValues("guid").First();
RaiseStateChange(
guid: guid,
// It checks how many bytes have been sent before setting
// the status to InProgress. The default value is 512.
// 512 is an estimated HTTP header size. This is done
// in order to understand whether the sending of the
// body has begun or not.
state: e.BytesTransferred > 512 ? FileUploadState.InProgress : FileUploadState.Starting,
bytesTransferred: e.BytesTransferred
);
}
}
}
Other related files:
FileUploadEventArgs.cs
using BlazorApp.Enums;
namespace BlazorApp.Services
{
public class FileUploadEventArgs
{
public FileUploadEventArgs()
{
}
public FileUploadEventArgs(string guid, FileUploadState state, long? bytesTransferred)
{
Guid = guid;
State = state;
BytesTransferred = bytesTransferred;
}
public string Guid { get; set; }
public FileUploadState State { get; set; } = FileUploadState.NotStarted;
public long? BytesTransferred { get; set; }
}
}
FileUploadApiService.cs HttpClient helper service
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using System.Net.Http.Handlers;
using System.Net.Http.Headers;
using Microsoft.AspNetCore.Http;
using System.Text.Json;
using Microsoft.Extensions.Logging;
namespace BlazorApp.Services
{
public class FileUploadApiService : IFileUploadApiService
{
private readonly ILogger<FileUploadApiService> logger;
private readonly HttpClient httpClient;
private readonly ProgressMessageHandler httpProgressHandler;
private readonly object httpProgressHandlerLock = new object();
public FileUploadApiService(
ILogger<FileUploadApiService> logger,
IConfiguration configuration,
IHttpContextAccessor httpContextAccessor)
{
this.logger = logger;
var authHttpClientHandler = new AuthHttpClientHandler(httpContextAccessor);
httpProgressHandler = new ProgressMessageHandler(authHttpClientHandler);
httpClient = new HttpClient(httpProgressHandler);
httpClient.BaseAddress = new Uri(configuration["ApiUrl"]);
httpClient.Timeout = TimeSpan.FromMinutes(32);
}
public async Task<MessageDto> UploadFileAsync(
StreamContent file,
string guid,
DateTime messageCreatedUtc,
EventHandler<HttpProgressEventArgs> progressHandler,
CancellationToken cancellationToken)
{
MessageDto response = null;
var url = "/api/upload/file";
var formData = new MultipartFormDataContent();
image.Headers.ContentDisposition.Name = "file";
formData.Add(file, "file");
formData.Add(new StringContent(guid), "guid");
response = await PostAndReportProgressAsync(url, formData, guid, progressHandler, cancellationToken);
return response;
}
private async Task<MessageDto> PostAndReportProgressAsync(
string url,
MultipartFormDataContent formData,
string guid,
EventHandler<HttpProgressEventArgs> progressHandler,
CancellationToken cancellationToken)
{
MessageDto returnValue = null;
lock (httpProgressHandlerLock)
{
httpProgressHandler.HttpSendProgress += progressHandler;
}
formData.Headers.ContentType.MediaType = "multipart/form-data";
formData.Headers.Add("guid", guid);
try
{
var result = await httpClient.PostAsync(url, formData, cancellationToken);
if (result.IsSuccessStatusCode)
{
var jsonString = await result.Content.ReadAsStringAsync();
var options = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};
returnValue = JsonSerializer.Deserialize<MessageDto>(jsonString, options);
}
}
catch (Exception ex)
{
logger.LogWarning(ex, "[FileUploadApiService][PostAndReportProgress]: Exception");
throw new FileUploadApiException("Unhandled exception while post file", ex);
}
finally
{
lock (httpProgressHandlerLock)
{
httpProgressHandler.HttpSendProgress -= progressHandler;
}
}
return returnValue;
}
public StreamContent CreateStreamContent(Stream stream, string fileName, string contentType)
{
var fileContent = new StreamContent(stream);
fileContent.Headers.ContentDisposition = new ContentDispositionHeaderValue("form-data")
{
Name = "\"file\"",
FileName = "\"" + fileName + "\""
};
fileContent.Headers.TryAddWithoutValidation("Content-Type", contentType);
return fileContent;
}
}
}
FileUploadDto.cs
using BlazorApp.Enums;
using Microsoft.AspNetCore.Components.Forms;
using System;
using System.Threading;
namespace BlazorApp.Data.Dto
{
public class FileUploadDto
{
public string Guid { get; set; }
public IBrowserFile File { get; set; }
public DateTime CreatedUtc{ get; set; }
public CancellationTokenSource CancellationTokenSource {get;set;}
}
}
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
//.................
services.AddSingleton<IFileUploadApiService, FileUploadApiService>();
services.AddSingleton<FileUploadManager>();
//.................
}
TaskFactory.StartNew?UploadOneis already an async I/O task. You don't need to pass that to a CPU thread. (By the way you should preferTask.RunoverStartNew.) \$\endgroup\$Task.Runshould be called for CPU-bound operations. File upload is not a processor intensive operation. If you would calculate for instance the MD5 hash / add a watermark onto images / etc before you upload them then you a mixture of CPU and I/O bound operations. Does it give you clarity? \$\endgroup\$UploadOneAsync(), so without await, then you are firing it off without blocking. You can save the returnedTaskinto a task collection, like:List<Task> uploads = new List<Task>(); uploads.Add(UploadOneAsync()). And after you have iterated through the to be uploaded files then you can callawait Task.WhenAll(uploads);\$\endgroup\$