feat(bufferCountWithDebounce): add new operator #380
Conversation
Rather than redefining …
I think this is how … The following test should pass, but currently doesn't:

```ts
it('should fill buffers', async () => {
  const sourceDelay = 900;
  const bufferSize = 3;
  const maxWaitTime = 4000;
  // maxWaitTime > sourceDelay * (bufferSize + 1)
  //
  // Setting 'maxWaitTime > sourceDelay * bufferSize' causes the timeout to finish while the
  // 'bufferCountOrTime' generator is suspended, so the next time a new value is polled it will
  // be the timerEvent. However, because we check 'buffer.length > 0' when we receive a timer
  // event, we have to wait 'bufferSize + 1' source periods so the buffer fills with one more
  // element before it is yielded.
  // Essentially, because maxWaitTime > sourceDelay * bufferSize, the timeout finishes right in
  // the middle of the second "filling" of the buffer, so it is yielded half-empty.
  const source = interval(sourceDelay);
  const res = source.pipe(bufferCountOrTime(bufferSize, maxWaitTime));
  await expect(toArray(res.pipe(take(2)))).resolves.toEqual([
    [0, 1, 2],
    [3, 4, 5],
  ]); // Actually gives [[0, 1, 2], [3]]
});
```

This is my opinion on how the semantics of …
Ah, I see the confusion. The …
Force-pushed from ceff6a7 to ecf009b
Fixed!
This implementation makes a lot more sense to me. Instead of a fixed interval running in the background, completely unrelated to the source, a new timeout is started for the specified time whenever an item is received and no timer is running. Either the buffer fills up, in which case the timeout is cleared and the full buffer is yielded, or the timeout fires and the partially filled buffer is yielded.
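For concreteness, here is a minimal sketch of those semantics as a standalone async generator. This is illustrative only, not the PR's actual code; `bufferCountOrTimeSketch` is a hypothetical name:

```ts
// Minimal sketch of the semantics described above; illustrative only.
// A timer is armed when a new buffer starts filling; the buffer is yielded
// either when it reaches `count` items (timer cleared) or when the timer
// fires with a partially filled buffer.
async function* bufferCountOrTimeSketch<T>(
  source: AsyncIterable<T>,
  count: number,
  maxWaitMs: number,
): AsyncGenerator<T[]> {
  const it = source[Symbol.asyncIterator]();
  let buffer: T[] = [];
  let timeout: Promise<'timeout'> | null = null;
  let handle: ReturnType<typeof setTimeout> | undefined;
  // Keep the in-flight next() across timer wins so no value is lost.
  let pending: Promise<IteratorResult<T>> | null = null;

  try {
    while (true) {
      if (pending === null) pending = it.next();
      // Arm the timer once a buffer has started filling and none is running.
      if (buffer.length > 0 && timeout === null) {
        timeout = new Promise<'timeout'>((resolve) => {
          handle = setTimeout(() => resolve('timeout'), maxWaitMs);
        });
      }
      const winner = timeout
        ? await Promise.race([pending, timeout])
        : await pending;

      if (winner === 'timeout') {
        // Timer fired first: flush the partial buffer; next() stays pending.
        timeout = null;
        yield buffer;
        buffer = [];
        continue;
      }

      pending = null;
      if (winner.done) {
        if (buffer.length > 0) yield buffer;
        return;
      }
      buffer.push(winner.value);
      if (buffer.length === count) {
        // Full buffer won the race: cancel the timer and yield.
        clearTimeout(handle);
        timeout = null;
        yield buffer;
        buffer = [];
      }
    }
  } finally {
    clearTimeout(handle);
    await it.return?.();
  }
}
```

The key point is that the in-flight `next()` is raced against a per-buffer timer: a full buffer cancels the timer, while a firing timer flushes whatever has accumulated so far.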
With the current implementation I get a lot of partially filled buffers. For example, with buffer count 50, buffer window 5 s, a source producing 10 msg/s, and a processor consuming 12.5 msg/s, i.e. a pipeline like `source -> buffer (5s) -> processor (4s)`: on the first run you get a full buffer, but because the interval keeps running, by the time another batch of items is requested (after 4 seconds) the timeout has almost expired and only collects another 10 items (in 1 second). It makes more sense to me to start the timer once an item arrives and essentially race a full buffer against the timer firing.

The consumer actually has nothing to do with this, see #380 (comment).
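To make the timing concrete, here is the sequence under the numbers above (an illustrative reading of the fixed-interval behavior, assuming the consumer pulls lazily):

```text
t = 0 s   consumer pulls; buffer starts filling, fixed 5 s interval running
t = 5 s   50 items collected (10 msg/s × 5 s) -> full buffer yielded;
          the generator suspends while the processor works
t = 9 s   processor done after 4 s, next batch requested; the interval kept
          running, so only ~1 s of the window remains
t = 10 s  interval fires -> partial buffer of ~10 items (10 msg/s × 1 s)
```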
Lastly, I think it would make the most sense for this operator to keep batching items in the background, so that once the consumer requests the next batch it is already present (a rough sketch below). But that's a bigger change than this one, so I thought I'd start here.
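As a rough illustration of that idea (hypothetical, not part of this PR): keeping one pull in flight while the consumer processes the current batch would let the operator assemble the following batch in the background.

```ts
// Hypothetical sketch, not part of this PR: keep one pull in flight so the
// next batch is already being assembled while the consumer processes the
// current one.
async function* prefetchOne<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  const it = source[Symbol.asyncIterator]();
  let pending = it.next(); // start producing immediately
  try {
    while (true) {
      const result = await pending;
      if (result.done) return;
      pending = it.next(); // begin the next batch before yielding this one
      yield result.value;
    }
  } finally {
    await it.return?.();
  }
}

// e.g. prefetchOne(source.pipe(bufferCountOrTime(50, 5000)))
```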