Skip to content

Commit 7ef6a12

Browse files
echistyakovmeta-codesync[bot]
authored andcommitted
Server-side streaming
Summary: Server-side streaming. ✅ Conformance tests. ✅ Unittests (with full coverage) ✅ SR tests downstream. #build_rule_type[go_library,go_binary,go_test] Reviewed By: podtserkovskiy Differential Revision: D83200596 fbshipit-source-id: 6128eb443d93a100448dffe52e26413634201f01
1 parent aca8de1 commit 7ef6a12

File tree

6 files changed

+852
-10
lines changed

6 files changed

+852
-10
lines changed

third-party/thrift/src/thrift/compiler/generate/templates/go/svc/processor_function.mustache

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,98 @@ func (p *{{> svc/proc_func_name}}) NewReqArgs() thrift.ReadableStruct {
3333
func (p *{{> svc/proc_func_name}}) RunContext(ctx context.Context, reqStruct thrift.ReadableStruct) (thrift.WritableStruct, error) {
3434
return nil, thrift.NewApplicationException(thrift.INTERNAL_ERROR, "not supported")
3535
}
36+
37+
func (p *{{> svc/proc_func_name}}) RunStreamContext(
38+
ctx context.Context,
39+
reqStruct thrift.ReadableStruct,
40+
onFirstResponse func(thrift.WritableStruct),
41+
onStreamNext func(thrift.WritableStruct),
42+
onStreamComplete func(),
43+
) {
44+
{{#function:params.fields?}}
45+
args := reqStruct.(*req{{service:go_name}}{{function:go_name}})
46+
{{/function:params.fields?}}
47+
firstResponse := newResp{{service:go_name}}{{function:go_name}}()
48+
{{#function:stream_has_first_response?}}retval, {{/function:stream_has_first_response?}}elemProducerFunc, initialErr := p.handler.{{function:go_name}}({{!
49+
}}ctx{{#if function:params.fields?}}, {{/if}}{{!
50+
}}{{#function:args}}{{!
51+
}}args.{{field:go_name}}{{!
52+
}}{{^last?}}, {{/last?}}{{!
53+
}}{{/function:args}})
54+
if initialErr != nil {
55+
{{#function:exceptions?}}
56+
switch v := initialErr.(type) {
57+
{{#function:exceptions}}
58+
case *{{#field:type}}{{> common/type}}{{/field:type}}:
59+
firstResponse.{{field:go_name}} = v
60+
onFirstResponse(firstResponse)
61+
{{/function:exceptions}}
62+
default:
63+
internalErr := fmt.Errorf("Internal error processing {{function:go_name}}: %w", initialErr)
64+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
65+
onFirstResponse(x)
66+
}
67+
{{/function:exceptions?}}
68+
{{^function:exceptions?}}
69+
internalErr := fmt.Errorf("Internal error processing {{function:go_name}}: %w", initialErr)
70+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
71+
onFirstResponse(x)
72+
{{/function:exceptions?}}
73+
onStreamComplete()
74+
return
75+
}
76+
77+
{{#function:stream_has_first_response?}}
78+
{{#function:stream_first_response_type}}
79+
firstResponse.{{function:retval_field_name}} = {{^type:nilable?}}&{{/type:nilable?}}retval
80+
{{/function:stream_first_response_type}}
81+
{{/function:stream_has_first_response?}}
82+
onFirstResponse(firstResponse)
83+
84+
{{#function:stream_elem_type}}
85+
fbthriftElemChan := make(chan {{#type:structured?}}*{{/type:structured?}}{{> common/type}}, thrift.DefaultStreamBufferSize)
86+
{{/function:stream_elem_type}}
87+
var senderWg sync.WaitGroup
88+
senderWg.Add(1)
89+
// Sender goroutine (receives elements on the channel and sends them out via onStreamNext)
90+
go func() {
91+
defer senderWg.Done()
92+
for elem := range fbthriftElemChan {
93+
streamWrapStruct := newStream{{service:go_name}}{{function:go_name}}()
94+
{{#function:stream_elem_type}}
95+
streamWrapStruct.{{function:retval_field_name}} = {{^type:nilable?}}&{{/type:nilable?}}elem
96+
{{/function:stream_elem_type}}
97+
onStreamNext(streamWrapStruct)
98+
}
99+
}()
100+
101+
streamErr := elemProducerFunc(ctx, fbthriftElemChan)
102+
// Stream is complete. Close the channel and wait for the sender goroutine to finish.
103+
close(fbthriftElemChan)
104+
senderWg.Wait()
105+
if streamErr != nil {
106+
{{#function:stream_exceptions?}}
107+
streamWrapStruct := newStream{{service:go_name}}{{function:go_name}}()
108+
switch v := streamErr.(type) {
109+
{{#function:stream_exceptions}}
110+
case *{{#field:type}}{{> common/type}}{{/field:type}}:
111+
streamWrapStruct.{{field:go_name}} = v
112+
onStreamNext(streamWrapStruct)
113+
{{/function:stream_exceptions}}
114+
default:
115+
internalErr := fmt.Errorf("Internal stream handler error {{function:go_name}}: %w", streamErr)
116+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
117+
onStreamNext(x)
118+
}
119+
{{/function:stream_exceptions?}}
120+
{{^function:stream_exceptions?}}
121+
internalErr := fmt.Errorf("Internal stream handler error {{function:go_name}}: %w", streamErr)
122+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
123+
onStreamNext(x)
124+
{{/function:stream_exceptions?}}
125+
}
126+
onStreamComplete()
127+
}
36128
{{#else}}
37129
func (p *{{> svc/proc_func_name}}) RunContext(ctx context.Context, reqStruct thrift.ReadableStruct) (thrift.WritableStruct, error) {
38130
{{#function:params.fields?}}

third-party/thrift/src/thrift/compiler/test/fixtures/doctext/out/go/gen-go/svcs.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,50 @@ func (p *procFuncCNumbers) RunContext(ctx context.Context, reqStruct thrift.Read
231231
return nil, thrift.NewApplicationException(thrift.INTERNAL_ERROR, "not supported")
232232
}
233233

234+
func (p *procFuncCNumbers) RunStreamContext(
235+
ctx context.Context,
236+
reqStruct thrift.ReadableStruct,
237+
onFirstResponse func(thrift.WritableStruct),
238+
onStreamNext func(thrift.WritableStruct),
239+
onStreamComplete func(),
240+
) {
241+
firstResponse := newRespCNumbers()
242+
elemProducerFunc, initialErr := p.handler.Numbers(ctx)
243+
if initialErr != nil {
244+
internalErr := fmt.Errorf("Internal error processing Numbers: %w", initialErr)
245+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
246+
onFirstResponse(x)
247+
onStreamComplete()
248+
return
249+
}
250+
251+
onFirstResponse(firstResponse)
252+
253+
fbthriftElemChan := make(chan Number, thrift.DefaultStreamBufferSize)
254+
var senderWg sync.WaitGroup
255+
senderWg.Add(1)
256+
// Sender goroutine (receives elements on the channel and sends them out via onStreamNext)
257+
go func() {
258+
defer senderWg.Done()
259+
for elem := range fbthriftElemChan {
260+
streamWrapStruct := newStreamCNumbers()
261+
streamWrapStruct.Success = &elem
262+
onStreamNext(streamWrapStruct)
263+
}
264+
}()
265+
266+
streamErr := elemProducerFunc(ctx, fbthriftElemChan)
267+
// Stream is complete. Close the channel and wait for the sender goroutine to finish.
268+
close(fbthriftElemChan)
269+
senderWg.Wait()
270+
if streamErr != nil {
271+
internalErr := fmt.Errorf("Internal stream handler error Numbers: %w", streamErr)
272+
x := thrift.NewApplicationException(thrift.INTERNAL_ERROR, internalErr.Error())
273+
onStreamNext(x)
274+
}
275+
onStreamComplete()
276+
}
277+
234278

235279
type procFuncCThing struct {
236280
handler C

0 commit comments

Comments
 (0)