Fix broken streaming responses (deadlocks and cast errors)#111
Fix broken streaming responses (deadlocks and cast errors)#111
Conversation
The previous implementation was broken in two ways: 1. CallableResponse used StreamController<String>, but Shelf Response expects bytes (List<int>), causing runtime type cast errors. 2. HttpsNamespace was awaiting closeStream() before returning the response, causing deadlocks. Changes: - Refactored CallableResponse to StreamController<List<int>>. - Used json.fuse(utf8).encode for direct byte encoding. - Switched to unawaited(closeStream()) in HttpsNamespace. - Ported and adapted unit tests for HttpsError compatibility.
There was a problem hiding this comment.
Code Review
This pull request updates the SSE streaming implementation to use byte-based data transmission instead of strings, improves efficiency in data encoding, and ensures stream closure is handled asynchronously. Feedback was provided regarding the memory inefficiency of using list literals for byte concatenation, suggesting the use of BytesBuilder instead, and a concern was raised about the potential premature clearing of the heartbeat timer due to the use of unawaited stream closure.
I am having trouble creating individual review comments. Click here to see my feedback.
lib/src/https/callable.dart (348-352)
Using a list literal with spread operators ([...]) on byte arrays creates a standard List<int>. In the Dart VM, this is an array of pointers/objects (8 bytes per element on 64-bit) rather than a packed byte array (Uint8List). This results in significant memory bloat and extra processing overhead when the stream is consumed. Using BytesBuilder is much more efficient for concatenating bytes.
return (BytesBuilder(copy: false)
..add(_dataPrefix)
..add(jsonBytes)
..add(_newlineSuffix))
.takeBytes();
lib/src/https/https_namespace.dart (281)
By making closeStream() unawaited, the finally block at line 308 will now execute immediately after the handler returns. This clears the heartbeat timer prematurely, which may cause long-running streams (e.g., those started via response.stream()) to time out if no other data is sent for a while.
To fix this, consider moving the clearHeartbeat() call from the finally block (line 308) into the closeStream() method in callable.dart. This ensures heartbeats continue as long as the stream is active, even after the handler has returned.
| try { | ||
| final formattedData = _encodeSSE({'message': chunk}); | ||
| _streamController!.add(formattedData); | ||
| _streamController!.add(_encodeSSE({'result': chunk})); |
There was a problem hiding this comment.
@gemini-code-assist - is this the right event format?
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request improves the Server-Sent Events (SSE) streaming implementation in the callable functions module. Key changes include updating the stream controller to handle byte lists, adding robust error handling for data streams to ensure errors are properly reported as SSE responses, implementing a heartbeat mechanism to keep connections alive, and refining stream lifecycle management. Additionally, comprehensive unit tests have been added to verify streaming behavior, error handling, and client disconnection scenarios. I have no feedback to provide as the changes appear well-implemented and thoroughly tested.
The previous implementation was broken in two ways:
Changes: