-
Notifications
You must be signed in to change notification settings - Fork 231
Expand file tree
/
Copy pathinterceptor.go
More file actions
115 lines (99 loc) · 3.19 KB
/
interceptor.go
File metadata and controls
115 lines (99 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package workflow_security_interceptor
import (
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)
type workerInterceptor struct {
interceptor.WorkerInterceptorBase
}
func NewWorkerInterceptor() interceptor.WorkerInterceptor {
return &workerInterceptor{}
}
func (w *workerInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
i := &workflowInboundInterceptor{}
i.Next = next
return i
}
type workflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
}
func (w *workflowInboundInterceptor) Init(next interceptor.WorkflowOutboundInterceptor) error {
i := &workflowOutboundInterceptor{}
i.Next = next
return w.Next.Init(i)
}
type workflowOutboundInterceptor struct {
interceptor.WorkflowOutboundInterceptorBase
}
func ValidateChildWorkflowTypeActivity(childWorkflowType string) (bool, error) {
return childWorkflowType == "ChildWorkflow", nil
}
type validatedChildWorkflowFuture struct {
fAllowed workflow.Future
sExecution workflow.Settable
fExecution workflow.Future
sResult workflow.Settable
fResult workflow.Future
child workflow.ChildWorkflowFuture
}
func NewValidatedChildWorkflowFuture(ctx workflow.Context, allowed workflow.Future) *validatedChildWorkflowFuture {
r := &validatedChildWorkflowFuture{}
r.fAllowed = allowed
r.fExecution, r.sExecution = workflow.NewFuture(ctx)
r.fResult, r.sResult = workflow.NewFuture(ctx)
return r
}
func (v validatedChildWorkflowFuture) Get(ctx workflow.Context, valuePtr interface{}) error {
return v.fResult.Get(ctx, valuePtr)
}
func (v validatedChildWorkflowFuture) IsReady() bool {
return v.fResult.IsReady()
}
func (v validatedChildWorkflowFuture) GetChildWorkflowExecution() workflow.Future {
return v.fExecution
}
func (v validatedChildWorkflowFuture) SignalChildWorkflow(ctx workflow.Context, signalName string, data interface{}) workflow.Future {
f, s := workflow.NewFuture(ctx)
workflow.Go(ctx, func(ctx workflow.Context) {
// wait for validation
err := v.fAllowed.Get(ctx, nil)
if err != nil {
s.SetError(err)
}
s.Chain(v.child.SignalChildWorkflow(ctx, signalName, data))
})
return f
}
func (w *workflowOutboundInterceptor) ExecuteChildWorkflow(
ctx workflow.Context,
childWorkflowType string,
args ...interface{},
) workflow.ChildWorkflowFuture {
aCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Second * 10,
})
fAllowed := workflow.ExecuteActivity(aCtx, ValidateChildWorkflowTypeActivity, childWorkflowType)
result := NewValidatedChildWorkflowFuture(ctx, fAllowed)
workflow.Go(ctx, func(ctx workflow.Context) {
var allowed bool
err := fAllowed.Get(ctx, &allowed)
if err != nil {
result.sResult.SetError(err)
return
}
if !allowed {
result.sResult.SetError(temporal.NewApplicationError("Child workflow type \""+childWorkflowType+"\" not allowed", "not-allowed"))
return
}
childFuture := w.Next.ExecuteChildWorkflow(ctx, childWorkflowType, args...)
result.child = childFuture
result.sExecution.Chain(childFuture.GetChildWorkflowExecution())
result.sResult.Chain(childFuture)
})
return result
}