Skip to content
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module go.temporal.io/sdk

go 1.23.0

toolchain go1.23.6
go 1.25.4

require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
Expand All @@ -25,6 +23,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
golang.org/x/net v0.39.0 // indirect
Expand All @@ -33,3 +32,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.temporal.io/api => ../temporal-api-go
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 h1:SWHt3Coj0VvF0Km1A0wlY+IjnHKsjQLgO29io84r3wY=
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y=
github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo=
github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -35,8 +37,6 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I=
go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
12 changes: 12 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ type (
workflowTaskCompletion struct {
rawRequest proto.Message
applyCompletionMetrics func()
outboundDataConverter converter.DataConverter
}
)

Expand Down Expand Up @@ -2010,9 +2011,20 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
metricsHandler.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
}
},
outboundDataConverter: workflowTaskCompletionDataConverter(eventHandler, wth.dataConverter),
}
}

func workflowTaskCompletionDataConverter(
eventHandler *workflowExecutionEventHandlerImpl,
defaultConverter converter.DataConverter,
) converter.DataConverter {
if syncDefinition, ok := eventHandler.workflowDefinition.(*syncWorkflowDefinition); ok && syncDefinition.rootCtx != nil {
return getDataConverterFromWorkflowContext(syncDefinition.rootCtx)
}
return defaultConverter
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
if wth.ppMgr != nil && !reflect.ValueOf(wth.ppMgr).IsNil() && !isInReplay {
switch event.GetEventType() {
Expand Down
8 changes: 7 additions & 1 deletion internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,10 @@ func newWorkflowTaskProcessor(
numNormalPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowTask),
numStickyPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowStickyTask),
inboundPayloadVisitor: params.inboundPayloadVisitor,
outboundPayloadVisitor: params.outboundPayloadVisitor,
outboundPayloadVisitor: newSystemNexusOutboundPayloadVisitor(
params.DataConverter,
params.outboundPayloadVisitor,
),
}
}

Expand Down Expand Up @@ -548,6 +551,9 @@ func (wtp *workflowTaskProcessor) RespondTaskCompletedWithMetrics(

uploadPayloadMetrics := &workflowTaskStorageMetrics{}
ctx := context.WithValue(context.Background(), storageOperationCallbackContextKey, uploadPayloadMetrics)
if taskCompletion.outboundDataConverter != nil {
ctx = context.WithValue(ctx, systemNexusPayloadConverterContextKey, taskCompletion.outboundDataConverter)
}
if err = visitProtoPayloads(ctx, wtp.outboundPayloadVisitor, taskCompletion.rawRequest); err != nil {
// The outbound visitor failed (e.g. storage driver error or panic). We
// cannot send the original response, so fall back to an explicit WFT
Expand Down
111 changes: 111 additions & 0 deletions internal/system_nexus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package internal

import (
"context"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/proxy"
systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json"

"go.temporal.io/sdk/converter"
)

const systemNexusPayloadConverterContextKey contextKey = "systemNexusPayloadConverter"

func getSystemNexusPayloadConverter() converter.DataConverter {
return converter.GetDefaultDataConverter()
}

type systemNexusOutboundPayloadVisitor struct {
dataConverter converter.DataConverter
next PayloadVisitor
}

func newSystemNexusOutboundPayloadVisitor(
dataConverter converter.DataConverter,
next PayloadVisitor,
) PayloadVisitor {
return &systemNexusOutboundPayloadVisitor{
dataConverter: dataConverter,
next: next,
}
}

func (v *systemNexusOutboundPayloadVisitor) Visit(
ctx *proxy.VisitPayloadsContext,
payloads []*commonpb.Payload,
) ([]*commonpb.Payload, error) {
attrs, ok := ctx.Parent.(*commandpb.ScheduleNexusOperationCommandAttributes)
if ok &&
ctx.SinglePayloadRequired &&
len(payloads) == 1 &&
systemnexus.IsTemporalNexusOperation(attrs.GetService(), attrs.GetOperation()) {
return v.rewriteSystemNexusPayload(ctx, attrs.GetService(), attrs.GetOperation(), payloads[0])
}
if v.next == nil {
return payloads, nil
}
return v.next.Visit(ctx, payloads)
}

func (v *systemNexusOutboundPayloadVisitor) rewriteSystemNexusPayload(
visitCtx *proxy.VisitPayloadsContext,
service string,
operation string,
payload *commonpb.Payload,
) ([]*commonpb.Payload, error) {
visitor := systemnexus.GetTemporalNexusPayloadVisitor(service, operation)
if visitor == nil {
return []*commonpb.Payload{payload}, nil
}

rewrittenPayload, err := visitor(payload, func(
nestedPayloads []*commonpb.Payload,
) ([]*commonpb.Payload, error) {
encodedPayloads, err := encodeSystemNexusNestedPayloads(
v.dataConverterForContext(visitCtx.Context),
nestedPayloads,
)
if err != nil {
return nil, err
}
if v.next == nil {
return encodedPayloads, nil
}
return v.next.Visit(&proxy.VisitPayloadsContext{Context: visitCtx.Context}, encodedPayloads)
}, false)
if err != nil {
return nil, err
}
return []*commonpb.Payload{rewrittenPayload}, nil
}

func (v *systemNexusOutboundPayloadVisitor) dataConverterForContext(ctx context.Context) converter.DataConverter {
if ctx != nil {
if dc, ok := ctx.Value(systemNexusPayloadConverterContextKey).(converter.DataConverter); ok && dc != nil {
return dc
}
}
return v.dataConverter
}

func encodeSystemNexusNestedPayloads(
dataConverter converter.DataConverter,
payloads []*commonpb.Payload,
) ([]*commonpb.Payload, error) {
if len(payloads) == 0 {
return nil, nil
}

rawPayloads := make([]interface{}, len(payloads))
for i, payload := range payloads {
rawPayloads[i] = converter.NewRawValue(payload)
}

encoded, err := dataConverter.ToPayloads(rawPayloads...)
if err != nil || encoded == nil {
return nil, err
}
return encoded.Payloads, nil
}
Loading