4

I need to read from a Redis stream while the server is still writing to it. There are two ways to do this. One is to fetch the messages that are already in the stream, which I am doing as shown below. However, I want to do a blocking read, so that my client console app waits for new messages to arrive in the stream. I know this can be done in redis-cli with the following command, and I want to achieve the same result in C#:

XREAD BLOCK 1000 STREAMS #channel1 1526999626221-0


IDatabase db = redis.GetDatabase();
while (true)
{
    iteration++;
    // Pass "0-0" as the position to read the stream from the beginning.
    // StreamRead returns only entries after the given ID and does not block.
    var messages = db.StreamRead("#channel1", lastMessageInAsequence);

    foreach (StackExchange.Redis.StreamEntry message in messages)
    {
        StackExchange.Redis.NameValueEntry[] entry1 = message.Values;

        lineCount++;
        Console.WriteLine(entry1[0].Name + ":" + entry1[0].Value);
        // Track the entry ID (not the field name) so the next read resumes after it.
        lastMessageInAsequence = message.Id;
    }

    Console.WriteLine($"{lineCount} lines read from stream in 1 go in iteration {iteration}");
}
1

3 Answers

2

This is not currently possible. See https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing for the reasons. We have discussed some options to enable this scenario, but it needs quite a bit of work, and I have no ETA on those ideas. I understand what you're trying to do; it just isn't compatible with the library right now.

1

The package CSRedisCore can be used to complete a blocking read. See https://developer.redis.com/develop/dotnet/streams/blocking-reads/cs-redis/

We look forward to when StackExchange.Redis can do the same.

1
  • has anyone used this successfully in a production env?
    – MX313
    Commented Nov 23, 2022 at 16:38
0

I've done it with the code below:

WARNING: For reasons mentioned by Marc Gravell, make sure you use a ConnectionMultiplexer that isn't used by other processes. (I make use of a ConcurrentBag<>)

var result = await redisConnection
    .GetDatabase()
    .ExecuteAsync(
        "XREADGROUP",
        "GROUP", stream.ConsumerGroupName, stream.ConsumerName,
        "BLOCK", "10000",
        "COUNT", messagesToBuffer.ToString(CultureInfo.InvariantCulture),
        "STREAMS", stream.StreamKey.ToString(), ">")
    .ConfigureAwait(false);

I consumed the RedisResult in much the same way that the library parses a RawResult into a StreamEntry.
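For readers wondering what consuming the RedisResult might look like, here is a minimal sketch. It is not the code from this answer or from the library: StreamReplyParser and Parse are hypothetical names, and it assumes the connection uses the RESP2 protocol, where an XREAD / XREADGROUP reply is a nested array of [streamKey, [[id, [field, value, ...]], ...]] and a timed-out BLOCK returns nil.

```csharp
using System;
using System.Collections.Generic;
using StackExchange.Redis;

// Hypothetical helper, not part of StackExchange.Redis.
public static class StreamReplyParser
{
    // Flattens a raw XREAD / XREADGROUP reply into (stream, id, fields) tuples.
    // Assumes the RESP2 nested-array reply shape described above.
    public static List<(string Stream, string Id, Dictionary<string, string> Fields)> Parse(RedisResult reply)
    {
        var entries = new List<(string Stream, string Id, Dictionary<string, string> Fields)>();

        // A nil reply means BLOCK timed out before any message arrived.
        if (reply == null || reply.IsNull) return entries;

        foreach (RedisResult streamResult in (RedisResult[])reply)
        {
            var streamParts = (RedisResult[])streamResult;   // [key, entries]
            var streamKey = (string)streamParts[0];

            foreach (RedisResult entryResult in (RedisResult[])streamParts[1])
            {
                var entryParts = (RedisResult[])entryResult; // [id, flat field/value array]
                var id = (string)entryParts[0];

                // Guard against a nil field list (e.g. an entry deleted while pending).
                var flat = entryParts[1].IsNull
                    ? Array.Empty<RedisResult>()
                    : (RedisResult[])entryParts[1];

                // Fields arrive as a flat list: name, value, name, value, ...
                var fields = new Dictionary<string, string>();
                for (int i = 0; i + 1 < flat.Length; i += 2)
                {
                    fields[(string)flat[i]] = (string)flat[i + 1];
                }

                entries.Add((streamKey, id, fields));
            }
        }

        return entries;
    }
}
```

With this in place, the result of the ExecuteAsync call above could be passed as StreamReplyParser.Parse(result); an empty list then means the block timed out and the read should simply be issued again.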
