Skip to content

Commit c36771c

Browse files
author
Chris Ludden
authored
Merge pull request #2 from cludden/cludden/rpc-as-workflow-and-activity
feat!: supports multiple method option types
2 parents 9f6b2bf + e93bfab commit c36771c

8 files changed

Lines changed: 350 additions & 193 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# protoc-gen-go-temporal
22

3-
a protobuf generator for temporal clients and workers
3+
a protoc plugin for generating temporal clients and workers from protobuf schemas
44

5-
based a fork of [github.com/cretz/temporal-sdk-go-advanced](https://github.com/cretz/temporal-sdk-go-advanced)
5+
inspired by [github.com/cretz/temporal-sdk-go-advanced](https://github.com/cretz/temporal-sdk-go-advanced)
66

77
## Getting Started
88
1. Install [buf](https://docs.buf.build/installation)

example/example_temporal.pb.go

Lines changed: 92 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/simple/simple_temporal.pb.go

Lines changed: 128 additions & 89 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/plugin/activities.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (svc *Service) genRegisterActivities(f *g.File) {
4444
).
4545
BlockFunc(func(fn *g.Group) {
4646
for _, activity := range svc.activitiesOrdered {
47-
fn.Id(fmt.Sprintf("Register%s", activity)).Call(
47+
fn.Id(fmt.Sprintf("Register%sActivity", activity)).Call(
4848
g.Id("r"), g.Id("activities").Dot(activity),
4949
)
5050
}
@@ -56,8 +56,8 @@ func (svc *Service) genRegisterActivity(f *g.File, activity string) {
5656
method := svc.methods[activity]
5757
hasInput := !isEmpty(method.Input)
5858
hasOutput := !isEmpty(method.Output)
59-
f.Commentf("Register%s registers a %s activity", activity, activity)
60-
f.Func().Id(fmt.Sprintf("Register%s", activity)).
59+
f.Commentf("Register%sActivity registers a %s activity", activity, activity)
60+
f.Func().Id(fmt.Sprintf("Register%sActivity", activity)).
6161
Params(
6262
g.Id("r").Qual(workerPkg, "Registry"),
6363
g.Id("fn").Func().
@@ -77,7 +77,7 @@ func (svc *Service) genRegisterActivity(f *g.File, activity string) {
7777
Block(
7878
g.Id("r").Dot("RegisterActivityWithOptions").Call(
7979
g.Id("fn"), g.Qual(activityPkg, "RegisterOptions").Block(
80-
g.Id("Name").Op(":").Id(fmt.Sprintf("%sName", activity)).Op(","),
80+
g.Id("Name").Op(":").Id(fmt.Sprintf("%sActivityName", activity)).Op(","),
8181
),
8282
),
8383
)
@@ -205,6 +205,7 @@ func (svc *Service) genActivityFunction(f *g.File, activity string, local bool)
205205
g.Op("*").Id(fmt.Sprintf("%sFuture", method.GoName)),
206206
).
207207
BlockFunc(func(fn *g.Group) {
208+
// initialize activity options if nil
208209
fn.If(g.Id("opts").Op("==").Nil()).BlockFunc(func(bl *g.Group) {
209210
optionsFn := "GetActivityOptions"
210211
if local {
@@ -215,6 +216,8 @@ func (svc *Service) genActivityFunction(f *g.File, activity string, local bool)
215216
)
216217
bl.Id("opts").Op("=").Op("&").Id("activityOpts")
217218
})
219+
220+
// set default retry policy
218221
if policy := opts.GetRetryPolicy(); policy != nil {
219222
fn.If(g.Id("opts").Dot("RetryPolicy").Op("==").Nil()).Block(
220223
g.Id("opts").Dot("RetryPolicy").Op("=").Op("&").Qual(temporalPkg, "RetryPolicy").BlockFunc(func(fields *g.Group) {
@@ -236,27 +239,36 @@ func (svc *Service) genActivityFunction(f *g.File, activity string, local bool)
236239
}),
237240
)
238241
}
239-
if timeout := opts.GetHeartbeatTimeout(); timeout.IsValid() {
242+
243+
// set default heartbeat timeout
244+
if timeout := opts.GetHeartbeatTimeout(); !local && timeout.IsValid() {
240245
fn.If(g.Id("opts").Dot("HeartbeatTimeout").Op("==").Lit(0)).Block(
241246
g.Id("opts").Dot("HeartbeatTimeout").Op("=").Id(strconv.FormatInt(timeout.AsDuration().Nanoseconds(), 10)).Comment(timeout.AsDuration().String()),
242247
)
243248
}
244249

250+
// set default schedule to close timeout
245251
if timeout := opts.GetScheduleToCloseTimeout(); timeout.IsValid() {
246252
fn.If(g.Id("opts").Dot("ScheduleToCloseTimeout").Op("==").Lit(0)).Block(
247253
g.Id("opts").Dot("ScheduleToCloseTimeout").Op("=").Id(strconv.FormatInt(timeout.AsDuration().Nanoseconds(), 10)).Comment(timeout.AsDuration().String()),
248254
)
249255
}
250-
if timeout := opts.GetScheduleToStartTimeout(); timeout.IsValid() {
256+
257+
// set default schedule to start timeout
258+
if timeout := opts.GetScheduleToStartTimeout(); !local && timeout.IsValid() {
251259
fn.If(g.Id("opts").Dot("ScheduleToStartTimeout").Op("==").Lit(0)).Block(
252260
g.Id("opts").Dot("ScheduleToStartTimeout").Op("=").Id(strconv.FormatInt(timeout.AsDuration().Nanoseconds(), 10)).Comment(timeout.AsDuration().String()),
253261
)
254262
}
263+
264+
// set default start to close timeout
255265
if timeout := opts.GetStartToCloseTimeout(); timeout.IsValid() {
256266
fn.If(g.Id("opts").Dot("StartToCloseTimeout").Op("==").Lit(0)).Block(
257267
g.Id("opts").Dot("StartToCloseTimeout").Op("=").Id(strconv.FormatInt(timeout.AsDuration().Nanoseconds(), 10)).Comment(timeout.AsDuration().String()),
258268
)
259269
}
270+
271+
// inject ctx with activity options
260272
if local {
261273
fn.Id("ctx").Op("=").Qual(workflowPkg, "WithLocalActivityOptions").Call(
262274
g.Id("ctx"), g.Op("*").Id("opts"),
@@ -267,21 +279,35 @@ func (svc *Service) genActivityFunction(f *g.File, activity string, local bool)
267279
g.Id("ctx"), g.Op("*").Id("opts"),
268280
)
269281
}
282+
283+
// if activity function nil for local activity, replace with activity name
284+
if local {
285+
fn.Var().Id("activity").Any()
286+
fn.If(g.Id("fn").Op("==").Nil()).
287+
Block(
288+
g.Id("activity").Op("=").Id(fmt.Sprintf("%sActivityName", activity)),
289+
).
290+
Else().
291+
Block(
292+
g.Id("activity").Op("=").Id("fn"),
293+
)
294+
}
295+
270296
fn.Return(
271-
g.Op("&").Id(fmt.Sprintf("%sFuture", method.GoName)).BlockFunc(func(bl *g.Group) {
297+
g.Op("&").Id(fmt.Sprintf("%sFuture", method.GoName)).ValuesFunc(func(bl *g.Group) {
272298
future := bl.Id("Future").Op(":")
273299
if local {
274300
future.Qual(workflowPkg, "ExecuteLocalActivity").CallFunc(func(returnVals *g.Group) {
275301
returnVals.Id("ctx")
276-
returnVals.Id("fn")
302+
returnVals.Id("activity")
277303
if hasInput {
278304
returnVals.Id("req")
279305
}
280306
}).Op(",")
281307
} else {
282308
future.Qual(workflowPkg, "ExecuteActivity").CallFunc(func(returnVals *g.Group) {
283309
returnVals.Id("ctx")
284-
returnVals.Id(fmt.Sprintf("%sName", method.GoName))
310+
returnVals.Id(fmt.Sprintf("%sActivityName", method.GoName))
285311
if hasInput {
286312
returnVals.Id("req")
287313
}

internal/plugin/client.go

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,29 @@ func (svc *Service) genClientInterface(f *g.File) {
1717
f.Type().Id("Client").InterfaceFunc(func(methods *g.Group) {
1818
for _, workflow := range svc.workflowsOrdered {
1919
opts := svc.workflows[workflow]
20-
// generate Execute<Workflow> method
20+
2121
method := svc.methods[workflow]
2222
hasInput := !isEmpty(method.Input)
23+
hasOutput := !isEmpty(method.Output)
24+
25+
// generate <Workflow> method
26+
methods.Commentf("%s executes a %s workflow and blocks until error or response received", workflow, workflow)
27+
methods.Id(workflow).
28+
ParamsFunc(func(args *g.Group) {
29+
args.Id("ctx").Qual("context", "Context")
30+
args.Id("opts").Op("*").Qual(clientPkg, "StartWorkflowOptions")
31+
if hasInput {
32+
args.Id("req").Op("*").Id(method.Input.GoIdent.GoName)
33+
}
34+
}).
35+
ParamsFunc(func(returnVals *g.Group) {
36+
if hasOutput {
37+
returnVals.Op("*").Id(method.Output.GoIdent.GoName)
38+
}
39+
returnVals.Error()
40+
})
41+
42+
// generate Execute<Workflow> method
2343
methods.Commentf("Execute%s executes a %s workflow", workflow, workflow)
2444
methods.Id(fmt.Sprintf("Execute%s", workflow)).
2545
ParamsFunc(func(args *g.Group) {
@@ -363,6 +383,51 @@ func (svc *Service) genClientConstructor(f *g.File) {
363383
)
364384
}
365385

386+
// genClientWorkflow generates an <Workflow> client method
387+
func (svc *Service) genClientWorkflow(f *g.File, workflow string) {
388+
method := svc.methods[workflow]
389+
hasInput := !isEmpty(method.Input)
390+
hasOutput := !isEmpty(method.Output)
391+
f.Commentf("%s executes a %s workflow and blocks until error or response received", workflow, workflow)
392+
f.Func().
393+
Params(g.Id("c").Op("*").Id("workflowClient")).
394+
Id(workflow).
395+
ParamsFunc(func(args *g.Group) {
396+
args.Id("ctx").Qual("context", "Context")
397+
args.Id("opts").Op("*").Qual(clientPkg, "StartWorkflowOptions")
398+
if hasInput {
399+
args.Id("req").Op("*").Id(method.Input.GoIdent.GoName)
400+
}
401+
}).
402+
ParamsFunc(func(returnVals *g.Group) {
403+
if hasOutput {
404+
returnVals.Op("*").Id(method.Output.GoIdent.GoName)
405+
}
406+
returnVals.Error()
407+
}).
408+
BlockFunc(func(fn *g.Group) {
409+
// execute workflow
410+
fn.Id("run").Op(",").Err().Op(":=").Id("c").Dot(fmt.Sprintf("Execute%s", workflow)).CallFunc(func(args *g.Group) {
411+
args.Id("ctx")
412+
args.Id("opts")
413+
if hasInput {
414+
args.Id("req")
415+
}
416+
})
417+
fn.If(g.Err().Op("!=").Nil()).Block(
418+
g.ReturnFunc(func(returnVals *g.Group) {
419+
if hasOutput {
420+
returnVals.Nil()
421+
}
422+
returnVals.Err()
423+
}),
424+
)
425+
fn.Return(
426+
g.Id("run").Dot("Get").Call(g.Id("ctx")),
427+
)
428+
})
429+
}
430+
366431
// genClientWorkflowExecute generates an Execute<Workflow> client method
367432
func (svc *Service) genClientWorkflowExecute(f *g.File, workflow string) {
368433
method := svc.methods[workflow]
@@ -391,7 +456,7 @@ func (svc *Service) genClientWorkflowExecute(f *g.File, workflow string) {
391456
fn.Id("run").Op(",").Err().Op(":=").Id("c").Dot("client").Dot("ExecuteWorkflow").CallFunc(func(args *g.Group) {
392457
args.Id("ctx")
393458
args.Op("*").Id("opts")
394-
args.Id(fmt.Sprintf("%sName", workflow))
459+
args.Id(fmt.Sprintf("%sWorkflowName", workflow))
395460
if hasInput {
396461
args.Id("req")
397462
}
@@ -473,14 +538,14 @@ func (svc *Service) genClientSignalWithStart(f *g.File, workflow, signal string)
473538
fn.Id("run").Op(",").Err().Op(":=").Id("c").Dot("client").Dot("SignalWithStartWorkflow").CallFunc(func(args *g.Group) {
474539
args.Id("ctx")
475540
args.Id("opts").Dot("ID")
476-
args.Id(fmt.Sprintf("%sName", signal))
541+
args.Id(fmt.Sprintf("%sSignalName", signal))
477542
if hasSignalInput {
478543
args.Id("signal")
479544
} else {
480545
args.Nil()
481546
}
482547
args.Op("*").Id("opts")
483-
args.Id(fmt.Sprintf("%sName", workflow))
548+
args.Id(fmt.Sprintf("%sWorkflowName", workflow))
484549
if hasWorkflowInput {
485550
args.Id("req")
486551
}
@@ -525,7 +590,7 @@ func (svc *Service) genClientQueryMethod(f *g.File, query string) {
525590
args.Id("ctx")
526591
args.Id("workflowID")
527592
args.Id("runID")
528-
args.Id(fmt.Sprintf("%sName", query))
593+
args.Id(fmt.Sprintf("%sQueryName", query))
529594
if hasInput {
530595
args.Id("query")
531596
}
@@ -570,7 +635,7 @@ func (svc *Service) genClientSignalMethod(f *g.File, signal string) {
570635
args.Id("ctx")
571636
args.Id("workflowID")
572637
args.Id("runID")
573-
args.Id(fmt.Sprintf("%sName", signal))
638+
args.Id(fmt.Sprintf("%sSignalName", signal))
574639
if hasInput {
575640
args.Id("signal")
576641
} else {

internal/plugin/service.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (svc *Service) render(f *g.File) {
9696
// generate client workflow methods
9797
for _, workflow := range svc.workflowsOrdered {
9898
opts := svc.workflows[workflow]
99+
svc.genClientWorkflow(f, workflow)
99100
svc.genClientWorkflowExecute(f, workflow)
100101
svc.genClientWorkflowGet(f, workflow)
101102
for _, signal := range opts.GetSignal() {
@@ -186,7 +187,7 @@ func (svc *Service) genConstants(f *g.File) {
186187
f.Const().DefsFunc(func(defs *g.Group) {
187188
for _, workflow := range svc.workflowsOrdered {
188189
method := svc.methods[workflow]
189-
defs.Id(fmt.Sprintf("%sName", workflow)).Op("=").Lit(string(method.Desc.FullName()))
190+
defs.Id(fmt.Sprintf("%sWorkflowName", workflow)).Op("=").Lit(fmt.Sprintf("%sWorkflow", string(method.Desc.FullName())))
190191
}
191192
})
192193
}
@@ -214,7 +215,7 @@ func (svc *Service) genConstants(f *g.File) {
214215
f.Const().DefsFunc(func(defs *g.Group) {
215216
for _, query := range svc.queriesOrdered {
216217
method := svc.methods[query]
217-
defs.Id(fmt.Sprintf("%sName", query)).Op("=").Lit(string(method.Desc.FullName()))
218+
defs.Id(fmt.Sprintf("%sQueryName", query)).Op("=").Lit(fmt.Sprintf("%sQuery", string(method.Desc.FullName())))
218219
}
219220
})
220221
}
@@ -225,7 +226,7 @@ func (svc *Service) genConstants(f *g.File) {
225226
f.Const().DefsFunc(func(defs *g.Group) {
226227
for _, signal := range svc.signalsOrdered {
227228
method := svc.methods[signal]
228-
defs.Id(fmt.Sprintf("%sName", signal)).Op("=").Lit(string(method.Desc.FullName()))
229+
defs.Id(fmt.Sprintf("%sSignalName", signal)).Op("=").Lit(fmt.Sprintf("%sSignal", string(method.Desc.FullName())))
229230
}
230231
})
231232
}
@@ -236,7 +237,7 @@ func (svc *Service) genConstants(f *g.File) {
236237
f.Const().DefsFunc(func(defs *g.Group) {
237238
for _, activity := range svc.activitiesOrdered {
238239
method := svc.methods[activity]
239-
defs.Id(fmt.Sprintf("%sName", activity)).Op("=").Lit(string(method.Desc.FullName()))
240+
defs.Id(fmt.Sprintf("%sActivityName", activity)).Op("=").Lit(fmt.Sprintf("%sActivity", string(method.Desc.FullName())))
240241
}
241242
})
242243
}

0 commit comments

Comments
 (0)