Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions dom/observable/tentative/observable-constructor.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -1091,3 +1091,111 @@ test(() => {
assert_array_equals(addTeardownsCalled, ["teardown 1", "teardown 2"],
"Teardowns called synchronously upon addition end up in FIFO order");
}, "Teardowns should be called synchronously during addTeardown() if the subscription is inactive");

test(() => {
const results = [];
let producerInvocations = 0;
let teardownInvocations = 0;

const source = new Observable((subscriber) => {
producerInvocations++;
results.push('producer invoked');
subscriber.addTeardown(() => {
teardownInvocations++;
results.push('teardown invoked');
});
});

const ac1 = new AbortController();
const ac2 = new AbortController();

// First subscription.
source.subscribe({}, {signal: ac1.signal});
assert_equals(producerInvocations, 1,
"Producer is invoked once for first subscription");

// Second subscription should reuse the same producer.
source.subscribe({}, {signal: ac2.signal});
assert_equals(producerInvocations, 1,
"Producer should not be invoked again for second subscription");

// First unsubscribe.
ac1.abort();
assert_equals(teardownInvocations, 0,
"Teardown not run when first subscriber unsubscribes");

// Second unsubscribe.
ac2.abort();
assert_equals(teardownInvocations, 1,
"Teardown should run after last subscriber unsubscribes");

assert_array_equals(results, ['producer invoked', 'teardown invoked']);
}, "Multiple subscriptions share the same producer and teardown runs only " +
"after last subscription abort");

test(() => {
const results = [];
let activeSubscriber = null;

const source = new Observable(subscriber => {
activeSubscriber = subscriber;
results.push('producer start');
subscriber.addTeardown(() => results.push('teardown'));
});

// First subscription.
const ac1 = new AbortController();
source.subscribe({}, {signal: ac1.signal});
assert_array_equals(results, ['producer start']);

// Second subscription.
const ac2 = new AbortController();
source.subscribe({}, {signal: ac2.signal});

// Complete the subscription.
activeSubscriber.complete();
assert_array_equals(results, ['producer start', 'teardown']);

// Additional subscription after complete.
const ac3 = new AbortController();
source.subscribe({}, {signal: ac3.signal});

assert_array_equals(results, ['producer start', 'teardown', 'producer start']);
}, "New subscription after complete creates new producer");

