Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into observable
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesHenry committed Jan 30, 2024
2 parents 0c79578 + c666574 commit b0bbce2
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 159 deletions.
65 changes: 21 additions & 44 deletions packages/rxjs/spec/operators/every-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import { every, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import type { Observer } from 'rxjs';
import { of, Observable } from 'rxjs';
import { of, Observable, Subject } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {every} */
Expand Down Expand Up @@ -33,19 +33,6 @@ describe('every', () => {
});
});

it('should accept thisArg with scalar observables', () => {
const thisArg = {};

of(1)
.pipe(
every(function (this: any, value: number, index: number) {
expect(this).to.deep.equal(thisArg);
return true;
}, thisArg)
)
.subscribe();
});

it('should increment index on each call to the predicate', () => {
const indices: number[] = [];
of(1, 2, 3, 4)
Expand All @@ -60,36 +47,6 @@ describe('every', () => {
expect(indices).to.deep.equal([0, 1, 2, 3]);
});

it('should accept thisArg with array observable', () => {
const thisArg = {};

of(1, 2, 3, 4)
.pipe(
every(function (this: any, value: number, index: number) {
expect(this).to.deep.equal(thisArg);
return true;
}, thisArg)
)
.subscribe();
});

it('should accept thisArg with ordinary observable', () => {
const thisArg = {};

const source = new Observable((observer: Observer<number>) => {
observer.next(1);
observer.complete();
});
source
.pipe(
every(function (this: any, value: number, index: number) {
expect(this).to.deep.equal(thisArg);
return true;
}, thisArg)
)
.subscribe();
});

it('should emit true if source is empty', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -----| ');
Expand Down Expand Up @@ -344,4 +301,24 @@ describe('every', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should handle reentrancy properly', () => {
const subject = new Subject<number>();
const results: any[] = [];
let n = 0;

subject.pipe(every(() => false)).subscribe({
next: (result) => {
results.push(result);
if (n < 3) {
subject.next(n++);
}
},
complete: () => results.push('done'),
});

subject.next(n);

expect(results).to.deep.equal([false, 'done']);
});
});
41 changes: 1 addition & 40 deletions packages/rxjs/spec/operators/expand-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,6 @@ describe('expand', () => {
});
});

it('should work with scheduler', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --x----| ', { x: 1 });
const e1subs = ' ^------! ';
const e2 = cold(' --c| ', { c: 2 });
// --c|
// --c|
const expected = '--a-b-c-d|';
const values = { a: 1, b: 2, c: 4, d: 8 };

const result = e1.pipe(expand((x) => (x === 8 ? EMPTY : e2.pipe(map((c) => c * x))), Infinity, testScheduler));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should map and recursively flatten', () => {
testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const values = {
Expand Down Expand Up @@ -470,35 +453,13 @@ describe('expand', () => {
return cold(e2shape, { z: x + x });
};

const result = e1.pipe(expand(project, undefined, undefined));
const result = e1.pipe(expand(project, undefined));

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});

it('should work with the AsapScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asapScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should work with the AsyncScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0)
.pipe(
expand((x) => of(x + 1), Infinity, asyncScheduler),
take(10),
toArray()
)
.subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done });
});

