4,015 questions
Best practices
0
votes
0
replies
30
views
Throw message on DLT/DLQ from GlobalKTable using Kafka Streams
I am reading data from a Kafka topic using Kafka Streams, more specifically a GlobalKTable, to populate my store. In the case of corrupt data that cannot successfully be parsed, I wish to throw a ...
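One way to approach this (a sketch, not from the question itself): Kafka Streams lets you plug in a custom default.deserialization.exception.handler, which can forward the raw bytes of an unparseable record to a dead-letter topic and tell Streams to continue. The class below, the dlt.topic config key, and the fallback topic name are assumptions; whether corrupt records surface through this handler for a global store can also depend on the phase (initial restore vs. running updates) and on the Streams version, so treat it as a starting point only.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: forward unparseable records to a dead-letter topic and keep processing.
public class DltDeserializationHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> dltProducer;
    private String dltTopic;

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object topic = configs.get("dlt.topic");              // hypothetical config key
        dltTopic = topic != null ? topic.toString() : "my-topic.DLT";
        dltProducer = new KafkaProducer<>(
                Map.of("bootstrap.servers", configs.get("bootstrap.servers")),
                new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Publish the raw bytes so the original payload is preserved for inspection.
        dltProducer.send(new ProducerRecord<>(dltTopic, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;             // skip the bad record
    }
}

It would be registered via StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; note that this ProcessorContext-based handle signature is the older one and is deprecated in the newest releases.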
Best practices
0
votes
7
replies
57
views
How to populate two stores from one topic using Kafka Streams GlobalKTable
I am using Kafka Streams, and trying to figure out the best practice for handling multiple stores that are populated using the same data without having to read it twice.
My Kafka Streams service is ...
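A sketch of one option (assuming regular state stores rather than a GlobalKTable, since a global store is tied to its own source topic): register both stores on the builder and attach a single processor to both, so the topic is read once and each record is written to the two stores. Topic and store names are placeholders, and the new-API KStream#process used here needs a reasonably recent Streams version (3.3+).

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Register both stores up front; names are placeholders.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("store-a"), Serdes.String(), Serdes.String()));
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("store-b"), Serdes.String(), Serdes.String()));

// One processor reads the topic once and fans each record out into both stores.
ProcessorSupplier<String, String, Void, Void> fanOut = () -> new Processor<String, String, Void, Void>() {
    private KeyValueStore<String, String> storeA;
    private KeyValueStore<String, String> storeB;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        storeA = context.getStateStore("store-a");
        storeB = context.getStateStore("store-b");
    }

    @Override
    public void process(final Record<String, String> record) {
        storeA.put(record.key(), record.value());
        storeB.put(record.key(), record.value());
    }
};

builder.<String, String>stream("users").process(fanOut, "store-a", "store-b");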
0
votes
0
answers
76
views
Kafka Streams main consumer fetch rate stays low after GlobalKTable (RocksDB) restore
I’m running a Kafka Streams application (tested with versions 3.9.1 and 4.1.0) that uses a GlobalKTable backed by RocksDB. There are multiple instances of the application, each with 4 stream threads.
...
2
votes
1
answer
70
views
Unknown magic byte with spring cloud stream aggregation
I'm developing a Spring Cloud Stream app that accepts messages in Avro format. After setting up everything, I start my app, then get an Unknown magic byte error. I'm thinking this is because I'm using ...
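For context (an assumption about the cause, since the excerpt is cut off): "Unknown magic byte" usually means a Confluent Avro deserializer is handed bytes that were not written by the matching Avro serializer (no magic-byte/schema-id prefix), or the other way around. A minimal sketch of the consumer-side deserializer settings with a placeholder registry URL; in Spring Cloud Stream these would typically be supplied through the binder's consumer properties.

import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

// Sketch: both sides must agree on Schema Registry framing; the URL is a placeholder.
Map<String, Object> consumerProps = Map.of(
        "key.deserializer", StringDeserializer.class.getName(),
        "value.deserializer", KafkaAvroDeserializer.class.getName(),
        KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);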
0
votes
1
answer
39
views
Can the statestore be shared across two processors?
Here is a simple topology:
stream1
.merge(stream2)
.merge(stream3)
.to("topic1)
stream2 = builder.stream("topic1")
stream2.
process(() -> new Processor<>() { process() { Read ...
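For what it's worth, a store registered once on the builder can be connected to more than one processor by name; the processors then end up in the same sub-topology, so their input topics must be co-partitioned. A minimal sketch, where WriterProcessor, ReaderProcessor, and the topic/store names are placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// One store, registered once.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("shared-store"), Serdes.String(), Serdes.String()));

// Hypothetical processors implementing org.apache.kafka.streams.processor.api.Processor.
ProcessorSupplier<String, String, Void, Void> writer = WriterProcessor::new;
ProcessorSupplier<String, String, Void, Void> reader = ReaderProcessor::new;

// Both processors name the same store, so they share it.
builder.<String, String>stream("topic1").process(writer, "shared-store");
builder.<String, String>stream("topic2").process(reader, "shared-store");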
2
votes
1
answer
63
views
Kafka Streams application does not purge repartition topic records when manual commit
I'm currently developing a Kafka Streams application. Due to internal requirements, I've disabled auto-commits by setting commit.interval.ms to a very long duration (e.g. Long.MAX_VALUE). Instead, I'm using ...
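As an aside on how such manual commits are usually requested (a sketch with a placeholder batch size): ProcessorContext#commit() only asks the stream thread to commit at the next opportunity, and repartition-topic records are purged only after the corresponding offsets have actually been committed (in recent versions the purge frequency is additionally governed by repartition.purge.interval.ms).

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Sketch: request a commit every N records instead of relying on commit.interval.ms.
public class ManualCommitProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private long seen = 0;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record);
        if (++seen % 1000 == 0) {      // arbitrary batch size for this sketch
            context.commit();          // a request; the stream thread commits later
        }
    }
}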
0
votes
1
answer
48
views
Wrong partitions are assigned to KStream threads after topic selectKey / repartition
I need to consume two topics, A with 40 partitions and B with 10 partitions, and keep some info in a shared persistent state store with the social security number (SSN) of type string as its key and a custom ...
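A sketch of how the two inputs are often brought to the same partitioning before sharing a store (topic names, the extractSsn helpers, and the partition count are placeholders): re-key both streams by SSN and repartition them with an explicit, identical number of partitions.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

StreamsBuilder builder = new StreamsBuilder();

// Re-key by SSN and force both repartition topics to the same partition count,
// so downstream tasks see co-partitioned data.
KStream<String, String> bySsnFromA = builder.<String, String>stream("topic-A")
        .selectKey((k, v) -> extractSsnFromA(v))                    // placeholder helper
        .repartition(Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
                .withNumberOfPartitions(40)
                .withName("a-by-ssn"));

KStream<String, String> bySsnFromB = builder.<String, String>stream("topic-B")
        .selectKey((k, v) -> extractSsnFromB(v))                    // placeholder helper
        .repartition(Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
                .withNumberOfPartitions(40)
                .withName("b-by-ssn"));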
1
vote
0
answers
28
views
When migrating Kafka Streams to Spring Cloud Stream how to resolve InconsistentGroupProtocolException
The prior application is on kafka-streams (KStream) and has been rewritten to use Spring Cloud Stream. Both apps are on kafka-clients-3.1.2.
When starting the upgraded application it gives:
java.lang....
-2
votes
1
answer
39
views
Linear processing a kafka topic based on a constraint
I have a Kafka topic producing products for multiple companies:
{"company": "c1", "productIds": ["p1", "p2", "p3"]}
{"company": &...
0
votes
1
answer
38
views
Kafka Streams KTable Race Condition: Multiple Concurrent Updates See Same Stale State
I'm building a conference system using Kafka Streams where users can join/leave rooms.
I'm experiencing a race condition where multiple concurrent leave requests see the same stale room state, causing ...
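A common way around this kind of read-modify-write race (a sketch; RoomEvent, RoomState, their serdes, and the topic name are placeholders) is to funnel every join/leave event through a single aggregation keyed by room id, so each room's state is updated by exactly one task, one event at a time:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

// All events for one room share a key, hence a partition, hence a single task.
KTable<String, RoomState> rooms = builder
        .<String, RoomEvent>stream("room-events")                   // key = roomId
        .groupByKey(Grouped.with(Serdes.String(), roomEventSerde))  // placeholder serde
        .aggregate(RoomState::new,
                (roomId, event, state) -> state.apply(event),       // placeholder method
                Materialized.with(Serdes.String(), roomStateSerde)); // placeholder serde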
0
votes
1
answer
56
views
Does Kafka Streams StreamTask process records from multiple co-partitioned topics sequentially or in parallel?
I have read the explanation written here that one StreamTask is handling all messages from co-partitioned topics: How do co-partitioning ensure that partition from 2 different topics end up assigned ...
3
votes
0
answers
80
views
Kafka Stream .checkpoint not created
I’m comparing how GlobalKTable Kafka Streams handles checkpointing in version 2.5.0 versus 3.7.0, and I’ve identified a regression: in 3.7.0 the .checkpoint file isn’t created the first time you drain ...
0
votes
1
answer
53
views
How can I emit the delete records to the output topic, and make that key tombstoned and removed from KTable internal state?
I'm working on a Change Data Capture (CDC) application using Kafka Streams, and I have the following setup:
A KStream is converted to a KTable (let's call it kt1).
I perform a left join between kt1 ...
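A sketch of the usual tombstone pattern (isDelete(), the topic name, and the ChangeEvent type are placeholders): map delete events to a null value; a null value written to the (compacted) output topic acts as a tombstone, and a null value materialized into a KTable removes the key from its state store.

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Sketch: turn CDC delete events into tombstones.
static KTable<String, ChangeEvent> toTableWithTombstones(final KStream<String, ChangeEvent> changes) {
    final KStream<String, ChangeEvent> withTombstones =
            changes.mapValues(v -> isDelete(v) ? null : v);          // placeholder predicate
    withTombstones.to("output-topic");   // null value = tombstone on the compacted output topic
    return withTombstones.toTable();     // null value removes the key from the table's store
}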
0
votes
0
answers
112
views
Kafka Streams State store behaving differently if stream is converted beforehand
I am experiencing a very weird behaviour in my Kafka Streams application.
Setup is the following:
I create the state store "user-store" manually and connect it to a processor "filter-...
0
votes
0
answers
27
views
Kafka Consumer unable to read messages because of partitions
I have written a sample Kafka Producer application, without specifying a partition, which sends messages to a Consumer application.
My Consumer application is written using Kafka Streams as it is using state ...
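For reference, a minimal sketch of a send without an explicit partition (topic name and broker address are placeholders): with a key, the default partitioner hashes the key to pick the partition; without a key, records are spread across partitions. The Streams application on the consuming side is assigned all partitions of the topic across its tasks either way.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // No partition argument: the partition is derived from the key hash.
    producer.send(new ProducerRecord<>("input-topic", "some-key", "some-value"));
}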