Description
Thank you for writing xstream
, we use it heavily in a sizable project and have enjoyed using it.
There appears to be a subtle timing bug that seems to be caused by the way streams are asynchronously disconnected. The following unit test shows the bug. In the test, I have a producer (FiveProducer
). that emits 5
but never completes, this is simply meant to mimic a stream that emits a value and may possibly emit more in the future. This producer has .compose(dropRepeats()).remember()
added.
If I subscribe to this stream once (sub1
/ spy1
), and then unsubscribe from it, wait zero milliseconds, then subscribe a second time (spy2
), I would expect both spy1
and spy2
to receive the value 5
. However, spy2
does not.
class FiveProducer implements InternalProducer<number> {
public type = 'FiveProducer';
_start(out: InternalListener<number>): void {
out._n(5);
}
_stop(): void {}
}
test('Verify dropRepeats behaviour', async () => {
const stream = new Stream(new FiveProducer()).compose(dropRepeats()).remember();
const spy1 = jest.fn();
const sub1 = stream.subscribe({ next: spy1 });
sub1.unsubscribe();
// This is important -- must wait exactly zero.
await jest.advanceTimersByTimeAsync(0);
const spy2 = jest.fn();
stream.subscribe({ next: spy2 });
await jest.advanceTimersByTimeAsync(10);
// This succeeds -- spy1 is called once with the right value
expect(spy1).toHaveBeenCalledTimes(1);
expect(spy1).toHaveBeenNthCalledWith(1, 5);
// This fails -- spy2 is not called
expect(spy2).toHaveBeenCalledTimes(1);
expect(spy2).toHaveBeenNthCalledWith(1, 5);
});
What seems to be happening is:
sub1
is unsubscribed, which schedules an asynchronous stop of the.remember()
stream.- Asynchronously, the
.remember
stream is stopped, which causes an asynchronous operation to be scheduled, to stop the.compose
stream - However, before this runs, my second spy attempts to subscribe to the stream. This starts the
.remember()
stream, which doesn't emit a remembered value immediately (it has been stopped, and.has==false
. Instead it starts the.compose
stream. - The
.compose
stream runs this code, which cancels the stop timer, but does not cause any other action to happen:
if (this._stopID !== NO) {
clearTimeout(this._stopID);
this._stopID = NO;
}
The end effect is that the stream never receives any data at all. This feels like a race condition bug to me, but I do not know exactly where/how to fix this. I'd appreciate any advice, as we run into this bug somewhat often, and unpredictably, since our app heavily uses many streams asynchronously.
Thank you!