Share infinite stream produced by 'expand' #6780
-
I have a paginated third-party resource living in a web service. I want to turn that paginated resource into a stream of values and let the client decide how many elements to use. That is, the client should not need to know that the original resource is paginated. So far I have the following code:

import { from, Observable, of } from 'rxjs';
import { expand, mergeMap, take } from 'rxjs/operators';
interface Result {
  page: number;
  items: number[];
}
// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
  const items = [...Array(3).keys()].map((e) => e + 3 * page);
  console.log('Fetching page ' + page);
  return of({ page, items });
}
// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
  return fetchPage().pipe(
    expand((res) => fetchPage(res.page + 1)),
    mergeMap((res) => from(res.items))
  );
}
const infinite$ = mkInfiniteStream();

This works really well: I get a lazy infinite stream of numbers, and the client can just take as many elements as it needs.

Now, what I want to do is share those values when dealing with multiple subscribers, that is:

infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));
// Assume that later we have new subscribers
setTimeout(() => {
  infinite$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);
setTimeout(() => {
  infinite$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);

As you can see, we'll have multiple subscribers to the infinite stream, and I want to reuse the values already emitted in order to reduce the number of calls to 'fetchPage'.

I tried the following, but I could not get the desired behavior:
Does not work since late subscribers result in multiple calls to 'fetchPage'.
Forces all values in the stream even though they are not needed (no client asked for all items yet)

Any hint would be appreciated. In case anyone wants to try the code: https://stackblitz.com/edit/n4ywfw
Replies: 2 comments
-
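I don't think it's possible to do this in a fully reactive way... As a producer, all an observable knows is that someone is subscribed. Adding in a

My solution keeps all the values received on an observable, and uses a custom observable to request more pages. It essentially abstracts the pagination away from the consumer:

import { asapScheduler, Observable, Subject } from 'rxjs';
import { delay, exhaustMap, observeOn, scan, shareReplay, startWith } from 'rxjs/operators';

// Each emission asks for the next page ('fetchPage' is the function from the question)
const fetchNext$ = new Subject<void>();
// Accumulates every item fetched so far and replays the latest array to new subscribers
const fetchedValues$ = fetchNext$.pipe(
  observeOn(asapScheduler),
  // Ignore requests while a fetch is in flight; 'i' is the index of the page to fetch
  exhaustMap((_, i) => fetchPage(i).pipe(delay(1000))),
  scan((arr, v) => {
    arr.push(...v.items);
    return arr;
  }, [] as number[]),
  startWith([] as number[]),
  shareReplay(1)
);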
const infinite$ = new Observable<number>((obs) => {
  let emitted = 0;
  let requestDone = false;
  const sub = fetchedValues$.subscribe({
    next: (arr) => {
      if (requestDone && emitted === arr.length) {
        // Last request returned no new elements - We are finished.
        obs.complete();
        return;
      }
      // Emit the new values
      while (emitted < arr.length && !obs.closed) {
        obs.next(arr[emitted]);
        emitted++;
      }
      // If the subscriber hasn't closed, request more values
      if (!obs.closed) {
        fetchNext$.next();
      }
    },
    error: (e) => obs.error(e),
  });
  if (!obs.closed) {
    requestDone = true;
    fetchNext$.next();
  }
  return sub;
});

https://stackblitz.com/edit/n4ywfw-pnb3hp

When building this, I've found out that
-
The solution I ended up with involved creating a stateful Observable using closures. We kind of make our own version of share that is bolted into the Observable. The only change is to mkInfiniteStream from the original code:

import { defer, from, Observable } from 'rxjs';
import { concatWith, expand, mergeMap, tap } from 'rxjs/operators';

function mkInfiniteStream(): Observable<number> {
  // State shared by every subscriber through the closure
  let lastPage = 0;
  let itemsRead: number[] = [];
  // 'defer' re-reads the shared state on each new subscription
  return defer(() =>
    // Replay what was already fetched, then continue from the next unfetched page
    from(itemsRead).pipe(
      concatWith(
        fetchPage(lastPage).pipe(
          expand((res) => fetchPage(res.page + 1)),
          tap((res) => {
            lastPage++;
            itemsRead = [...itemsRead, ...res.items];
          }),
          mergeMap((res) => from(res.items))
        )
      )
    )
  );
}
// Usage is still the same:
const infinite$ = mkInfiniteStream();

The idea is to keep track internally of all items and the last page fetched. When requesting an item that was not fetched yet, the Observable is extended with items from the next page. This in turn updates all the items available ( Using

I wish there was a more "pure" implementation, so if you come up with one please share it! This question was also posted on Stack Overflow
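
To make the bookkeeping easier to see in isolation, here is a minimal sketch of the same caching idea without RxJS. It is not the code from this thread: it swaps the push-based Observable for a plain pull-based cursor, and the names fetchPageSync, mkCachedStream, takeN and fetchCount are made up for illustration. It also assumes that every page contains at least one item.

```typescript
// Hypothetical page shape, mirroring 'Result' from the question.
interface Page {
  page: number;
  items: number[];
}

// Counts how many pages were actually requested.
let fetchCount = 0;

// Synchronous stand-in for the real HTTP request: three numbers per page.
function fetchPageSync(page: number): Page {
  fetchCount++;
  const items = [0, 1, 2].map((e) => e + 3 * page);
  return { page, items };
}

// Returns a factory of cursors. All cursors share one cache of fetched
// items, but each cursor keeps its own read position.
function mkCachedStream(): () => () => number {
  let nextPage = 0; // first page that has not been fetched yet
  const itemsRead: number[] = []; // every item fetched so far, in order

  return () => {
    let i = 0; // this consumer's position in the shared cache
    return () => {
      // Fetch only when this consumer has read past the shared cache.
      if (i >= itemsRead.length) {
        const res = fetchPageSync(nextPage);
        nextPage = res.page + 1;
        itemsRead.push(...res.items);
      }
      return itemsRead[i++];
    };
  };
}

// Pull exactly n values from a cursor.
function takeN(n: number, next: () => number): number[] {
  const out: number[] = [];
  while (out.length < n) {
    out.push(next());
  }
  return out;
}

const openCursor = mkCachedStream();
const first = takeN(10, openCursor()); // needs items 0..9, so pages 0..3 are fetched
const second = takeN(5, openCursor()); // served entirely from the cache
const third = takeN(4, openCursor()); // served entirely from the cache
console.log(first, second, third, 'pages fetched:', fetchCount);
```

In this sketch the three consumers together trigger four page fetches, because the second and third read only items the first one already pulled in. The RxJS version above has to deal with timing and unsubscription on top of this, which the sketch deliberately leaves out.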