Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
// exported constructor function.
//
// When using this stream, request headers should be set via the [ClientStreamForClient.RequestHeader] method.
//
// Send is not safe to call concurrently.
type ClientStreamForClient[Req, Res any] struct {
conn StreamingClientConn
initializer maybeInitializer
Expand Down Expand Up @@ -110,6 +112,8 @@ func (c *ClientStreamForClient[Req, Res]) Conn() (StreamingClientConn, error) {
// In addition, the response returned by [ClientStreamForClientSimple.CloseAndReceive] is the response type defined for
// the stream and _not_ a Connect [Response] wrapper type. As a result, response headers/trailers should be read from
// the [CallInfo] object in context.
//
// Send is not safe to call concurrently.
type ClientStreamForClientSimple[Req, Res any] struct {
stream *ClientStreamForClient[Req, Res]
}
Expand Down Expand Up @@ -160,6 +164,8 @@ func (c *ClientStreamForClientSimple[Req, Res]) CloseAndReceive() (*Res, error)
//
// It's returned from [Client].CallServerStream, but doesn't currently have an
// exported constructor function.
//
// Receive is not safe to call concurrently.
type ServerStreamForClient[Res any] struct {
conn StreamingClientConn
initializer maybeInitializer
Expand Down Expand Up @@ -247,6 +253,9 @@ func (s *ServerStreamForClient[Res]) Conn() (StreamingClientConn, error) {
//
// It's returned from [Client].CallBidiStream, but doesn't currently have an
// exported constructor function.
//
// Send and Receive may be called from separate goroutines concurrently, but
// neither may be called concurrently with itself.
type BidiStreamForClient[Req, Res any] struct {
conn StreamingClientConn
initializer maybeInitializer
Expand Down Expand Up @@ -358,6 +367,9 @@ func (b *BidiStreamForClient[Req, Res]) Conn() (StreamingClientConn, error) {
//
// It's returned from [Client].CallBidiStream, but doesn't currently have an
// exported constructor function.
//
// Send and Receive may be called from separate goroutines concurrently, but
// neither may be called concurrently with itself.
type BidiStreamForClientSimple[Req, Res any] struct {
stream *BidiStreamForClient[Req, Res]
}
Expand Down
35 changes: 26 additions & 9 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,24 @@ func (s StreamType) String() string {
// all returned errors can be cast to [*Error] using the standard library's
// [errors.As].
//
// StreamingHandlerConn implementations do not need to be safe for concurrent use.
// StreamingHandlerConn implementations provided by this module support limited
// concurrent use: the read side (Receive, RequestHeader) may be called
// concurrently with the write side (Send, ResponseHeader, ResponseTrailer), but
// the read side must not be called concurrently with itself, and the write side
// must not be called concurrently with itself.
type StreamingHandlerConn interface {
Spec() Spec
Peer() Peer

// Receive and RequestHeader form the read side of the stream. They are not
// safe to call concurrently with each other, but may be called concurrently
// with Send, ResponseHeader, and ResponseTrailer.
Receive(any) error
RequestHeader() http.Header

// Send, ResponseHeader, and ResponseTrailer form the write side of the
// stream. They are not safe to call concurrently with each other, but may
// be called concurrently with Receive and RequestHeader.
Send(any) error
ResponseHeader() http.Header
ResponseTrailer() http.Header
Expand All @@ -120,22 +130,29 @@ type StreamingHandlerConn interface {
// all returned errors can be cast to [*Error] using the standard library's
// [errors.As].
//
// In order to support bidirectional streaming RPCs, all StreamingClientConn
// implementations must support limited concurrent use. See the comments on
// each group of methods for details.
// StreamingClientConn implementations provided by this module support limited
// concurrent use: the read side (Receive, ResponseHeader, ResponseTrailer,
// CloseResponse) may be called concurrently with the write side (Send,
// RequestHeader, CloseRequest), but the read side must not be called
// concurrently with itself, and the write side must not be called concurrently
// with itself.
type StreamingClientConn interface {
// Spec and Peer must be safe to call concurrently with all other methods.
// Spec and Peer are safe to call concurrently with all other methods.
Spec() Spec
Peer() Peer

// Send, RequestHeader, and CloseRequest may race with each other, but must
// be safe to call concurrently with all other methods.
// Send, RequestHeader, and CloseRequest form the write side of the stream.
// They are not safe to call concurrently with each other, but may be called
// concurrently with Receive, ResponseHeader, ResponseTrailer, and
// CloseResponse.
Send(any) error
RequestHeader() http.Header
CloseRequest() error

// Receive, ResponseHeader, ResponseTrailer, and CloseResponse may race with
// each other, but must be safe to call concurrently with all other methods.
// Receive, ResponseHeader, ResponseTrailer, and CloseResponse form the read
// side of the stream. They are not safe to call concurrently with each
// other, but may be called concurrently with Send, RequestHeader, and
// CloseRequest.
Receive(any) error
ResponseHeader() http.Header
ResponseTrailer() http.Header
Expand Down
7 changes: 7 additions & 0 deletions handler_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
//
// It's constructed as part of [Handler] invocation, but doesn't currently have
// an exported constructor.
//
// Receive is not safe to call concurrently.
type ClientStream[Req any] struct {
conn StreamingHandlerConn
initializer maybeInitializer
Expand Down Expand Up @@ -90,6 +92,8 @@ func (c *ClientStream[Req]) Conn() StreamingHandlerConn {
//
// It's constructed as part of [Handler] invocation, but doesn't currently have
// an exported constructor.
//
// Send is not safe to call concurrently.
type ServerStream[Res any] struct {
conn StreamingHandlerConn
}
Expand Down Expand Up @@ -131,6 +135,9 @@ func (s *ServerStream[Res]) Conn() StreamingHandlerConn {
//
// It's constructed as part of [Handler] invocation, but doesn't currently have
// an exported constructor.
//
// Send and Receive may be called from separate goroutines concurrently, but
// neither may be called concurrently with itself.
type BidiStream[Req, Res any] struct {
conn StreamingHandlerConn
initializer maybeInitializer
Expand Down