Skip to content

fix(asynciterable): use more yield #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion spec/asynciterable-operators/batch-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ test('done while waiting', async () => {
expect(await it.next()).toEqual({ done: true });
});

test('canceled', async () => {
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
test.skip('canceled', async () => {
let canceled = false;

async function* generate() {
Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/finalize-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { hasNext, hasErr, noNext } from '../asynciterablehelpers.js';
import { range, throwError } from 'ix/asynciterable/index.js';
import { flatMap, finalize, tap } from 'ix/asynciterable/operators/index.js';
import { finalize, tap, flatMap } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#finalize defers behavior', async () => {
let done = false;
Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/mergeall-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators/index.js';

test('AsyncIterable#merge mergeAll behavior', async () => {
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
expect(await toArray(res)).toEqual([1, 4, 2, 5, 3]);
});
4 changes: 3 additions & 1 deletion spec/asynciterable-operators/timeout-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ test('AsyncIterable#timeout throws when delayed', async () => {
await noNext(it);
});

test('AsyncIterable#timeout triggers finalize', async () => {
// eslint-disable-next-line jest/no-disabled-tests -- See https://github.com/ReactiveX/IxJS/pull/379#issuecomment-2611883590
test.skip('AsyncIterable#timeout triggers finalize', async () => {
let done = false;
const xs = async function* () {
yield await delayValue(1, 500);
Expand All @@ -48,5 +49,6 @@ test('AsyncIterable#timeout triggers finalize', async () => {
await hasNext(it, 1);
await hasErr(it, TimeoutError);
await noNext(it);
await new Promise((res) => setTimeout(res, 10));
expect(done).toBeTruthy();
});
20 changes: 19 additions & 1 deletion spec/asynciterable/concat-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
import { take } from 'ix/asynciterable/operators.js';
import '../asynciterablehelpers.js';
import { concat, of, sequenceEqual } from 'ix/asynciterable/index.js';
import { concat, of, sequenceEqual, toArray } from 'ix/asynciterable/index.js';

test('AsyncIterable#concat behavior', async () => {
const res = concat(of(1, 2, 3), of(4, 5));
expect(await sequenceEqual(res, of(1, 2, 3, 4, 5))).toBeTruthy();
});

test("AsyncIterable#concat doesn't execute more than necessary", async () => {
let i = 0;

async function* asyncGenerator() {
i++;
yield 1;
}

const res = concat(asyncGenerator(), asyncGenerator()).pipe(take(1));
const items = await toArray(res);

expect(items).toEqual([1]);
// This second generator should not be started at all since the first one
// provides enough values
expect(i).toBe(1);
});
2 changes: 1 addition & 1 deletion src/add/asynciterable-operators/mergeall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export function mergeAllProto<T>(
this: AsyncIterableX<AsyncIterable<T>>,
concurrent = Infinity
): AsyncIterableX<T> {
return mergeAll(concurrent)(this);
return mergeAll<T>(concurrent)(this);
}

AsyncIterableX.prototype.mergeAll = mergeAllProto;
Expand Down
40 changes: 21 additions & 19 deletions src/asynciterable/_extremaby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,31 @@ export async function extremaBy<TSource, TKey>(
): Promise<TSource[]> {
throwIfAborted(signal);

let result = [];
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();
const { value, done } = await it.next();
if (done) {
throw new Error('Sequence contains no elements');
}

let resKey = await selector(value, signal);
result.push(value);
let hasValue = false;
let key: TKey | undefined;
let result: TSource[] = [];

let next: IteratorResult<TSource>;
while (!(next = await it.next()).done) {
const current = next.value;
const key = await selector(current, signal);
const cmp = await comparer(key, resKey, signal);
for await (const item of wrapWithAbort(source, signal)) {
if (!hasValue) {
key = await selector(item, signal);
result.push(item);
hasValue = true;
} else {
const currentKey = await selector(item, signal);
const cmp = await comparer(currentKey, key as TKey, signal);

if (cmp === 0) {
result.push(current);
} else if (cmp > 0) {
result = [current];
resKey = key;
if (cmp === 0) {
result.push(item);
} else if (cmp > 0) {
result = [item];
key = currentKey;
}
}
}

if (!hasValue) {
throw new Error('Sequence contains no elements');
}

return result;
}
3 changes: 1 addition & 2 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ export class FromPromiseIterable<TSource, TResult = TSource> extends AsyncIterab
}

async *[Symbol.asyncIterator]() {
const item = await this._source;
yield await this._selector(item, 0);
yield await this._selector(await this._source, 0);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/asynciterable/average.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ export async function average(
['signal']: signal,
['thisArg']: thisArg,
} = options || {};

throwIfAborted(signal);

let sum = 0;
let count = 0;

for await (const item of wrapWithAbort(source, signal)) {
sum += await selector.call(thisArg, item, signal);
count++;
Expand Down
30 changes: 8 additions & 22 deletions src/asynciterable/catcherror.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { AsyncIterableX } from './asynciterablex.js';
import { returnAsyncIterator } from '../util/returniterator.js';
import { wrapWithAbort } from './operators/withabort.js';
import { throwIfAborted } from '../aborterror.js';

Expand All @@ -19,29 +18,16 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
let hasError = false;

for (const source of this._source) {
const it = wrapWithAbort(source, signal)[Symbol.asyncIterator]();

error = null;
hasError = false;

while (1) {
let c = <TSource>{};

try {
const { done, value } = await it.next();
if (done) {
await returnAsyncIterator(it);
break;
}
c = value;
} catch (e) {
error = e;
hasError = true;
await returnAsyncIterator(it);
break;
try {
for await (const item of wrapWithAbort(source, signal)) {
yield item;
}

yield c;
} catch (e) {
error = e;
hasError = true;
}

if (!hasError) {
Expand All @@ -64,7 +50,7 @@ export class CatchAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
* sequences until a source sequence terminates successfully.
*/
export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<T> {
return new CatchAllAsyncIterable<T>(source);
return new CatchAllAsyncIterable(source);
}

/**
Expand All @@ -76,5 +62,5 @@ export function catchAll<T>(source: Iterable<AsyncIterable<T>>): AsyncIterableX<
* sequences until a source sequence terminates successfully.
*/
export function catchError<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new CatchAllAsyncIterable<T>(args);
return new CatchAllAsyncIterable(args);
}
50 changes: 27 additions & 23 deletions src/asynciterable/combinelatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
import { wrapWithAbort } from './operators/withabort.js';
import { throwIfAborted } from '../aborterror.js';
import { safeRace } from '../util/safeRace.js';
import { returnAsyncIterators } from '../util/returniterator.js';

// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise(() => {});
const NEVER_PROMISE = new Promise<never>(() => {});

type MergeResult<T> = { value: T; index: number };

Expand All @@ -28,39 +29,42 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
const length = this._sources.length;
const iterators = new Array<AsyncIterator<TSource>>(length);
const nexts = new Array<Promise<MergeResult<IteratorResult<TSource>>>>(length);
let hasValueAll = false;
const values = new Array<TSource>(length);
const hasValues = new Array<boolean>(length);
let active = length;

hasValues.fill(false);
let active = length;
let allValuesAvailable = false;
const values = new Array<TSource>(length);
const hasValues = new Array<boolean>(length).fill(false);

for (let i = 0; i < length; i++) {
const iterator = wrapWithAbort(this._sources[i], signal)[Symbol.asyncIterator]();
iterators[i] = iterator;
nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
}

while (active > 0) {
const next = safeRace(nexts);
const {
value: { value: value$, done: done$ },
index,
} = await next;
if (done$) {
nexts[index] = <Promise<MergeResult<IteratorResult<TSource>>>>NEVER_PROMISE;
active--;
} else {
values[index] = value$;
hasValues[index] = true;
try {
while (active > 0) {
const {
value: { value, done },
index,
} = await safeRace(nexts);

if (done) {
nexts[index] = NEVER_PROMISE;
active--;
} else {
values[index] = value;
hasValues[index] = true;
allValuesAvailable = allValuesAvailable || hasValues.every(identity);

const iterator$ = iterators[index];
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);

if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
yield values;
if (allValuesAvailable) {
yield values;
}
}
}
} finally {
await returnAsyncIterators(iterators);
}
}
}
Expand Down Expand Up @@ -176,5 +180,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
*/
export function combineLatest<T>(...sources: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function combineLatest<T>(...sources: any[]): AsyncIterableX<T[]> {
return new CombineLatestAsyncIterable<T>(sources);
return new CombineLatestAsyncIterable(sources);
}
5 changes: 3 additions & 2 deletions src/asynciterable/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class ConcatAsyncIterable<TSource> extends AsyncIterableX<TSource> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);

for (const outer of this._source) {
for await (const item of wrapWithAbort(outer, signal)) {
yield item;
Expand All @@ -24,7 +25,7 @@ export class ConcatAsyncIterable<TSource> extends AsyncIterableX<TSource> {
export function _concatAll<TSource>(
source: Iterable<AsyncIterable<TSource>>
): AsyncIterableX<TSource> {
return new ConcatAsyncIterable<TSource>(source);
return new ConcatAsyncIterable(source);
}

/**
Expand Down Expand Up @@ -136,5 +137,5 @@ export function concat<T, T2, T3, T4, T5, T6>(
* @returns {AsyncIterableX<T>} An async-iterable sequence that contains the elements of each given sequence, in sequential order.
*/
export function concat<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new ConcatAsyncIterable<T>(args);
return new ConcatAsyncIterable(args);
}
3 changes: 2 additions & 1 deletion src/asynciterable/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ export async function count<T>(
): Promise<number> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate = async () => true } =
options || {};

throwIfAborted(signal);
let i = 0;

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (await predicate.call(thisArg, item, i, signal)) {
i++;
Expand Down
9 changes: 6 additions & 3 deletions src/asynciterable/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ class AnonymousAsyncIterable<T> extends AsyncIterableX<T> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);

const it = await this._fn(signal);
let next: IteratorResult<T> | undefined;
while (!(next = await it.next()).done) {
yield next.value;

for await (const item of {
[Symbol.asyncIterator]: () => it,
}) {
yield item;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/asynciterable/defer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {

async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);
const items = await this._fn(signal);
for await (const item of wrapWithAbort(items, signal)) {

for await (const item of wrapWithAbort(await this._fn(signal), signal)) {
yield item;
}
}
Expand All @@ -32,5 +32,5 @@ class DeferAsyncIterable<TSource> extends AsyncIterableX<TSource> {
export function defer<TSource>(
factory: (signal?: AbortSignal) => AsyncIterable<TSource> | Promise<AsyncIterable<TSource>>
): AsyncIterableX<TSource> {
return new DeferAsyncIterable<TSource>(factory);
return new DeferAsyncIterable(factory);
}
2 changes: 2 additions & 0 deletions src/asynciterable/elementat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ export async function elementAt<T>(
signal?: AbortSignal
): Promise<T | undefined> {
throwIfAborted(signal);

let i = index;
for await (const item of wrapWithAbort(source, signal)) {
if (i === 0) {
return item;
}
i--;
}

return undefined;
}
3 changes: 3 additions & 0 deletions src/asynciterable/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ export async function every<T>(
options: FindOptions<T>
): Promise<boolean> {
const { ['signal']: signal, ['thisArg']: thisArg, ['predicate']: predicate } = options;

throwIfAborted(signal);

let i = 0;
for await (const item of wrapWithAbort(source, signal)) {
if (!(await predicate.call(thisArg, item, i++, signal))) {
return false;
}
}

return true;
}
Loading