it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable<number>((subscriber) => {
Expand Down
50 changes: 21 additions & 29 deletions packages/rxjs/src/internal/operators/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,7 @@ import { Observable, operate } from '@rxjs/observable';
import type { Falsy, OperatorFunction } from '../types.js';

export function every<T>(predicate: BooleanConstructor): OperatorFunction<T, Exclude<T, Falsy> extends never ? false : boolean>;
/** @deprecated Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8. */
export function every<T>(
predicate: BooleanConstructor,
thisArg: any
): OperatorFunction<T, Exclude<T, Falsy> extends never ? false : boolean>;
/** @deprecated Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8. */
export function every<T, A>(
predicate: (this: A, value: T, index: number, source: Observable<T>) => boolean,
thisArg: A
): OperatorFunction<T, boolean>;
export function every<T>(predicate: (value: T, index: number, source: Observable<T>) => boolean): OperatorFunction<T, boolean>;
export function every<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, boolean>;

/**
* Returns an Observable that emits whether or not every item of the source satisfies the condition specified.
Expand All @@ -39,27 +29,29 @@ export function every<T>(predicate: (value: T, index: number, source: Observable
* @return A function that returns an Observable of booleans that determines if
* all items of the source Observable meet the condition specified.
*/
export function every<T>(
predicate: (value: T, index: number, source: Observable<T>) => boolean,
thisArg?: any
): OperatorFunction<T, boolean> {
export function every<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, boolean> {
return (source) =>
new Observable((destination) => {
let index = 0;
source.subscribe(
operate({
destination,
next: (value) => {
if (!predicate.call(thisArg, value, index++, source)) {
destination.next(false);
destination.complete();
}
},
complete: () => {
destination.next(true);

const subscriber = operate({
destination,
next: (value: T) => {
if (!predicate(value, index++)) {
// To prevent re-entrancy issues, we unsubscribe from the
// source as soon as possible. Because the `next` right below it
// could cause us to re-enter before we get to `complete()`.
subscriber.unsubscribe();
destination.next(false);
destination.complete();
},
})
);
}
},
complete: () => {
destination.next(true);
destination.complete();
},
});

source.subscribe(subscriber);
});
}
23 changes: 4 additions & 19 deletions packages/rxjs/src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import type { OperatorFunction, ObservableInput, ObservedValueOf, SchedulerLike } from '../types.js';
import { Observable } from '@rxjs/observable';
import type { ObservableInput, ObservedValueOf, OperatorFunction } from '../types.js';
import { mergeInternals } from './mergeInternals.js';

export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent?: number,
scheduler?: SchedulerLike
): OperatorFunction<T, ObservedValueOf<O>>;
/**
* @deprecated The `scheduler` parameter will be removed in v8. If you need to schedule the inner subscription,
* use `subscribeOn` within the projection function: `expand((value) => fn(value).pipe(subscribeOn(scheduler)))`.
* Details: Details: https://rxjs.dev/deprecations/scheduler-argument
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent: number | undefined,
scheduler: SchedulerLike
concurrent?: number
): OperatorFunction<T, ObservedValueOf<O>>;

/**
Expand Down Expand Up @@ -61,17 +50,14 @@ export function expand<T, O extends ObservableInput<unknown>>(
* or the output Observable, returns an Observable.
* @param concurrent Maximum number of input Observables being subscribed to
* concurrently.
* @param scheduler The {@link SchedulerLike} to use for subscribing to
* each projected inner Observable.
* @return A function that returns an Observable that emits the source values
* and also result of applying the projection function to each value emitted on
* the output Observable and merging the results of the Observables obtained
* from this transformation.
*/
export function expand<T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
concurrent = Infinity,
scheduler?: SchedulerLike
concurrent = Infinity
): OperatorFunction<T, ObservedValueOf<O>> {
concurrent = (concurrent || 0) < 1 ? Infinity : concurrent;
return (source) =>
Expand All @@ -89,8 +75,7 @@ export function expand<T, O extends ObservableInput<unknown>>(
undefined,

// Expand-specific
true, // Use expand path
scheduler // Inner subscription scheduler
true // Use expand path
)
);
}
27 changes: 4 additions & 23 deletions packages/rxjs/src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Observable, Subscriber} from '@rxjs/observable';
import type { Observable, Subscriber } from '@rxjs/observable';
import { from, operate } from '@rxjs/observable';
import type { ObservableInput, SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';
import type { ObservableInput } from '../types.js';

/**
* A process embodying the general "merge" strategy. This is used in
Expand All @@ -13,18 +12,14 @@ import { executeSchedule } from '../util/executeSchedule.js';
* @param onBeforeNext Additional logic to apply before nexting to our consumer
* @param expand If `true` this will perform an "expand" strategy, which differs only
* in that it recurses, and the inner subscription must be schedule-able.
* @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
* this is to support the expand strategy, mostly, and should be deprecated
*/
export function mergeInternals<T, R>(
source: Observable<T>,
destination: Subscriber<R>,
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike,
additionalFinalizer?: () => void
expand?: boolean
) {
// Buffered values, in the event of going over our concurrency limit
const buffer: T[] = [];
Expand Down Expand Up @@ -107,15 +102,7 @@ export function mergeInternals<T, R>(
// next conditional, if there were any more inner subscriptions
// to start.
while (buffer.length && active < concurrent) {
const bufferedValue = buffer.shift()!;
// Particularly for `expand`, we need to check to see if a scheduler was provided
// for when we want to start our inner subscription. Otherwise, we just start
// are next inner subscription.
if (innerSubScheduler) {
executeSchedule(destination, innerSubScheduler, () => doInnerSub(bufferedValue));
} else {
doInnerSub(bufferedValue);
}
doInnerSub(buffer.shift()!);
}
// Check to see if we can complete, and complete if so.
checkComplete();
Expand All @@ -140,10 +127,4 @@ export function mergeInternals<T, R>(
},
})
);

// Additional finalization (for when the destination is torn down).
// Other finalization is added implicitly via subscription above.
return () => {
additionalFinalizer?.();
};
}
10 changes: 6 additions & 4 deletions packages/rxjs/src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,19 @@ export function mergeScan<T, R>(
// The accumulated state.
let state = seed;

return mergeInternals(
mergeInternals(
source,
subscriber,
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
state = value;
},
false,
undefined,
() => (state = null!)
false
);

return () => {
state = null!;
};
});
}

0 comments on commit b0bbce2

Please sign in to comment.