Awaiting stream.first hangs for a stream built from combineLatest and
switchMap over open-ended async* streams.
This looks very similar to the RxDart issue below, but for stream_transform:
Minimal reproduction
import 'dart:async';
import 'dart:io';
import 'package:stream_transform/stream_transform.dart';
Future<void> main() async {
final stream = _emitOnceAndNeverClose('left').combineLatest(
Stream.value('right').switchMap(_emitOnceAndNeverClose),
(left, right) => '$left|$right',
);
final value = await stream.first.timeout(const Duration(milliseconds: 200));
stdout.writeln(value);
}
Stream<T> _emitOnceAndNeverClose<T>(T value) async* {
yield value;
await Completer<void>().future;
}
How to run
Expected behavior
The program should complete and print:
Actual behavior
The program times out:
Unhandled exception:
TimeoutException after 0:00:00.200000: Future not completed
Without the timeout, it hangs indefinitely.
What I verified
switchMap is required. Replacing the switchMap(...) branch with a direct
_emitOnceAndNeverClose('right') stream makes the problem disappear.
- The non-
switchMap branch also matters. Replacing
_emitOnceAndNeverClose('left') with Stream.value('left') makes the
problem disappear.
Stream.value('right').switchMap(_emitOnceAndNeverClose) alone works.
combineLatest without switchMap works.
Additional context
We ran into this in a Flutter app where the source streams were long-lived
database watchers from Drift (watchSingleOrNull()), wrapped in async*
syncers. Those streams are intentionally open-ended until canceled.
The failure happened when we composed those watcher streams with
combineLatest and switchMap, then awaited .first to get one initial
bootstrap value. The minimal repro above removes Drift entirely and still
reproduces, so this does not appear to be Drift-specific.
Environment
- stream_transform:
2.1.1
- Dart SDK:
3.12.0-113.1.beta
- Flutter:
3.42.0-0.0.pre
- Platform:
macos_arm64
Related context
This seems especially relevant because stream_transform's changelog already
mentions cancellation behavior in switchLatest / switchMap:
And the current implementation of both switchMap and combineLatest
explicitly waits for cancel() futures during teardown, which seems related to
this hang.
Awaiting
stream.firsthangs for a stream built fromcombineLatestandswitchMapover open-endedasync*streams.This looks very similar to the RxDart issue below, but for
stream_transform:Stream.firsthangs forcombineLatest + switchMapwith open-endedasync*streams ReactiveX/rxdart#804Minimal reproduction
How to run
Expected behavior
The program should complete and print:
Actual behavior
The program times out:
Without the timeout, it hangs indefinitely.
What I verified
switchMapis required. Replacing theswitchMap(...)branch with a direct_emitOnceAndNeverClose('right')stream makes the problem disappear.switchMapbranch also matters. Replacing_emitOnceAndNeverClose('left')withStream.value('left')makes theproblem disappear.
Stream.value('right').switchMap(_emitOnceAndNeverClose)alone works.combineLatestwithoutswitchMapworks.Additional context
We ran into this in a Flutter app where the source streams were long-lived
database watchers from Drift (
watchSingleOrNull()), wrapped inasync*syncers. Those streams are intentionally open-ended until canceled.
The failure happened when we composed those watcher streams with
combineLatestandswitchMap, then awaited.firstto get one initialbootstrap value. The minimal repro above removes Drift entirely and still
reproduces, so this does not appear to be Drift-specific.
Environment
2.1.13.12.0-113.1.beta3.42.0-0.0.premacos_arm64Related context
This seems especially relevant because
stream_transform's changelog alreadymentions cancellation behavior in
switchLatest/switchMap:And the current implementation of both
switchMapandcombineLatestexplicitly waits for
cancel()futures during teardown, which seems related tothis hang.