stream: cache minimum cursor count in share#63262
Open
trivikr wants to merge 2 commits into
Open
Conversation
Collaborator
|
Review requested:
|
9d05a7a to
a0ce67e
Compare
Track the number of consumers at the cached minimum cursor in share() so the minimum is only recomputed when the last consumer at that cursor advances or detaches. This avoids scanning every consumer on each trim attempt when multiple consumers advance through a shared buffer. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
a0ce67e to
8a32f1c
Compare
Member
Author
|
I'd attempted making the same changes in broadcast. They were reverted since benchmarks showed regression confidence improvement accuracy (*) (**) (***)
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=1 api='classic' -4.72 % ±8.35% ±11.66% ±16.39%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=1 api='iter' *** -17.21 % ±8.78% ±12.06% ±16.50%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=1 api='webstream' *** -31.77 % ±12.75% ±17.99% ±25.68%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=2 api='classic' -2.32 % ±5.56% ±7.76% ±10.90%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=2 api='iter' ** -11.18 % ±6.34% ±8.72% ±11.95%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=2 api='webstream' *** -22.39 % ±6.13% ±8.68% ±12.44%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=4 api='classic' -5.51 % ±7.07% ±9.78% ±13.55%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=4 api='iter' -1.68 % ±6.46% ±8.92% ±12.30%
streams/iter-throughput-broadcast.js n=5 datasize=1048576 consumers=4 api='webstream' *** -26.03 % ±4.97% ±6.88% ±9.54%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=1 api='classic' * -4.21 % ±3.90% ±5.36% ±7.33%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=1 api='iter' ** -10.27 % ±7.18% ±10.03% ±14.07%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=1 api='webstream' ** -20.80 % ±11.70% ±16.63% ±24.05%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=2 api='classic' -4.53 % ±5.56% ±7.70% ±10.66%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=2 api='iter' *** -11.46 % ±5.39% ±7.40% ±10.13%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=2 api='webstream' ** -12.59 % ±9.01% ±12.36% ±16.87%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=4 api='classic' -4.09 % ±6.01% ±8.35% ±11.61%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=4 api='iter' * -8.16 % ±6.54% ±8.98% ±12.27%
streams/iter-throughput-broadcast.js n=5 datasize=16777216 consumers=4 api='webstream' *** -8.95 % ±4.03% ±5.55% ±7.60% |
Renegade334
reviewed
May 12, 2026
|
|
||
| #recomputeMinCursor() { | ||
| this.#cachedMinCursor = getMinCursor( | ||
| const [minCursor] = getMinCursor( |
Member
There was a problem hiding this comment.
Suggested change
| const [minCursor] = getMinCursor( | |
| const { 0: minCursor } = getMinCursor( |
| } | ||
|
|
||
| #recomputeMinCursor() { | ||
| const [minCursor, minCursorConsumers] = getMinCursor( |
Member
There was a problem hiding this comment.
Suggested change
| const [minCursor, minCursorConsumers] = getMinCursor( | |
| const { 0: minCursor, 1: minCursorConsumers } = getMinCursor( |
| } | ||
|
|
||
| #recomputeMinCursor() { | ||
| const [minCursor, minCursorConsumers] = getMinCursor( |
Member
There was a problem hiding this comment.
Suggested change
| const [minCursor, minCursorConsumers] = getMinCursor( | |
| const { 0: minCursor, 1: minCursorConsumers } = getMinCursor( |
Renegade334
reviewed
May 12, 2026
Comment on lines
+366
to
+379
| #deleteConsumerFromMin(consumer) { | ||
| if (consumer.cursor === this.#cachedMinCursor) { | ||
| this.#cachedMinCursorConsumers--; | ||
| } | ||
| } | ||
|
|
||
| #deleteConsumer(consumer) { | ||
| if (this.#consumers.delete(consumer)) { | ||
| const wasAtMin = consumer.cursor === this.#cachedMinCursor; | ||
| this.#deleteConsumerFromMin(consumer); | ||
| return wasAtMin && this.#cachedMinCursorConsumers === 0; | ||
| } | ||
| return false; | ||
| } |
Member
There was a problem hiding this comment.
Deduplication:
Suggested change
| #deleteConsumerFromMin(consumer) { | |
| if (consumer.cursor === this.#cachedMinCursor) { | |
| this.#cachedMinCursorConsumers--; | |
| } | |
| } | |
| #deleteConsumer(consumer) { | |
| if (this.#consumers.delete(consumer)) { | |
| const wasAtMin = consumer.cursor === this.#cachedMinCursor; | |
| this.#deleteConsumerFromMin(consumer); | |
| return wasAtMin && this.#cachedMinCursorConsumers === 0; | |
| } | |
| return false; | |
| } | |
| #deleteConsumerFromMin(consumer) { | |
| if (consumer.cursor === this.#cachedMinCursor) { | |
| this.#cachedMinCursorConsumers--; | |
| return true; | |
| } | |
| return false; | |
| } | |
| #deleteConsumer(consumer) { | |
| if (this.#consumers.delete(consumer) && | |
| this.#deleteConsumerFromMin(consumer) && | |
| this.#cachedMinCursorConsumers === 0) { | |
| return true; | |
| } | |
| return false; | |
| } |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #63262 +/- ##
========================================
Coverage 90.04% 90.04%
========================================
Files 713 714 +1
Lines 225003 225336 +333
Branches 42536 42597 +61
========================================
+ Hits 202606 202910 +304
- Misses 14177 14199 +22
- Partials 8220 8227 +7
🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This updates
share()andshareSync()to avoid recomputing the minimumconsumer cursor on every buffer trim attempt.
Instead of scanning all consumers each time, share now caches the current
minimum cursor and tracks how many consumers are positioned at that
cursor. The minimum is recomputed only when the last consumer at the
cached minimum advances or detaches.
The shared
getMinCursor()utility now returns both the minimum cursorand the number of consumers at that cursor.
Benchmark
Assisted-by: openai:gpt-5.5