diff --git a/packages/rxjs/spec/operators/every-spec.ts b/packages/rxjs/spec/operators/every-spec.ts index f747d80184..a758f53e46 100644 --- a/packages/rxjs/spec/operators/every-spec.ts +++ b/packages/rxjs/spec/operators/every-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import { every, mergeMap } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import type { Observer } from 'rxjs'; -import { of, Observable } from 'rxjs'; +import { of, Observable, Subject } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {every} */ @@ -33,19 +33,6 @@ describe('every', () => { }); }); - it('should accept thisArg with scalar observables', () => { - const thisArg = {}; - - of(1) - .pipe( - every(function (this: any, value: number, index: number) { - expect(this).to.deep.equal(thisArg); - return true; - }, thisArg) - ) - .subscribe(); - }); - it('should increment index on each call to the predicate', () => { const indices: number[] = []; of(1, 2, 3, 4) @@ -60,36 +47,6 @@ describe('every', () => { expect(indices).to.deep.equal([0, 1, 2, 3]); }); - it('should accept thisArg with array observable', () => { - const thisArg = {}; - - of(1, 2, 3, 4) - .pipe( - every(function (this: any, value: number, index: number) { - expect(this).to.deep.equal(thisArg); - return true; - }, thisArg) - ) - .subscribe(); - }); - - it('should accept thisArg with ordinary observable', () => { - const thisArg = {}; - - const source = new Observable((observer: Observer) => { - observer.next(1); - observer.complete(); - }); - source - .pipe( - every(function (this: any, value: number, index: number) { - expect(this).to.deep.equal(thisArg); - return true; - }, thisArg) - ) - .subscribe(); - }); - it('should emit true if source is empty', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const e1 = hot(' -----| '); @@ -344,4 +301,24 @@ describe('every', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should handle reentrancy properly', () => { + const subject = new Subject(); + const results: any[] = []; + let n = 0; + + subject.pipe(every(() => false)).subscribe({ + next: (result) => { + results.push(result); + if (n < 3) { + subject.next(n++); + } + }, + complete: () => results.push('done'), + }); + + subject.next(n); + + expect(results).to.deep.equal([false, 'done']); + }); }); diff --git a/packages/rxjs/spec/operators/expand-spec.ts b/packages/rxjs/spec/operators/expand-spec.ts index ccc5d3beaa..f98b44f30f 100644 --- a/packages/rxjs/spec/operators/expand-spec.ts +++ b/packages/rxjs/spec/operators/expand-spec.ts @@ -30,23 +30,6 @@ describe('expand', () => { }); }); - it('should work with scheduler', () => { - testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { - const e1 = hot(' --x----| ', { x: 1 }); - const e1subs = ' ^------! '; - const e2 = cold(' --c| ', { c: 2 }); - // --c| - // --c| - const expected = '--a-b-c-d|'; - const values = { a: 1, b: 2, c: 4, d: 8 }; - - const result = e1.pipe(expand((x) => (x === 8 ? EMPTY : e2.pipe(map((c) => c * x))), Infinity, testScheduler)); - - expectObservable(result).toBe(expected, values); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - }); - it('should map and recursively flatten', () => { testScheduler.run(({ cold, hot, expectObservable, expectSubscriptions }) => { const values = { @@ -470,35 +453,13 @@ describe('expand', () => { return cold(e2shape, { z: x + x }); }; - const result = e1.pipe(expand(project, undefined, undefined)); + const result = e1.pipe(expand(project, undefined)); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); - it('should work with the AsapScheduler', (done) => { - const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; - of(0) - .pipe( - expand((x) => of(x + 1), Infinity, asapScheduler), - take(10), - toArray() - ) - .subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done }); - }); - - it('should work with the AsyncScheduler', (done) => { - const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; - of(0) - .pipe( - expand((x) => of(x + 1), Infinity, asyncScheduler), - take(10), - toArray() - ) - .subscribe({ next: (actual) => expect(actual).to.deep.equal(expected), error: done, complete: done }); - }); - it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable((subscriber) => { diff --git a/packages/rxjs/src/internal/operators/every.ts b/packages/rxjs/src/internal/operators/every.ts index 05d002efb5..bc7c1d87b2 100644 --- a/packages/rxjs/src/internal/operators/every.ts +++ b/packages/rxjs/src/internal/operators/every.ts @@ -2,17 +2,7 @@ import { Observable, operate } from '@rxjs/observable'; import type { Falsy, OperatorFunction } from '../types.js'; export function every(predicate: BooleanConstructor): OperatorFunction extends never ? false : boolean>; -/** @deprecated Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8. */ -export function every( - predicate: BooleanConstructor, - thisArg: any -): OperatorFunction extends never ? false : boolean>; -/** @deprecated Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8. */ -export function every( - predicate: (this: A, value: T, index: number, source: Observable) => boolean, - thisArg: A -): OperatorFunction; -export function every(predicate: (value: T, index: number, source: Observable) => boolean): OperatorFunction; +export function every(predicate: (value: T, index: number) => boolean): OperatorFunction; /** * Returns an Observable that emits whether or not every item of the source satisfies the condition specified. @@ -39,27 +29,29 @@ export function every(predicate: (value: T, index: number, source: Observable * @return A function that returns an Observable of booleans that determines if * all items of the source Observable meet the condition specified. */ -export function every( - predicate: (value: T, index: number, source: Observable) => boolean, - thisArg?: any -): OperatorFunction { +export function every(predicate: (value: T, index: number) => boolean): OperatorFunction { return (source) => new Observable((destination) => { let index = 0; - source.subscribe( - operate({ - destination, - next: (value) => { - if (!predicate.call(thisArg, value, index++, source)) { - destination.next(false); - destination.complete(); - } - }, - complete: () => { - destination.next(true); + + const subscriber = operate({ + destination, + next: (value: T) => { + if (!predicate(value, index++)) { + // To prevent re-entrancy issues, we unsubscribe from the + // source as soon as possible. Because the `next` right below it + // could cause us to re-enter before we get to `complete()`. + subscriber.unsubscribe(); + destination.next(false); destination.complete(); - }, - }) - ); + } + }, + complete: () => { + destination.next(true); + destination.complete(); + }, + }); + + source.subscribe(subscriber); }); } diff --git a/packages/rxjs/src/internal/operators/expand.ts b/packages/rxjs/src/internal/operators/expand.ts index 74d1d9a1b7..bee1e7321a 100644 --- a/packages/rxjs/src/internal/operators/expand.ts +++ b/packages/rxjs/src/internal/operators/expand.ts @@ -1,21 +1,10 @@ -import type { OperatorFunction, ObservableInput, ObservedValueOf, SchedulerLike } from '../types.js'; import { Observable } from '@rxjs/observable'; +import type { ObservableInput, ObservedValueOf, OperatorFunction } from '../types.js'; import { mergeInternals } from './mergeInternals.js'; export function expand>( project: (value: T, index: number) => O, - concurrent?: number, - scheduler?: SchedulerLike -): OperatorFunction>; -/** - * @deprecated The `scheduler` parameter will be removed in v8. If you need to schedule the inner subscription, - * use `subscribeOn` within the projection function: `expand((value) => fn(value).pipe(subscribeOn(scheduler)))`. - * Details: Details: https://rxjs.dev/deprecations/scheduler-argument - */ -export function expand>( - project: (value: T, index: number) => O, - concurrent: number | undefined, - scheduler: SchedulerLike + concurrent?: number ): OperatorFunction>; /** @@ -61,8 +50,6 @@ export function expand>( * or the output Observable, returns an Observable. * @param concurrent Maximum number of input Observables being subscribed to * concurrently. - * @param scheduler The {@link SchedulerLike} to use for subscribing to - * each projected inner Observable. * @return A function that returns an Observable that emits the source values * and also result of applying the projection function to each value emitted on * the output Observable and merging the results of the Observables obtained @@ -70,8 +57,7 @@ export function expand>( */ export function expand>( project: (value: T, index: number) => O, - concurrent = Infinity, - scheduler?: SchedulerLike + concurrent = Infinity ): OperatorFunction> { concurrent = (concurrent || 0) < 1 ? Infinity : concurrent; return (source) => @@ -89,8 +75,7 @@ export function expand>( undefined, // Expand-specific - true, // Use expand path - scheduler // Inner subscription scheduler + true // Use expand path ) ); } diff --git a/packages/rxjs/src/internal/operators/mergeInternals.ts b/packages/rxjs/src/internal/operators/mergeInternals.ts index 913fc4600b..31e64e242a 100644 --- a/packages/rxjs/src/internal/operators/mergeInternals.ts +++ b/packages/rxjs/src/internal/operators/mergeInternals.ts @@ -1,7 +1,6 @@ -import type { Observable, Subscriber} from '@rxjs/observable'; +import type { Observable, Subscriber } from '@rxjs/observable'; import { from, operate } from '@rxjs/observable'; -import type { ObservableInput, SchedulerLike } from '../types.js'; -import { executeSchedule } from '../util/executeSchedule.js'; +import type { ObservableInput } from '../types.js'; /** * A process embodying the general "merge" strategy. This is used in @@ -13,8 +12,6 @@ import { executeSchedule } from '../util/executeSchedule.js'; * @param onBeforeNext Additional logic to apply before nexting to our consumer * @param expand If `true` this will perform an "expand" strategy, which differs only * in that it recurses, and the inner subscription must be schedule-able. - * @param innerSubScheduler A scheduler to use to schedule inner subscriptions, - * this is to support the expand strategy, mostly, and should be deprecated */ export function mergeInternals( source: Observable, @@ -22,9 +19,7 @@ export function mergeInternals( project: (value: T, index: number) => ObservableInput, concurrent: number, onBeforeNext?: (innerValue: R) => void, - expand?: boolean, - innerSubScheduler?: SchedulerLike, - additionalFinalizer?: () => void + expand?: boolean ) { // Buffered values, in the event of going over our concurrency limit const buffer: T[] = []; @@ -107,15 +102,7 @@ export function mergeInternals( // next conditional, if there were any more inner subscriptions // to start. while (buffer.length && active < concurrent) { - const bufferedValue = buffer.shift()!; - // Particularly for `expand`, we need to check to see if a scheduler was provided - // for when we want to start our inner subscription. Otherwise, we just start - // are next inner subscription. - if (innerSubScheduler) { - executeSchedule(destination, innerSubScheduler, () => doInnerSub(bufferedValue)); - } else { - doInnerSub(bufferedValue); - } + doInnerSub(buffer.shift()!); } // Check to see if we can complete, and complete if so. checkComplete(); @@ -140,10 +127,4 @@ export function mergeInternals( }, }) ); - - // Additional finalization (for when the destination is torn down). - // Other finalization is added implicitly via subscription above. - return () => { - additionalFinalizer?.(); - }; } diff --git a/packages/rxjs/src/internal/operators/mergeScan.ts b/packages/rxjs/src/internal/operators/mergeScan.ts index 38cd4d9400..c4ef34494d 100644 --- a/packages/rxjs/src/internal/operators/mergeScan.ts +++ b/packages/rxjs/src/internal/operators/mergeScan.ts @@ -77,7 +77,7 @@ export function mergeScan( // The accumulated state. let state = seed; - return mergeInternals( + mergeInternals( source, subscriber, (value, index) => accumulator(state, value, index), @@ -85,9 +85,11 @@ export function mergeScan( (value) => { state = value; }, - false, - undefined, - () => (state = null!) + false ); + + return () => { + state = null!; + }; }); }