Skip to content

Commit fdad6ff

Browse files
committed
feat: combine operator
1 parent c6547f7 commit fdad6ff

File tree

4 files changed

+367
-4
lines changed

4 files changed

+367
-4
lines changed

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "rxjs",
3-
"version": "7.5.2",
3+
"version": "7.6.0",
44
"description": "Reactive Extensions for modern JavaScript",
55
"main": "./dist/cjs/index.js",
66
"module": "./dist/esm5/index.js",

src/internal/observable/combine.ts

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
import { Observable } from '../Observable';
2+
import { ObservableInput, SchedulerLike, ObservedValueOf, ObservableInputTuple } from '../types';
3+
import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
4+
import { Subscriber } from '../Subscriber';
5+
import { from } from './from';
6+
import { identity } from '../util/identity';
7+
import { Tetris } from '../util/tetris';
8+
import { Subscription } from '../Subscription';
9+
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
10+
import { popResultSelector, popScheduler } from '../util/args';
11+
import { createObject } from '../util/createObject';
12+
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
13+
import { AnyCatcher } from '../AnyCatcher';
14+
import { executeSchedule } from '../util/executeSchedule';
15+
16+
// combine(any)
17+
// We put this first because we need to catch cases where the user has supplied
18+
// _exactly `any`_ as the argument. Since `any` literally matches _anything_,
19+
// we don't want it to randomly hit one of the other type signatures below,
20+
// as we have no idea at build-time what type we should be returning when given an any.
21+
22+
/**
23+
* You have passed `any` here, we can't figure out if it is
24+
* an array or an object, so you're getting `unknown`. Use better types.
25+
* @param arg Something typed as `any`
26+
*/
27+
export function combine<T extends AnyCatcher>(arg: T): Observable<unknown>;
28+
29+
// combine([a, b, c])
30+
export function combine(sources: []): Observable<never>;
31+
export function combine<A extends readonly unknown[]>(sources: readonly [...ObservableInputTuple<A>]): Observable<A>;
32+
33+
export function combine<A extends readonly unknown[], R>(
34+
sources: readonly [...ObservableInputTuple<A>],
35+
resultSelector: (...values: A) => R
36+
): Observable<R>;
37+
38+
// combine({a, b, c})
39+
export function combine(sourcesObject: { [K in any]: never }): Observable<never>;
40+
export function combine<T extends Record<string, ObservableInput<any>>>(
41+
sourcesObject: T
42+
): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
43+
44+
/**
45+
* Combines multiple Observables to create an Observable whose values are
46+
* calculated from the time of subscription to its input Observables.
47+
*
48+
* <span class="informal">Whenever any input Observable emits a value, it
49+
* computes a formula using the values from all the inputs, then emits
50+
* the output of that formula.</span>
51+
*
52+
* ![](combine.png)
53+
*
54+
* `combine` combines the values from all the Observables passed in the
55+
* observables array. This is done by subscribing to each Observable in order and,
56+
* whenever any Observable emits, collecting an array of values from each Observable. So if you pass `n` Observables
57+
* to this operator, the returned Observable will always emit an array of `n` values, in an order
58+
* corresponding to the order of the passed Observables (the value from the first Observable
59+
* will be at index 0 of the array and so on).
60+
*
61+
* Static version of `combine` accepts an array of Observables. Note that an array of
62+
* Observables is a good choice, if you don't know beforehand how many Observables
63+
* you will combine. Passing an empty array will result in an Observable that
64+
* completes immediately.
65+
*
66+
* To ensure the output array always has the same length, `combine` will
67+
* actually wait for all input Observables to emit at least once,
68+
* before it starts emitting results. If some Observable does not emit a value but
69+
* completes, resulting Observable will complete at the same moment without
70+
* emitting anything, since it will now be impossible to include a value from the
71+
* completed Observable in the resulting array. Also, if some input Observable does
72+
* not emit any value and never completes, `combine` will also never emit
73+
* and never complete, since, again, it will wait for all streams to emit some
74+
* value.
75+
*
76+
* If at least one Observable was passed to `combine` and all passed Observables
77+
* emitted something, the resulting Observable will complete when all combined
78+
* streams complete. So even if some Observable completes, the result of
79+
* `combine` will still emit values when other Observables do. In case
80+
* of a completed Observable, its value from now on will always be the last
81+
* emitted value. On the other hand, if any Observable errors, `combine`
82+
* will error immediately as well, and all other Observables will be unsubscribed.
83+
*
84+
* ## Examples
85+
*
86+
* Combine two timer Observables
87+
*
88+
* ```ts
89+
* import { timer, combine } from 'rxjs';
90+
*
91+
* const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
92+
* const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now
93+
* const combinedTimers = combine([firstTimer, secondTimer]);
94+
* combinedTimers.subscribe(value => console.log(value));
95+
* // Logs
96+
* // [0, 0] after 0.5s
97+
* // [1, 0] after 1s
98+
* // [1, 1] after 1.5s
99+
* // [2, 1] after 2s
100+
* ```
101+
*
102+
* Combine a dictionary of Observables
103+
*
104+
* ```ts
105+
* import { of, delay, startWith, combine } from 'rxjs';
106+
*
107+
* const observables = {
108+
* a: of(1).pipe(delay(1000), startWith(0)),
109+
* b: of(5).pipe(delay(5000), startWith(0)),
110+
* c: of(10).pipe(delay(10000), startWith(0))
111+
* };
112+
* const combined = combine(observables);
113+
* combined.subscribe(value => console.log(value));
114+
* // Logs
115+
* // { a: 0, b: 0, c: 0 } immediately
116+
* // { a: 1, b: 0, c: 0 } after 1s
117+
* // { a: 1, b: 5, c: 0 } after 5s
118+
* // { a: 1, b: 5, c: 10 } after 10s
119+
* ```
120+
*
121+
* Combine an array of Observables
122+
*
123+
* ```ts
124+
* import { of, delay, startWith, combine } from 'rxjs';
125+
*
126+
* const observables = [1, 5, 10].map(
127+
* n => of(n).pipe(
128+
* delay(n * 1000), // emit 0 and then emit n after n seconds
129+
* startWith(0)
130+
* )
131+
* );
132+
* const combined = combine(observables);
133+
* combined.subscribe(value => console.log(value));
134+
* // Logs
135+
* // [0, 0, 0] immediately
136+
* // [1, 0, 0] after 1s
137+
* // [1, 5, 0] after 5s
138+
* // [1, 5, 10] after 10s
139+
* ```
140+
*
141+
* Use map operator to dynamically calculate the Body-Mass Index
142+
*
143+
* ```ts
144+
* import { of, combine, map } from 'rxjs';
145+
*
146+
* const weight = of(70, 72, 76, 79, 75);
147+
* const height = of(1.76, 1.77, 1.78);
148+
* const bmi = combine([weight, height]).pipe(
149+
* map(([w, h]) => w / (h * h)),
150+
* );
151+
* bmi.subscribe(x => console.log('BMI is ' + x));
152+
*
153+
* // With output to console:
154+
* // BMI is 24.212293388429753
155+
* // BMI is 23.93948099205209
156+
* // BMI is 23.671253629592222
157+
* ```
158+
*
159+
* @see {@link combineAll}
160+
* @see {@link merge}
161+
* @see {@link withLatestFrom}
162+
*
163+
* @param {ObservableInput} [observables] An array of input Observables to combine with each other.
164+
* An array of Observables must be given as the first argument.
165+
* @param {function} [project] An optional function to project the values from
166+
* the combined values into a new value on the output Observable.
167+
* @param {SchedulerLike} [scheduler=null] The {@link SchedulerLike} to use for subscribing to
168+
* each input Observable.
169+
* @return {Observable} An Observable of projected values from the most recent
170+
* values from each input Observable, or an array of the most recent values from
171+
* each input Observable.
172+
*/
173+
export function combine<O extends ObservableInput<any>, R>(...args: any[]): Observable<R> | Observable<ObservedValueOf<O>[]> {
174+
const scheduler = popScheduler(args);
175+
const resultSelector = popResultSelector(args);
176+
177+
const { args: observables, keys } = argsArgArrayOrObject(args);
178+
179+
if (observables.length === 0) {
180+
// If no observables are passed, or someone has passed an ampty array
181+
// of observables, or even an empty object POJO, we need to just
182+
// complete (EMPTY), but we have to honor the scheduler provided if any.
183+
return from([], scheduler as any);
184+
}
185+
186+
const result = new Observable<ObservedValueOf<O>[]>(
187+
combineInit(
188+
observables as ObservableInput<ObservedValueOf<O>>[],
189+
scheduler,
190+
keys
191+
? // A handler for scrubbing the array of args into a dictionary.
192+
(values) => createObject(keys, values)
193+
: // A passthrough to just return the array
194+
identity
195+
)
196+
);
197+
198+
return resultSelector ? (result.pipe(mapOneOrManyArgs(resultSelector)) as Observable<R>) : result;
199+
}
200+
201+
export function combineInit(
202+
observables: ObservableInput<any>[],
203+
scheduler?: SchedulerLike,
204+
valueTransform: (values: any[]) => any = identity
205+
) {
206+
return (subscriber: Subscriber<any>) => {
207+
// The outer subscription. We're capturing this in a function
208+
// because we may have to schedule it.
209+
maybeSchedule(
210+
scheduler,
211+
() => {
212+
const { length } = observables;
213+
// A store for the values each observable has emitted so far. We match observable to value on index.
214+
const tetris = new Tetris(length);
215+
// The number of currently active subscriptions, as they complete, we decrement this number to see if
216+
// we are all done combining values, so we can complete the result.
217+
let active = length;
218+
// The loop to kick off subscription. We're keying everything on index `i` to relate the observables passed
219+
// in to the slot in the output array or the key in the array of keys in the output dictionary.
220+
for (let i = 0; i < length; i++) {
221+
maybeSchedule(
222+
scheduler,
223+
() => {
224+
const source = from(observables[i], scheduler as any);
225+
source.subscribe(
226+
new OperatorSubscriber(
227+
subscriber,
228+
(value) => {
229+
// When we get a value, record it in our set of values.
230+
tetris.pushToColumn(i, value);
231+
while (tetris.hasNext()) {
232+
const next = tetris.getNext();
233+
// We're not waiting for any more
234+
// first values, so we can emit!
235+
subscriber.next(valueTransform(next));
236+
}
237+
},
238+
() => {
239+
if (!--active) {
240+
// We only complete the result if we have no more active
241+
// inner observables.
242+
subscriber.complete();
243+
}
244+
}
245+
)
246+
);
247+
},
248+
subscriber
249+
);
250+
}
251+
},
252+
subscriber
253+
);
254+
};
255+
}
256+
257+
/**
258+
* A small utility to handle the couple of locations where we want to schedule if a scheduler was provided,
259+
* but we don't if there was no scheduler.
260+
*/
261+
function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
262+
if (scheduler) {
263+
executeSchedule(subscription, scheduler, execute);
264+
} else {
265+
execute();
266+
}
267+
}

