Skip to content

feat: Add allValuesFrom function #6774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export declare function allValuesFrom<T>(source: Observable<T>): Promise<T[]>;

export declare const animationFrame: AnimationFrameScheduler;

export declare function animationFrames(timestampProvider?: TimestampProvider): Observable<{
Expand Down
39 changes: 39 additions & 0 deletions spec/allValuesFrom-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { interval, allValuesFrom, EMPTY, throwError, of } from 'rxjs';
import { expect } from 'chai';
import { finalize, take } from 'rxjs/operators';

describe('allValuesFrom', () => {
it('should emit all the values as a promise', async () => {
let finalized = false;
const source = interval(2).pipe(take(4), finalize(() => (finalized = true)));
const result = await allValuesFrom(source);
expect(result).to.deep.equal([0, 1, 2, 3]);
expect(finalized).to.be.true;
});

it('should produce empty arrays for empty observables', async () => {
const source = EMPTY;
const result = await allValuesFrom(source);
expect(result).to.be.empty;
});

it('should error for errored observables', async () => {
const source = throwError(() => new Error('blorp!'));
let error: any = null;
try {
await allValuesFrom(source);
} catch (err) {
error = err;
}
expect(error).to.be.an.instanceOf(Error);
expect(error.message).to.equal('blorp!');
});

it('should work with a synchronous observable', async () => {
let finalized = false;
const source = of('apples', 'bananas').pipe(finalize(() => (finalized = true)));
const result = await allValuesFrom(source);
expect(result).to.deep.equal(['apples', 'bananas']);
expect(finalized).to.be.true;
});
});
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export { isObservable } from './internal/util/isObservable';
/* Promise Conversion */
export { lastValueFrom } from './internal/lastValueFrom';
export { firstValueFrom } from './internal/firstValueFrom';
export { allValuesFrom } from './internal/allValuesFrom';

/* Error types */
export { ArgumentOutOfRangeError } from './internal/util/ArgumentOutOfRangeError';
Expand Down
54 changes: 54 additions & 0 deletions src/internal/allValuesFrom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Observable } from './Observable';
import { firstValueFrom } from './firstValueFrom';
import { toArray } from './operators/toArray';

/**
* Converts an observable to a promise by subscribing to the observable,
* and returning a promise that will resolve as soon as it completes,
* producing an array of all the values that the observable emitted. (If the
* observable completes without emitting any values, the promise will resolve
* with an empty array.)
*
* If the observable stream emits an error, the returned promise will reject
* with that error.
*
* **WARNING**: Only use this with observables you *know* will complete. If
* the source observable does not complete, you will end up with a promise
* that is hung up, and potentially all the state of an async function hanging
* out in memory. To avoid this situation, look into adding something like
* {@link timeout}, {@link take}, {@link takeWhile}, or {@link takeUntil}
* amongst others.
*
* ## Example
*
* Wait for the first three values from a stream and emit them from a promise
* in an async function
*
* ```ts
* import { interval, take, firstValueFrom } from 'rxjs';
*
* async function execute() {
* const source$ = interval(2000).pipe(take(3));
* const numbers = await allValuesFrom(source$);
* console.log(`The numbers are: ${ numbers.join(', ') }`);
* }
*
* execute();
*
* // Expected output:
* // 'The numbers are: 0, 1, 2'
* ```
*
* Note that this function is equivalent to piping the observable through
* {@link toArray} and then calling either {@link firstValueFrom} or
* {@link lastValueFrom} (which are equivalent in this case).
*
* @see {@link firstValueFrom}
* @see {@link lastValueFrom}
* @see {@link toArray}
*
* @param source the observable to convert to a promise
*/
export function allValuesFrom<T>(source: Observable<T>): Promise<T[]> {
return firstValueFrom(source.pipe(toArray()));
}
1 change: 1 addition & 0 deletions src/internal/firstValueFrom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export function firstValueFrom<T>(source: Observable<T>): Promise<T>;
* ```
*
* @see {@link lastValueFrom}
* @see {@link allValuesFrom}
*
* @param source the observable to convert to a promise
* @param config a configuration object to define the `defaultValue` to use if the source completes without emitting a value
Expand Down
1 change: 1 addition & 0 deletions src/internal/lastValueFrom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export function lastValueFrom<T>(source: Observable<T>): Promise<T>;
* ```
*
* @see {@link firstValueFrom}
* @see {@link allValuesFrom}
*
* @param source the observable to convert to a promise
* @param config a configuration object to define the `defaultValue` to use if the source completes without emitting a value
Expand Down
2 changes: 2 additions & 0 deletions src/internal/operators/toArray.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const arrReducer = (arr: any[], value: any) => (arr.push(value), arr);
* // output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
* ```
*
* @see {@link allValuesFrom}
*
* @return A function that returns an Observable that emits an array of items
* emitted by the source Observable when source completes.
*/
Expand Down