diff --git a/modules/component-store/spec/component-store.spec.ts b/modules/component-store/spec/component-store.spec.ts index 4c62531275..ff32d25ea9 100644 --- a/modules/component-store/spec/component-store.spec.ts +++ b/modules/component-store/spec/component-store.spec.ts @@ -25,7 +25,9 @@ import { of, queueScheduler, scheduled, + Subject, Subscription, + switchMap, throwError, timer, } from 'rxjs'; @@ -1776,6 +1778,65 @@ describe('Component Store', () => { jest.advanceTimersByTime(20); }); }); + + describe('Resubscribes on errors', () => { + let eff: ReturnType>; + let lastResult: string | undefined; + let lastError: string | undefined; + + beforeEach(() => { + lastResult = undefined; + + eff = componentStore.effect((_) => + _.pipe( + tap((r) => (lastResult = r)), + switchMap((v) => { + if (v === 'error') { + lastError = v; + return throwError(() => 'err'); + } + lastError = undefined; + return of(v); + }) + ) + ); + }); + + it('resubscribes on error in generator', () => { + expect(lastError).toEqual(undefined); + eff('error'); + expect(lastResult).toEqual('error'); + expect(lastError).toEqual('error'); + + eff('next'); + expect(lastResult).toEqual('next'); + expect(lastError).toEqual(undefined); + }); + + it('resubscribes when value$ throws an error', () => { + expect(lastError).toEqual(undefined); + const s = new Subject(); + const m = s.pipe( + switchMap((v) => { + if (v === 'error') { + return throwError(() => 'err'); + } + return of(v); + }) + ); + eff(m); + s.next('a'); + expect(lastResult).toEqual('a'); + s.next('error'); + expect(lastResult).toEqual('a'); + s.next('b'); + expect(lastResult).toEqual('b'); + + eff('next'); + expect(lastResult).toEqual('next'); + expect(lastError).toEqual(undefined); + }); + }); }); describe('get', () => { diff --git a/modules/component-store/src/component-store.ts b/modules/component-store/src/component-store.ts index bad73c4797..265f248b60 100644 --- a/modules/component-store/src/component-store.ts +++ b/modules/component-store/src/component-store.ts @@ -12,6 +12,7 @@ import { asapScheduler, EMPTY, ObservedValueOf, + retry, } from 'rxjs'; import { takeUntil, @@ -403,14 +404,14 @@ export class ComponentStore implements OnDestroy { const origin$ = new Subject(); generator(origin$ as OriginType) // tied to the lifecycle 👇 of ComponentStore - .pipe(takeUntil(this.destroy$)) + .pipe(retry(), takeUntil(this.destroy$)) .subscribe(); return (( observableOrValue?: ObservableType | Observable ): Subscription => { const observable$ = isObservable(observableOrValue) - ? observableOrValue + ? observableOrValue.pipe(retry()) : of(observableOrValue); return observable$.pipe(takeUntil(this.destroy$)).subscribe((value) => { // any new 👇 value is pushed into a stream