src/internal/operators/combineAll.ts

+49-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,52 @@
1-
import { combineLatestAll } from './combineLatestAll';
1+
import { OperatorFunction, ObservableInput } from '../types';
2+
import { joinAllInternals } from './joinAllInternals';
3+
import { combine } from '../observable/combine';
4+
5+
export function combineAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
6+
export function combineAll<T>(): OperatorFunction<any, T[]>;
7+
export function combineAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;
8+
export function combineAll<R>(project: (...values: Array<any>) => R): OperatorFunction<any, R>;
29

310
/**
4-
* @deprecated Renamed to {@link combineLatestAll}. Will be removed in v8.
11+
* Flattens an Observable-of-Observables by applying {@link combine} when the Observable-of-Observables completes.
12+
*
13+
* ![](combineAll.png)
14+
*
15+
* `combineAll` takes an Observable of Observables, and collects all Observables from it. Once the outer Observable completes,
16+
* it subscribes to all collected Observables and combines their values using the {@link combineLatest} strategy, such that:
17+
*
18+
* * Every time an inner Observable emits, the output Observable emits
19+
* * When the returned observable emits, it emits all of the latest values by:
20+
* * If a `project` function is provided, it is called with each recent value from each inner Observable in whatever order they
21+
* arrived, and the result of the `project` function is what is emitted by the output Observable.
22+
* * If there is no `project` function, an array of all the most recent values is emitted by the output Observable.
23+
*
24+
* ## Example
25+
*
26+
* Map two click events to a finite interval Observable, then apply `combineAll`
27+
*
28+
* ```ts
29+
* import { fromEvent, map, interval, take, combineAll } from 'rxjs';
30+
*
31+
* const clicks = fromEvent(document, 'click');
32+
* const higherOrder = clicks.pipe(
33+
* map(() => interval(Math.random() * 2000).pipe(take(3))),
34+
* take(2)
35+
* );
36+
* const result = higherOrder.pipe(combineAll());
37+
*
38+
* result.subscribe(x => console.log(x));
39+
* ```
40+
*
41+
* @see {@link combineLatest}
42+
* @see {@link combineLatestWith}
43+
* @see {@link mergeAll}
44+
*
45+
* @param project optional function to map the most recent values from each inner Observable into a new result.
46+
* Takes each of the most recent values from each collected inner Observable as arguments, in order.
47+
* @return A function that returns an Observable that flattens Observables
48+
* emitted by the source Observable.
549
*/
6-
export const combineAll = combineLatestAll;
50+
export function combineAll<R>(project?: (...values: Array<any>) => R) {
51+
return joinAllInternals(combine, project);
52+
}

