-
Notifications
You must be signed in to change notification settings - Fork 298
Expand file tree
/
Copy pathsystem_nexus.go
More file actions
111 lines (97 loc) · 2.96 KB
/
system_nexus.go
File metadata and controls
111 lines (97 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package internal
import (
"context"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/proxy"
systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json"
"go.temporal.io/sdk/converter"
)
// systemNexusPayloadConverterContextKey is the context key under which a
// converter.DataConverter override may be stored. When a non-nil converter is
// present under this key, dataConverterForContext returns it instead of the
// visitor's own data converter.
const systemNexusPayloadConverterContextKey contextKey = "systemNexusPayloadConverter"
// getSystemNexusPayloadConverter returns the data converter used for system
// Nexus payloads. It is currently the value returned by
// converter.GetDefaultDataConverter.
func getSystemNexusPayloadConverter() converter.DataConverter {
	defaultConverter := converter.GetDefaultDataConverter()
	return defaultConverter
}
// systemNexusOutboundPayloadVisitor is a PayloadVisitor that intercepts the
// single input payload of ScheduleNexusOperation commands targeting Temporal
// system Nexus operations and rewrites the payloads nested inside it. All
// other payloads are forwarded to the next visitor, if any.
type systemNexusOutboundPayloadVisitor struct {
// dataConverter encodes nested payloads unless the visit context carries
// an override under systemNexusPayloadConverterContextKey.
dataConverter converter.DataConverter
// next is the visitor to delegate to; when nil, payloads are returned
// without further processing.
next PayloadVisitor
}
// newSystemNexusOutboundPayloadVisitor builds a systemNexusOutboundPayloadVisitor
// that encodes nested payloads with dataConverter and chains to next.
// next may be nil, in which case the visitor terminates the chain.
func newSystemNexusOutboundPayloadVisitor(
	dataConverter converter.DataConverter,
	next PayloadVisitor,
) PayloadVisitor {
	visitor := new(systemNexusOutboundPayloadVisitor)
	visitor.dataConverter = dataConverter
	visitor.next = next
	return visitor
}
// Visit implements PayloadVisitor.
//
// The payload is rewritten via rewriteSystemNexusPayload only when all of the
// following hold: the parent message is a ScheduleNexusOperationCommandAttributes,
// the context requires a single payload, exactly one payload was supplied, and
// systemnexus.IsTemporalNexusOperation reports true for the command's service
// and operation. In every other case the payloads are handed to the next
// visitor, or returned unchanged when there is no next visitor.
func (v *systemNexusOutboundPayloadVisitor) Visit(
	ctx *proxy.VisitPayloadsContext,
	payloads []*commonpb.Payload,
) ([]*commonpb.Payload, error) {
	scheduleAttrs, isSchedule := ctx.Parent.(*commandpb.ScheduleNexusOperationCommandAttributes)
	if isSchedule && ctx.SinglePayloadRequired && len(payloads) == 1 {
		// Only consult the operation registry once the cheap structural
		// checks have passed.
		if systemnexus.IsTemporalNexusOperation(scheduleAttrs.GetService(), scheduleAttrs.GetOperation()) {
			return v.rewriteSystemNexusPayload(
				ctx,
				scheduleAttrs.GetService(),
				scheduleAttrs.GetOperation(),
				payloads[0],
			)
		}
	}

	if v.next != nil {
		return v.next.Visit(ctx, payloads)
	}
	return payloads, nil
}
// rewriteSystemNexusPayload rewrites the payloads nested inside a system Nexus
// operation input.
//
// It looks up the payload visitor registered for service/operation via
// systemnexus.GetTemporalNexusPayloadVisitor. If none is registered, payload is
// returned untouched as a one-element slice. Otherwise the visitor is invoked
// with a callback that encodes each batch of nested payloads using the data
// converter selected for visitCtx.Context and then forwards the encoded
// payloads to the next visitor (when present) under a fresh
// VisitPayloadsContext that carries only the original Context.
//
// The result is always a single-element slice holding the rewritten payload,
// or an error from the visitor, the encoder, or the next visitor.
func (v *systemNexusOutboundPayloadVisitor) rewriteSystemNexusPayload(
	visitCtx *proxy.VisitPayloadsContext,
	service string,
	operation string,
	payload *commonpb.Payload,
) ([]*commonpb.Payload, error) {
	rewrite := systemnexus.GetTemporalNexusPayloadVisitor(service, operation)
	if rewrite == nil {
		return []*commonpb.Payload{payload}, nil
	}

	// processNested is called by the system Nexus visitor for the payloads it
	// finds inside the operation input.
	processNested := func(nested []*commonpb.Payload) ([]*commonpb.Payload, error) {
		encoded, encodeErr := encodeSystemNexusNestedPayloads(
			v.dataConverterForContext(visitCtx.Context),
			nested,
		)
		if encodeErr != nil {
			return nil, encodeErr
		}
		if v.next != nil {
			// The nested payloads do not share the outer parent message, so
			// only the Go context is propagated to the next visitor.
			return v.next.Visit(&proxy.VisitPayloadsContext{Context: visitCtx.Context}, encoded)
		}
		return encoded, nil
	}

	// NOTE(review): the meaning of the trailing `false` flag is defined by the
	// systemnexus package and is not visible here — confirm before changing.
	rewritten, err := rewrite(payload, processNested, false)
	if err != nil {
		return nil, err
	}
	return []*commonpb.Payload{rewritten}, nil
}
// dataConverterForContext picks the data converter to use for ctx. A non-nil
// converter.DataConverter stored in ctx under
// systemNexusPayloadConverterContextKey takes precedence; otherwise (including
// when ctx itself is nil) the visitor's configured dataConverter is returned.
func (v *systemNexusOutboundPayloadVisitor) dataConverterForContext(ctx context.Context) converter.DataConverter {
	if ctx == nil {
		return v.dataConverter
	}
	override, ok := ctx.Value(systemNexusPayloadConverterContextKey).(converter.DataConverter)
	if !ok || override == nil {
		return v.dataConverter
	}
	return override
}
// encodeSystemNexusNestedPayloads runs payloads through dataConverter.ToPayloads
// after wrapping each one with converter.NewRawValue, and returns the
// resulting payload slice.
//
// An empty or nil input yields (nil, nil) without touching the converter. If
// the converter returns an error it is propagated; if it returns a nil result
// without an error, (nil, nil) is returned.
func encodeSystemNexusNestedPayloads(
	dataConverter converter.DataConverter,
	payloads []*commonpb.Payload,
) ([]*commonpb.Payload, error) {
	if len(payloads) == 0 {
		return nil, nil
	}

	wrapped := make([]interface{}, 0, len(payloads))
	for _, p := range payloads {
		wrapped = append(wrapped, converter.NewRawValue(p))
	}

	result, err := dataConverter.ToPayloads(wrapped...)
	switch {
	case err != nil:
		return nil, err
	case result == nil:
		return nil, nil
	}
	return result.Payloads, nil
}