Skip to content

Commit 13a7e9c

Browse files
echistyakovmeta-codesync[bot]
authored andcommitted
Update rsocket-go
Summary: rsocket/rsocket-go#146 Reviewed By: podtserkovskiy Differential Revision: D93109728 fbshipit-source-id: 29de69ecfead9341ff39291fa8a3156f9768f17f
1 parent 0dba81f commit 13a7e9c

File tree

1 file changed

+2
-5
lines changed

1 file changed

+2
-5
lines changed

thrift/lib/go/thrift/rocket_rsocket_client.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,7 @@ func (r *rsocketClient) RequestChannel(
348348
sendingFlux := flux.Create(func(fluxCtx context.Context, sink flux.Sink) {
349349
defer close(sendingDoneChan)
350350

351-
// First, send the initial request payload
352-
sink.Next(request)
353-
354-
// Then, send payloads from the sink channel until it's closed or context is done
351+
// Send payloads from the sink channel until it's closed or context is done
355352
for {
356353
select {
357354
case p, ok := <-sinkPayloadChan:
@@ -368,7 +365,7 @@ func (r *rsocketClient) RequestChannel(
368365
}
369366
})
370367

371-
receivingFlux := r.client.RequestChannel(sendingFlux)
368+
receivingFlux := r.client.RequestChannel(request, sendingFlux)
372369

373370
channelCtx, channelCancel := context.WithCancel(ctx)
374371
receivingPayloadChan, receivingErrChan := receivingFlux.ToChan(channelCtx, types.DefaultStreamBufferSize)

0 commit comments

Comments
 (0)