src/internal/util/tetris.ts

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
export class Tetris<T extends any> {
2+
private readonly arrays: Array<Array<T>>;
3+
private readonly lastEmissionIndices: Array<number>;
4+
5+
constructor(readonly columns: number = 2) {
6+
this.arrays = new Array<Array<T>>();
7+
this.lastEmissionIndices = new Array<number>(columns);
8+
for (let i = 0; i < columns; i++) {
9+
this.arrays[i] = new Array<T>();
10+
this.lastEmissionIndices[i] = -1;
11+
}
12+
}
13+
14+
getNext(): Array<T> {
15+
const next = new Array<T>();
16+
if (!this.hasNext()) {
17+
return [];
18+
}
19+
for (let column = 0; column < this.columns; column++) {
20+
if (this._hasNewIn(column)) {
21+
this.lastEmissionIndices[column] += 1;
22+
}
23+
next.push(this.arrays[column][this.lastEmissionIndices[column]]);
24+
}
25+
return next;
26+
}
27+
28+
private _hasNewIn(column: number): boolean {
29+
if (this.lastEmissionIndices[column] < this.arrays[column].length - 1) {
30+
return true;
31+
}
32+
return false;
33+
}
34+
35+
hasNext(): boolean {
36+
let hasNext: boolean = false;
37+
for (let column = 0; column < this.columns; column++) {
38+
if (this.arrays[column].length === 0) {
39+
return false;
40+
}
41+
hasNext = hasNext || this._hasNewIn(column);
42+
}
43+
return hasNext;
44+
}
45+
46+
pushToColumn(column: number, value: T): Tetris<T> {
47+
this.arrays[column].push(value);
48+
return this;
49+
}
50+
}

0 commit comments

Comments
 (0)