
When a new message arrives in the stream, the OnMessageReceived event should be raised.

public event EventHandler<MqMessageReceivedEventArgs> OnMessageReceived;


var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Ascending);

  protected virtual void OnMessageReceivedEvent(MqMessageReceivedEventArgs e)
  {
      OnMessageReceived?.Invoke(this, e);
  }


2 Answers

0

According to the official documentation, you have to do the following:

Define a parsing function:

public static class RedisHelper
{
    // Members of a static class must themselves be static
    public static Dictionary<string, string> ParseResult(StreamEntry entry) =>
        entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
}

Start a consumer task:


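var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
RedisValue lastId = RedisValue.Null; // ID of the last entry already reported

var readTask = Task.Run(async () =>
{
    while (!token.IsCancellationRequested)
    {
        // Order.Descending with a count of 1 returns only the newest entry
        var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
        // Skip the entry if it was already reported on a previous pass
        if (result.Any() && result[0].Id != lastId)
        {
            lastId = result[0].Id;
            var dictionaries = result.Select(r => RedisHelper.ParseResult(r)).ToList();
            // or invoke the event once per entry in a loop
            OnMessageReceivedEvent(new MqMessageReceivedEventArgs(dictionaries));
        }

        await Task.Delay(1000);
    }
});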
  • The event should fire when a new message is received. But this code is in a class library, and I am calling the method from a test case. Is that possible?
    – Mahesh
    Commented Mar 14, 2024 at 7:00
0

It looks like you want OnMessageReceived to be raised whenever a new message arrives in the stream. Assuming the connection logic is already implemented, a loop like this listens for messages in the stream:

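private RedisValue lastId = RedisValue.Null; // ID of the last entry already reported

public async Task ListenForMessages()
{
    while (true)
    {
        // Order.Descending with a count of 1 returns only the newest entry
        var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Descending);
        // Raise the event only if this entry has not been reported before
        if (result.Length > 0 && result[0].Id != lastId)
        {
            lastId = result[0].Id;
            OnMessageReceived?.Invoke(this, new MqMessageReceivedEventArgs(result[0]));
        }
        await Task.Delay(1000); // Delay before checking for new messages again
    }
}

public class MqMessageReceivedEventArgs : EventArgs
{
    public StreamEntry Message { get; }

    public MqMessageReceivedEventArgs(StreamEntry message)
    {
        // Expose the stream entry to event subscribers
        Message = message;
    }
}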
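
A polling loop that looks at one entry per pass can miss messages that arrive between two polls. One possible alternative, shown here only as a rough sketch, is to remember the ID of the last entry seen and ask Redis for everything after it. This assumes StackExchange.Redis's StreamReadAsync (the XREAD command), which returns entries with an ID greater than the given position. The StreamListener class, the PollOnceAsync and ListenAsync methods, and the Message property are hypothetical names chosen for illustration; they are not from the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

public class MqMessageReceivedEventArgs : EventArgs
{
    public StreamEntry Message { get; }
    public MqMessageReceivedEventArgs(StreamEntry message) => Message = message;
}

// Hypothetical listener class; all names here are illustrative assumptions.
public sealed class StreamListener
{
    private readonly IDatabase _db;
    private readonly RedisKey _streamName;
    private RedisValue _lastId = "0-0"; // "0-0" means: start from the first entry

    public event EventHandler<MqMessageReceivedEventArgs> OnMessageReceived;

    public StreamListener(IDatabase db, RedisKey streamName)
    {
        _db = db;
        _streamName = streamName;
    }

    // A single polling pass. Returns the number of entries reported.
    public async Task<int> PollOnceAsync()
    {
        // XREAD returns only entries whose ID is greater than _lastId
        StreamEntry[] entries = await _db.StreamReadAsync(_streamName, _lastId);
        foreach (StreamEntry entry in entries)
        {
            _lastId = entry.Id; // remember progress so no entry is reported twice
            OnMessageReceived?.Invoke(this, new MqMessageReceivedEventArgs(entry));
        }
        return entries.Length;
    }

    // Repeats PollOnceAsync until the token is cancelled.
    public async Task ListenAsync(CancellationToken token, int delayMs = 1000)
    {
        while (!token.IsCancellationRequested)
        {
            await PollOnceAsync();
            try { await Task.Delay(delayMs, token); }
            catch (OperationCanceledException) { break; }
        }
    }
}
```

Keeping the single pass in its own method means a test can add an entry to the stream, call PollOnceAsync once, and assert that the event fired, without running an endless loop.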
  • @JabTak Is it right to put a while loop in the program? I am doing TDD (Test-Driven Development).
    – Mahesh
    Commented Mar 15, 2024 at 7:23
