Skip to content

Limit subscription in-flight event accumulation#18055

Open
Caideyipi wants to merge 2 commits into
apache:masterfrom
Caideyipi:fix/subscription-inflight-backpressure
Open

Limit subscription in-flight event accumulation#18055
Caideyipi wants to merge 2 commits into
apache:masterfrom
Caideyipi:fix/subscription-inflight-backpressure

Conversation

@Caideyipi

@Caideyipi Caideyipi commented Jun 29, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • Add poll-side backpressure for pipe-based subscription queues when too many events are already in flight. Before rejecting a poll, the queue first remaps in-flight events so committed or pollable entries can be cleaned/recycled and immediately free capacity.
  • Enforce subscription prefetch limits against retained events instead of only prefetched events. Retained events include both queued and in-flight events, so slow or stalled consumers cannot bypass the local/global event caps by continuously moving events out of the prefetch queue.
  • Apply the same retained-event capacity accounting to consensus subscription queues across polling, WAL/realtime prefetch, lingered batch emission, watermark emission, and idle rescheduling.
  • Wake consensus prefetch after ack/ackSilent frees a retained-event slot, so queues resume promptly after backpressure clears.
  • Inject the prefetching queue count into SubscriptionPrefetchingQueueStates for focused unit coverage of global threshold calculations without bootstrapping the global subscription broker.

Motivation

Previously, the main limits primarily looked at prefetched queue size. If a consumer kept polling but did not commit, events moved into the in-flight map and no longer counted against those prefetch limits. That could allow retained subscription events to accumulate and delay backpressure until memory pressure or other secondary limits were hit. This PR makes in-flight events part of the capacity model and throttles further polling when the in-flight side itself reaches the configured limits.

Testing

  • mvn spotless:apply -pl iotdb-core/datanode
  • git diff --check origin/master...HEAD
  • mvn -Ddevelocity.off=true -pl iotdb-core/datanode -Dtest=SubscriptionPrefetchingQueueStatesTest -DforkCount=0 test
    • Ran 4 tests in SubscriptionPrefetchingQueueStatesTest.
    • forkCount=0 was used only for this local Windows environment because the default Surefire fork JVM failed to start due to insufficient page file / native memory before any tests ran.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

1 participant