Conversation
Add a `SharedStdinSubscription` which stores it's callbacks and has a `divert` method to create a new stream receiving events from stdin in place of the original listener.
Package publishing
Documentation at https://github.com/dart-lang/ecosystem/wiki/Publishing-automation. |
PR HealthLicense Headers ✔️
All source files should start with a license header. Unrelated files missing license headers
This check can be disabled by tagging the PR with Unused Dependencies ✔️
For details on how to fix these, see dependency_validator. This check can be disabled by tagging the PR with Changelog Entry ✔️
Changes to files need to be accounted for in their respective changelogs. This check can be disabled by tagging the PR with Coverage ✔️
This check for test coverage is informational (issues shown here will not fail the PR). This check can be disabled by tagging the PR with Breaking changes ✔️
This check can be disabled by tagging the PR with API leaks ✔️The following packages contain symbols visible in the public API, but not exported by the library. Export these symbols or remove them from your publicly visible API.
This check can be disabled by tagging the PR with |
|
cc @jakemac53 here is an idea of what I had in mind. If you have a particular UX in mind that's not covered here we can anchor on a different API to start. I suspect anything we do is going to be easier to fit in here than a different library. |
There was a problem hiding this comment.
Code Review
This pull request adds a SharedStdinSubscription class and a divert method to SharedStdIn to enable temporary redirection of input events. The review identifies a compilation error in the constructor, logic flaws in event handler management during diversion, and missing documentation for public members. It also recommends expanding test coverage to include edge cases like handler updates during active diversions and multiple diversion attempts.
| class SharedStdinSubscription extends DelegatingStreamSubscription<List<int>> { | ||
| final void Function(List<int>)? _onData; | ||
| final void Function()? _onDone; | ||
| final Function? _onError; | ||
| SharedStdinSubscription._( | ||
| super.sourceSubscription, this._onData, this._onDone, this._onError); | ||
|
|
||
| /// Temporarily diverts events from this stream into a new stream. | ||
| /// | ||
| /// Buffers events until the returned stream has a listener. After a listener | ||
| /// on the returned stream cancels, subsequent events will be delievered to | ||
| /// the original [onData] callback of this subscription. | ||
| /// | ||
| /// While the returned stream has a listener all events an errors are passed | ||
| /// only to the substream listener's callbacks. If this stream ends while the | ||
| /// returned stream has a listener both the substream and this stream's | ||
| /// [onDone] callback is invoked. | ||
| Stream<List<int>> divert() { | ||
| final controller = StreamController<List<int>>( | ||
| onCancel: () { | ||
| super.onData(_onData); | ||
| super.onError(_onError); | ||
| super.onDone(_onDone); | ||
| }, | ||
| sync: true, | ||
| ); | ||
|
|
||
| super.onData(controller.add); | ||
| super.onError(controller.addError); | ||
| super.onDone(() { | ||
| controller.close(); | ||
| _onDone?.call(); | ||
| }); | ||
|
|
||
| return controller.stream; | ||
| } | ||
| } |
There was a problem hiding this comment.
The SharedStdinSubscription class has several issues related to state management and correctness:
- Documentation: As per the repository style guide, public members should have documentation. The class itself is missing a doc comment.
- Constructor Error: The use of
super.sourceSubscriptionis likely a compilation error. Inpackage:async, theDelegatingStreamSubscriptionconstructor parameter is namedsource. - State Management: The handlers
_onData,_onDone, and_onErrorarefinaland never updated. If a user callsonData(...)on the subscription, the new handler is applied to the underlying subscription but not stored here. When a diversion ends, the original (initial) handler will be restored, discarding the user's update. - Diversion Integrity: Calling
onData,onError, oronDonewhile a diversion is active will immediately overwrite the diversion's internal handlers on the source subscription, breaking the diversion logic. - Typos: There are typos in the
divertdocumentation ("delievered", "an errors").
I suggest refactoring the class to track the current handlers and whether a diversion is active.
/// A subscription to [sharedStdIn] that can be temporarily diverted.
class SharedStdinSubscription extends DelegatingStreamSubscription<List<int>> {
void Function(List<int>)? _onData;
void Function()? _onDone;
Function? _onError;
bool _isDiverted = false;
SharedStdinSubscription._(
StreamSubscription<List<int>> source, this._onData, this._onDone, this._onError)
: super(source);
@override
void onData(void Function(List<int>)? handleData) {
_onData = handleData;
if (!_isDiverted) super.onData(handleData);
}
@override
void onError(Function? handleError) {
_onError = handleError;
if (!_isDiverted) super.onError(handleError);
}
@override
void onDone(void Function()? handleDone) {
_onDone = handleDone;
if (!_isDiverted) super.onDone(handleDone);
}
/// Temporarily diverts events from this stream into a new stream.
///
/// Buffers events until the returned stream has a listener. After a listener
/// on the returned stream cancels, subsequent events will be delivered to
/// the original [onData] callback of this subscription.
///
/// While the returned stream has a listener all events and errors are passed
/// only to the substream listener's callbacks. If this stream ends while the
/// returned stream has a listener both the substream and this stream's
/// [onDone] callback is invoked.
Stream<List<int>> divert() {
if (_isDiverted) {
throw StateError('Subscription is already diverted');
}
_isDiverted = true;
final controller = StreamController<List<int>>(
onCancel: () {
_isDiverted = false;
super.onData(_onData);
super.onError(_onError);
super.onDone(_onDone);
},
sync: true,
);
super.onData(controller.add);
super.onError(controller.addError);
super.onDone(() {
_isDiverted = false;
controller.close();
_onDone?.call();
});
return controller.stream;
}
}References
- At least all public members should have documentation, answering the why. (link)
- DartDoc comments should not start with 'Returns' if it's not the first line of the DartDoc. The first line should describe what the function does, and subsequent lines can describe the return value.
| test('should temporarily divert events', () async { | ||
| final logs = <List<int>>[]; | ||
| final sub = sharedStdIn.listen(logs.add); | ||
| fakeStdIn.add('a'); | ||
| await pumpEventQueue(times: 0); | ||
| expect(logs, ['a'.codeUnits]); | ||
|
|
||
| final diverted = sub.divert(); | ||
| fakeStdIn.add('b'); | ||
| await pumpEventQueue(times: 0); | ||
| expect(logs, ['a'.codeUnits]); | ||
|
|
||
| final divertedLogs = <List<int>>[]; | ||
| final divertedSub = diverted.listen(divertedLogs.add); | ||
| await pumpEventQueue(times: 0); | ||
| expect(divertedLogs, ['b'.codeUnits]); | ||
|
|
||
| fakeStdIn.add('c'); | ||
| await pumpEventQueue(times: 0); | ||
| expect(divertedLogs, ['b'.codeUnits, 'c'.codeUnits]); | ||
| expect(logs, ['a'.codeUnits]); | ||
|
|
||
| final cancelDone = divertedSub.cancel(); | ||
| fakeStdIn.add('d'); | ||
| await cancelDone; | ||
| fakeStdIn.add('e'); | ||
| await pumpEventQueue(times: 0); | ||
| expect(logs, ['a'.codeUnits, 'd'.codeUnits, 'e'.codeUnits]); | ||
|
|
||
| await sub.cancel(); | ||
| }); |
There was a problem hiding this comment.
The test coverage for divert should be expanded to include edge cases such as:
- Changing the
onDatahandler before callingdivertand ensuring it is restored correctly. - Changing the
onDatahandler during an active diversion. - Verifying behavior when multiple
divert()calls are attempted (which should ideally be prevented or handled explicitly).
When writing these tests, prefer using raw strings (e.g., r'$foo') for expectations involving string literals with special characters like '$' to improve readability and avoid confusion with escape sequences.
References
- For test expectations involving string literals with special characters (like '$'), prefer using raw strings (e.g., r'$foo') to improve readability and avoid confusion with escape sequences.
|
I think this would work, should it live in package:async maybe or do you think it's too specific to the stdin use case? |
IMO it's a better fit next to |
| /// only to the substream listener's callbacks. If this stream ends while the | ||
| /// returned stream has a listener both the substream and this stream's | ||
| /// [onDone] callback is invoked. | ||
| Stream<List<int>> divert() { |
There was a problem hiding this comment.
Possibly this doesn't matter, but there is a bit of an edge case here where the diverted stream never ends up being listened to. In that case, it will end up actually stealing all future stdin events and this subscription will never see any.
Resolving that might be complicated, unless you actually left the diversion up to the consumer - so they only divert the stream if they will for sure be listening to it. Technically this API is compatible with that but it doesn't enforce it - if this acted more like listen instead it would enforce that type of behavior.
That approach would also open up the ability to return a nested SharedStdInDescription to allow for nested diversion of events.
There was a problem hiding this comment.
there is a bit of an edge case here where the diverted stream never ends up being listened to
I'd be happy with either:
- Document that the returned stream must have a listener before the original subscription will get events again.
- Add a method like
closeDivertedStreamonSharedStdInSubscriptionwhich can handle this case.
There is also edge cases around the diverted stream not consuming all the events that happened while it was open which has me a little more worried.
|
Why? What is the use-case? Also: Is this a good idea? If it is, why is it only StdIn that can do it? How about making Give
The current behavior is Having one subscription with one possible divertion feels very special-casey. |
The use case is to be able to create a fully encapsulated terminal UI component that any existing terminal UI can easily delegate to (and temporarily hand off stdin to). Specifically we are creating a single/multi select dialog component for use in our existing CLIs. This should own stdin while it is active but return control of it when it's done. It doesn't require buying into a whole terminal UI framework etc, just plug and play anywhere. |
To allow allow code that implements rich user interactions on the CLI, and allow them to look more like utilities than frameworks which enforce particular interaction patterns like stopping a listener on After seeing the assumptions we need to make about handling
I'm very open to alternatives. This feels better to me than
It's the first use case where it came up, and it fits very naturally into this implementation. extension DivertableListen<T> on Stream<T> {
DivertableSubscription<T> divertableListen(...);
}
That is the behavior of
The single level divert is indeed a new special case, it's simpler and all we happen to need now. I could look at having the
The original listener in this case doesn't want to listen to these events, so we'd need extra complexity in the callbacks to ignore events and coordinate when to resume responding. This API uses standard stream interactions to coordinate that part. |
| onError: onError, | ||
| cancelOnError: cancelOnError, | ||
| onDone, | ||
| onError, |
There was a problem hiding this comment.
These should be bound in the current zone.
The arguments to listen or subscription.onXX should be called in the zone where that call happened.
If you store the callbacks now, and then pass them to another subscription at a later point,
they would get bound in the latter zone, which is not correct.
(I'm not sure this entire idea is zone-safe.)
There was a problem hiding this comment.
Isn't this handled by the inner _subscription?
This class does not invoke any of the callbacks, they are only forwarded to another StreamSubscription which would invoke them and I assumed handle any necessary zone considerations.
There was a problem hiding this comment.
A subscription binds callbacks in the current zone, where the, fx, onData is called.
When your subscription wrapper stores the pre-binding callback, and then later sets that on the subscription again, that later may be in a differnt current zone. That means the callback may end up being called in a different zone than where the client created the callback.
That could cause errors.
The perfect solution would be to store both the callback the current zone where it was last set, and then when you later set the callback on the subscription again, you do that as _onDoneZone.run(() { subscription.onDone(_onDone); }).
A probably sufficent solution would be to bind the function when you first receive it.
Then it will be double-bound by the subscription, but the innermost binding wins. It's just more unnecessary overhead on each call, instead of more storage and work when reinstalling the listener.
(But then I wouldn't do this shuffling at the subscription level. I'd want an abstraction level that is higher up, without adding anything to the Stream or StreamSubscription interfaces.)
| /// only to the substream listener's callbacks. If this stream ends while the | ||
| /// returned stream has a listener both the substream and this stream's | ||
| /// [onDone] callback is invoked. | ||
| Stream<List<int>> divert() { |
There was a problem hiding this comment.
Could it return a SharedStdinSubscription? So you could divert that again?
| } | ||
|
|
||
| /// A subscription to [sharedStdIn] that can be temporarily diverted. | ||
| class SharedStdInSubscription implements StreamSubscription<List<int>> { |
There was a problem hiding this comment.
I would not decorate the StreamSubscription. Don't be a subscription, have one.
There was a problem hiding this comment.
That requires a different method than listen as the API. What would you call it? What would you call the thing that has a subscription?
There was a problem hiding this comment.
As I suggested elsewhere, if you want to share a stream, you should decide before you start listening to it. Put that stream into a "Stream-sharer" object, get multiple output streams from that, and use it to switch and multiplex between the receivers.
Most of the problems here come from wanting to redirect the output of a stream subscription.
The only way to do that is to change the callbacks. You can't read the callbacks of a stream subscription, so if you overwrite them, the original is lost.
The solution here is to wrap everything in a layer that remembers those callbacks, so they can be put back later.
That's fragile, because it still uses the original subscription object.
If this class still stored the first listener's callbacks, but instead of putting them into the source subscription, it used its own functions, which then mutliplexed the events depending on whether the stream was diverted or not, either calling the stored callbacks (in their correct zone) or forwarding the events to the diversion controller, then that would reduce the need to overwrite the callbacks.
It's still the same API, and not the API I'd prefer.
You'd essentially have the implementation of something that can forward events to different targets, a multiplexing stream. Instead of always having this overhead, we could create a helper that wrapped any stream in that way, and do it at the stream level. Then you need to know if you want to divert before you start listening, but you can then also use it for other things than just this class.
Something like this: https://dartpad.dev/?id=f5dcdd50b1173632b85dd367b128be8b
There was a problem hiding this comment.
Alternative, since this is a SharedStdio which is already all about sharing:
Allow other listeners while the current one is paused.
That requires cooperation, because the current listener may be paused for any number of reasons. But if it deliberately pauses, then it's OK for someone else to listen on the shared stdio in the meantime and get any events coming in. The previous listener cannot resume while another listener is active.
The shared stdio should manage event buffering and never put an event into a paused subscription, where it will just be buffered and not available to another listener who should get events before.
The handover involves pausing, but that should also ensure that no events get lost or mis-delivered, they'll be buffered in the shared stream.
.... and I wrote that too: https://dartpad.dev/?id=cc2c2997eca0859a3bfefb39883325e7
|
There is actually another non-stdin use case in #2383 - the component also currently listens for sigint signals and will abort the dialog (and reset terminal state). But the main terminal application may also be listening for those signals to do its own handling, and currently we don't solve for that. This could take an abort stream instead of directly listening to |
|
I just realized I thought this was the Stdin stream in I'm still not convinced that the API is what I'd prefer, I think I'd prefer a separate object to handle the sharing. /// Cooperative shared access to the events of a stream.
///
/// Receives events from a source stream and sends them to a single subscriber.
///
/// The source stream is listened to when the first active stream listens.
/// The source stream subscription is paused while there is no active stream.
/// All streams close when the source stream closes, even those that are not
/// currently active.
class StreamShare<T> {
factory StreamShare(Stream<T> source) = ...;
/// Creates a new single-subscription stream which can receive events from the source stream.
///
/// If provided, the [onListen] callback is called when the returned stream is listened to,
/// The [onCancel], [onPause] and [onResume] are called when the listener
/// cancels, pauses or resumes listening.
/// The stream listener can do any of these while the stream is not active.
///
/// The returned stream will only receive events when it is active.
Stream<T> fork({void Function()? onListen, void Function()? onCancel, void Function()? onPause, void Function()? onResume});
/// Whether any stream is currently active.
bool get hasActive;
/// Whether [stream] is currently the active stream.
bool isActive(Stream<T> stream);
/// Makes [streamToActivate] the active stream.
/// Only allowed when there are no active streams.
///
/// If [from] is provided, it must be the current active stream.
/// It is then deactivated before [streamToActivate] is activated.
///
/// Returns whether the [streamToActivate] was made active.
/// That can fail to happen if the stream has already been listened
/// to and cancelled, or if it has not been listened to yet.
/// If so, there are no active stream. _So the return value is the
/// same value as [hasActive] after the call._
///
/// If [allowUnlistened] is set to `true`, the [streamToActivate] can be
/// made active before it has been listened to. In that case, the source stream will
/// be paused until that stream is listened to, or another stream is made active.
/// if `allowUnlistened` is `false`, activating an stream before it has been listened
/// to fails.
bool makeActive(Stream<T> streamToActivate, {Stream<T>? from, bool allowUnlistened = false});
/// Makes [streamToDeactivate] no longer be active.
///
/// The [streamToDeactivate] must be the currently active
/// stream, or there must be no active stream.
/// The latter allows a stream to cancel and stop being
/// active without
void makeInactive(Stream<T> streamToDeactivate);
/// Closes all stream and cancels the source stream subscription.
void closeAll();
/// Sets a callback which will be called when there is no active stream.
///
/// This callback is not called after calling [makeInactive],
/// only when the currently active stream cancels.
void onInactive(void Function() onInactive);
}That's ... probably overengineered. Very speculative. |
|
|
||
| @override | ||
| StreamSubscription<List<int>> listen( | ||
| SharedStdInSubscription listen( |
There was a problem hiding this comment.
It shouldn't be the subscription which creates a new stream. That feels like the abstraction layers are backwards.
(Which may be a symptom of SharedStdIn being a stream instead of having one. Bad design, copied from dart:io which at least had the excuse that it was written before we know, well, anything about good async design.)
We could keep a layer of indirection between the SharedStdIn and the subscriptions at all times, so you don't need the subscription to remember its own listeners, the underlying code remembers all the listeners and switches beween who gets the events.
Then you can, maybe, tell the SharedStdin itself that "this StreamSubscription" (which is currently the active one) should be paused, and a new Stream can be made (or listen of SharedStdIn can be called again, but that doesn't let you control who does it), and when that stream completes, the former subscription resumes handling events. (If it hasn't cancelled in the meantime.)
So SharedStdIn has the stack of active subscriptions (controllers internally) and it forwards to the topmost, and when that cancels, it continues to the next one. But it's in control, not the subscriptions.
Add a
SharedStdinSubscriptionwhich stores it's callbacks and has adivertmethod to create a new stream receiving events from stdin inplace of the original listener.