test(() => {
const results = [];
let producerInvocations = 0;

const source = new Observable(subscriber => {
producerInvocations++;
results.push('producer start');
subscriber.addTeardown(() => results.push('teardown'));
});

// Create 3 subscriptions.
const ac1 = new AbortController();
const ac2 = new AbortController();
const ac3 = new AbortController();
source.subscribe({}, {signal: ac1.signal});
source.subscribe({}, {signal: ac2.signal});
source.subscribe({}, {signal: ac3.signal});

assert_equals(producerInvocations, 1, "Producer should be invoked once");

// Unsubscribe in a different order.
ac2.abort();
results.push('after first abort');
ac1.abort();
results.push('after second abort');
ac3.abort();
results.push('after final abort');

assert_array_equals(results, [
'producer start',
'after first abort',
'after second abort',
'teardown',
'after final abort'
]);
}, "Teardown runs after last unsubscribe regardless of unsubscription order");
98 changes: 50 additions & 48 deletions dom/observable/tentative/observable-from.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -674,38 +674,21 @@ promise_test(async t => {

// This test is a more chaotic version of the above. It ensures that a single
// Observable can handle multiple in-flight subscriptions to the same underlying
// async iterable without the two subscriptions competing.
//
// This test is added because it is easy to imagine an implementation whereby
// upon subscription, the Observable's internal subscribe callback takes the
// underlying async iterable object, and simply pulls the async iterator off of
// it (by invoking `@@asyncIterator`), and saves it alongside the underlying
// async iterable. This async iterator would be used to manage values as they
// are asynchronously emitted from the underlying object, but this value can get
// OVERWRITTEN by a brand new subscription that comes in before the first
// subscription has completed. In a broken implementation, this overwriting
// would prevent the first subscription from ever completing.
// async iterable without the two subscriptions competing. It asserts that the
// asynchronous values are pushed to the observers in the correct order.
promise_test(async t => {
const async_iterable = {
slow: true,
[Symbol.asyncIterator]() {
// The first time @@asyncIterator is called, `shouldBeSlow` is true, and
// when the return object takes closure of it, all values are emitted
// SLOWLY asynchronously. The second time, `shouldBeSlow` is false, and
// all values are emitted FAST but still asynchronous.
const shouldBeSlow = this.slow;
this.slow = false;

return {
val: 0,
next() {
// Returns a Promise that resolves in a random amount of time less
// than a second.
return new Promise(resolve => {
t.step_timeout(() => resolve({
value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`,
value: this.val,
done: this.val++ === 4 ? true : false,
}), shouldBeSlow ? 200 : 0);
}), 200);
});
},
};
Expand All @@ -715,30 +698,46 @@ promise_test(async t => {
const results = [];
const source = Observable.from(async_iterable);

const subscribeFunction = function(resolve, reject) {
const promise = new Promise(resolve => {
source.subscribe({
next: v => results.push(v),
complete: () => resolve(),
next: v => {
results.push(`${v}-first-sub`);

// Half-way through the first subscription, start another subscription.
if (v === 0) {
source.subscribe({
next: v => results.push(`${v}-second-sub`),
complete: () => {
results.push('complete-second-sub');
resolve();
}
});
}
},
complete: () => {
results.push('complete-first-sub');
resolve();
}
});
});

// A broken implementation will rely on this timeout.
t.step_timeout(() => reject('TIMEOUT'), 3000);
}

const slow_promise = new Promise(subscribeFunction);
const fast_promise = new Promise(subscribeFunction);
await Promise.all([slow_promise, fast_promise]);
await promise;
assert_array_equals(results, [
'0-fast',
'1-fast',
'2-fast',
'3-fast',
'0-slow',
'1-slow',
'2-slow',
'3-slow',
'0-first-sub',

'1-first-sub',
'1-second-sub',

'2-first-sub',
'2-second-sub',

'3-first-sub',
'3-second-sub',

'complete-first-sub',
'complete-second-sub',
]);
}, "from(): Asynchronous iterable multiple in-flight subscriptions competing");
}, "from(): Asynchronous iterable multiple in-flight subscriptions");
// This test is like the above, ensuring that multiple subscriptions to the same
// sync-iterable-converted-Observable can exist at a time. Since sync iterables
// push all of their values to the Observable synchronously, the way to do this
Expand All @@ -751,24 +750,27 @@ test(() => {
const source = Observable.from(array);
source.subscribe({
next: v => {
results.push(v);
results.push(`${v}-first-sub`);
if (v === 3) {
// Pushes all 5 values to `results` right after the first instance of `3`.
source.subscribe({
next: v => results.push(v),
complete: () => results.push('inner complete'),
next: v => results.push(`${v}-second-sub`),
complete: () => results.push('complete-second-sub'),
});
}
},
complete: () => results.push('outer complete'),
complete: () => results.push('complete-first-sub'),
});

assert_array_equals(results, [
1, 2, 3,
1, 2, 3, 4, 5, 'inner complete',
4, 5, 'outer complete'
// These values are pushed when there is only a single subscription.
'1-first-sub', '2-first-sub', '3-first-sub',
// These values are pushed in the correct order, for two subscriptions.
'4-first-sub', '4-second-sub',
'5-first-sub', '5-second-sub',
'complete-first-sub', 'complete-second-sub',
]);
}, "from(): Sync iterable multiple in-flight subscriptions competing");
}, "from(): Sync iterable multiple in-flight subscriptions");

promise_test(async () => {
const async_generator = async function*() {
Expand Down