Description
gRPC is message based; it is often the case that consumers want to effectively mirror `Stream`-like scenarios: client-to-server, server-to-client, or possibly even duplex. Individual consumers have typically hacked this in at the message layer using something like

```csharp
[ProtoContract]
public class Fragment
{
    [ProtoMember(1)]
    public byte[] Chunk { get; set; }
}
```

(or similar), manually transforming `IAsyncEnumerable<Fragment>` into a suitable `Stream`. That's a lot of machinery, with a lot of subtle timing concerns, especially around memory lifetime, threading, etc.
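For context, the client-to-server half of that manual transformation typically ends up as something like the sketch below (`FragmentAdapter` is a hypothetical name; the reverse direction, error handling, and buffer lifetime add further complexity on top of this):

```csharp
using System;
using System.Collections.Generic;
using System.IO;

static class FragmentAdapter // hypothetical helper; not part of any library
{
    // client-to-server direction: pump a Stream into Fragment messages
    public static async IAsyncEnumerable<Fragment> ToFragmentsAsync(
        Stream source, int chunkSize = 4096)
    {
        var buffer = new byte[chunkSize];
        int read;
        while ((read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // copy, because 'buffer' is reused on the next iteration
            var chunk = new byte[read];
            Buffer.BlockCopy(buffer, 0, chunk, 0, read);
            yield return new Fragment { Chunk = chunk };
        }
    }
}
```

Every consumer writing (and debugging) a variant of this is exactly the duplication the proposal aims to remove.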
Suggestion: allow `Stream` and pipelines as library-level primitives, and let the library deal with all the implementation details. Advantages:
- shared supportable implementation available to all consumers
- can work at a lower level, with more direct access to the underlying gRPC primitives
- more appropriate timing details, in particular re the first chunk (avoiding `yield return` gymnastics)
We should ideally support both `Stream` and "pipelines" usage, for example (these are candidate end-user service signatures that the library would detect; there is no API change on the library itself):
```csharp
interface IByteStreamAPI
{
    // note: all methods support optional cancellation, headers, etc.;
    // ValueTask and Task to be supported interchangeably
    Task ClientToServer_NoResponse(Stream clientToServer);
    Task<SomeDto> ClientToServer_Response(Stream clientToServer);
    Task<Stream> ServerToClient_NoArg();
    Task<Stream> ServerToClient_Arg(SomeDto value);
    Task ServerToClient_NoArg([Out] Stream serverToClient);
    Task ServerToClient_Arg([Out] Stream serverToClient, SomeDto value);
    Task<Stream> Duplex(Stream clientToServer);
    Task Duplex(Stream clientToServer, [Out] Stream serverToClient);
    Task Duplex_Single([In, Out] Stream duplex);
}
```
```csharp
interface IBytePipeAPI
{
    // note: all methods support optional cancellation etc.;
    // ValueTask and Task to be supported interchangeably
    Task ClientToServer_NoResponse(PipeReader clientToServer);
    Task<SomeDto> ClientToServer_Response(PipeReader clientToServer);
    Task<PipeReader> ServerToClient_NoArg();
    Task<PipeReader> ServerToClient_Arg(SomeDto value);
    Task ServerToClient_NoArg(PipeWriter serverToClient);
    Task ServerToClient_Arg(PipeWriter serverToClient, SomeDto value);
    Task<PipeReader> Duplex(PipeWriter clientToServer);
    Task Duplex(PipeWriter clientToServer, PipeReader serverToClient);
    Task Duplex(IDuplexPipe duplex);
}
```
I would suggest focusing on these two:

```csharp
public Task<Stream> ServerToClient_NoArg();
public Task<Stream> ServerToClient_Arg(SomeDto value);
```

as an initial MVP/PoC to prove the idea. Example client usage:
```csharp
IYourAPI yourProxy = GetProxy();
var arg = new YourRequestType { /* your request */ };
await using Stream response = await yourProxy.ServerToClient_Arg(arg);
```
From the client's perspective, the `await` covers only the initial handshake and header metadata; stream EOF signals full completion. From the server's perspective, it could simply return a plain `Stream` (i.e. `File.OpenRead` etc.), which would be processed in chunks, or it could create something more exotic like a `Pipe`: spawning a sub-task to write to the `.Writer`, and returning `.Reader.AsStream()` while the sub-task pushes data in the background. In reality, I expect mostly the first kind; if people want the `Pipe`-style, they should probably use the `Task<PipeReader>` approach!
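For reference, the "more exotic" `Pipe`-based server shape described above might be sketched as follows (a sketch only, under assumed names: `MyService` and `ProduceAsync` are hypothetical, and `SomeDto` is stubbed for illustration):

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public class SomeDto { } // placeholder, as in the signatures above

public class MyService // hypothetical implementation of the candidate signature
{
    public Task<Stream> ServerToClient_Arg(SomeDto value)
    {
        var pipe = new Pipe();
        // sub-task pushes data in the background; Complete/CompleteAsync
        // propagates success or failure through to the reader
        _ = Task.Run(async () =>
        {
            try
            {
                await ProduceAsync(pipe.Writer, value);
                await pipe.Writer.CompleteAsync();
            }
            catch (Exception ex)
            {
                await pipe.Writer.CompleteAsync(ex);
            }
        });
        return Task.FromResult(pipe.Reader.AsStream());
    }

    private static async Task ProduceAsync(PipeWriter writer, SomeDto value)
    {
        // hypothetical producer: write whatever payload 'value' describes
        await writer.WriteAsync(new byte[2048]);
    }
}
```

The sub-task/`Complete` dance here is precisely the kind of lifetime bookkeeping that argues for exposing `Task<PipeReader>` directly instead.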
Additional considerations might include annotation for things like maximum chunk-size and back-pressure, for example via a new attribute, something like

```csharp
[return: ByteStream(MaxMessageBytes = 2048, BufferBytes = 10240)]
public Task<Stream> ServerToClient_Arg(SomeDto value);
```

which would send at most 2048 bytes per chunk, and use a 10,240-byte local buffer before applying back-pressure at the sender.