diff --git a/converter/system_nexus_data_converter.go b/converter/system_nexus_data_converter.go new file mode 100644 index 000000000..d776f5cfb --- /dev/null +++ b/converter/system_nexus_data_converter.go @@ -0,0 +1,110 @@ +package converter + +import ( + "errors" + "fmt" + + commonpb "go.temporal.io/api/common/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" +) + +// SystemNexusDataConverter converts annotated generated system Nexus envelopes +// through their backing proto messages. It is intentionally narrow and only +// supports annotated generated system Nexus request/response types. +type SystemNexusDataConverter struct { + protoJSON *ProtoJSONPayloadConverter +} + +// NewSystemNexusDataConverter returns a data converter for annotated generated +// system Nexus outer envelopes. +func NewSystemNexusDataConverter() DataConverter { + return &SystemNexusDataConverter{ + protoJSON: NewProtoJSONPayloadConverter(), + } +} + +func (c *SystemNexusDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + if value == nil || systemnexus.GetTemporalNexusProtoMessage(value) == nil { + return nil, errors.New("system nexus data converter only supports annotated generated types") + } + message, err := systemnexus.ToTemporalNexusProto(value) + if err != nil { + return nil, err + } + return c.protoJSON.ToPayload(message) +} + +func (c *SystemNexusDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { + if payload == nil { + return nil + } + if valuePtr == nil { + return errors.New("system nexus data converter requires a destination value") + } + message := systemnexus.GetTemporalNexusProtoMessage(valuePtr) + if message == nil { + return errors.New("system nexus data converter only supports annotated generated types") + } + if err := c.protoJSON.FromPayload(payload, message); err != nil { + return err + } + return systemnexus.FromTemporalNexusProto(message, valuePtr) +} + +func (c *SystemNexusDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { + if len(values) == 0 { + return nil, nil + } + + result := &commonpb.Payloads{} + for i, value := range values { + rawValue, ok := value.(RawValue) + if ok { + result.Payloads = append(result.Payloads, rawValue.Payload()) + continue + } + payload, err := c.ToPayload(value) + if err != nil { + return nil, fmt.Errorf("values[%d]: %w", i, err) + } + result.Payloads = append(result.Payloads, payload) + } + return result, nil +} + +func (c *SystemNexusDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { + if payloads == nil { + return nil + } + for i, payload := range payloads.GetPayloads() { + if i >= len(valuePtrs) { + break + } + rawValue, ok := valuePtrs[i].(*RawValue) + if ok { + *rawValue = NewRawValue(payload) + continue + } + if err := c.FromPayload(payload, valuePtrs[i]); err != nil { + return fmt.Errorf("payload item %d: %w", i, err) + } + } + return nil +} + +func (c *SystemNexusDataConverter) ToString(input *commonpb.Payload) string { + return c.protoJSON.ToString(input) +} + +func (c *SystemNexusDataConverter) ToStrings(input *commonpb.Payloads) []string { + if input == nil { + return nil + } + result := make([]string, len(input.Payloads)) + for i, payload := range input.Payloads { + result[i] = c.ToString(payload) + } + return result +} + +var _ DataConverter = (*SystemNexusDataConverter)(nil) diff --git a/converter/system_nexus_data_converter_test.go b/converter/system_nexus_data_converter_test.go new file mode 100644 index 000000000..1d1f34cec --- /dev/null +++ b/converter/system_nexus_data_converter_test.go @@ -0,0 +1,83 @@ +package converter + +import ( + "testing" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + workflowservicev1 "go.temporal.io/api/workflowservice/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" + "google.golang.org/protobuf/proto" +) + +func TestSystemNexusDataConverter(t *testing.T) { + t.Parallel() + + dc := NewSystemNexusDataConverter() + value := systemnexus.SignalWithStartWorkflowExecutionRequest{ + Namespace: "default", + WorkflowID: "workflow-id", + SignalName: "signal-name", + Input: &systemnexus.Payloads{ + Payloads: []systemnexus.Payload{{ + Data: "d29ya2Zsb3ctaW5wdXQ=", + Metadata: map[string]string{ + "encoding": "anNvbi9wbGFpbg==", + }, + }}, + }, + } + + payload, err := dc.ToPayload(value) + require.NoError(t, err) + require.Equal(t, []byte(MetadataEncodingProtoJSON), payload.Metadata[MetadataEncoding]) + require.Equal( + t, + []byte((&workflowservicev1.SignalWithStartWorkflowExecutionRequest{}).ProtoReflect().Descriptor().FullName()), + payload.Metadata[MetadataMessageType], + ) + + var decoded systemnexus.SignalWithStartWorkflowExecutionRequest + require.NoError(t, dc.FromPayload(payload, &decoded)) + require.Equal(t, value, decoded) + + protoMessage, err := systemnexus.ToTemporalNexusProto(value) + require.NoError(t, err) + decodedProto := &workflowservicev1.SignalWithStartWorkflowExecutionRequest{} + require.NoError(t, GetDefaultDataConverter().FromPayload(payload, decodedProto)) + require.True(t, proto.Equal(protoMessage, decodedProto)) +} + +func TestSystemNexusDataConverterToPayloads(t *testing.T) { + t.Parallel() + + dc := NewSystemNexusDataConverter() + payloads, err := dc.ToPayloads( + systemnexus.SignalWithStartWorkflowExecutionResponse{RunID: "run-id"}, + ) + require.NoError(t, err) + require.Len(t, payloads.Payloads, 1) + require.Equal(t, []byte(MetadataEncodingProtoJSON), payloads.Payloads[0].Metadata[MetadataEncoding]) + + var response systemnexus.SignalWithStartWorkflowExecutionResponse + require.NoError(t, dc.FromPayloads(&commonpb.Payloads{Payloads: payloads.Payloads}, &response)) + require.Equal(t, "run-id", response.RunID) +} + +func TestSystemNexusDataConverterRejectsUnannotatedTypes(t *testing.T) { + t.Parallel() + + dc := NewSystemNexusDataConverter() + + _, err := dc.ToPayload("plain-value") + require.Error(t, err) + require.ErrorContains(t, err, "annotated generated types") + + payload, err := GetDefaultDataConverter().ToPayload("plain-value") + require.NoError(t, err) + + var plain string + err = dc.FromPayload(payload, &plain) + require.Error(t, err) + require.ErrorContains(t, err, "annotated generated types") +} diff --git a/go.mod b/go.mod index 2b47cf567..3feaee5cd 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module go.temporal.io/sdk -go 1.23.0 - -toolchain go1.23.6 +go 1.25.4 require ( github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a @@ -25,6 +23,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect golang.org/x/net v0.39.0 // indirect @@ -33,3 +32,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace go.temporal.io/api => ../temporal-api-go diff --git a/go.sum b/go.sum index fd9193a8e..0cf545b9a 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 h1:SWHt3Coj0VvF0Km1A0wlY+IjnHKsjQLgO29io84r3wY= +github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -35,8 +37,6 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 91fa489c0..e43833b1b 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -711,6 +711,28 @@ func (t *tracingWorkflowOutboundInterceptor) SignalExternalWorkflow( return t.Next.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg) } +func (t *tracingWorkflowOutboundInterceptor) SignalWithStartWorkflow( + ctx workflow.Context, + workflowID string, + signalName string, + signalArg interface{}, + options workflow.StartWorkflowOptions, + workflowType string, + workflowArgs ...interface{}, +) workflow.Future { + if !t.root.options.DisableSignalTracing { + var span TracerSpan + var futErr workflow.Future + span, ctx, futErr = t.startNonReplaySpan(ctx, "SignalWithStartWorkflow", signalName, false, t.root.workflowHeaderWriter(ctx)) + if futErr != nil { + return futErr + } + defer span.Finish(&TracerFinishSpanOptions{}) + } + + return t.Next.SignalWithStartWorkflow(ctx, workflowID, signalName, signalArg, options, workflowType, workflowArgs...) +} + func (t *tracingWorkflowOutboundInterceptor) SignalChildWorkflow( ctx workflow.Context, workflowID string, diff --git a/internal/interceptor.go b/internal/interceptor.go index dbccfb945..a06e5edf5 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -272,6 +272,18 @@ type WorkflowOutboundInterceptor interface { // interceptor.WorkflowHeader will return a non-nil map for this context. SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future + // SignalWithStartWorkflow intercepts workflow.SignalWithStartWorkflow. + // interceptor.WorkflowHeader will return a non-nil map for this context. + SignalWithStartWorkflow( + ctx Context, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowType string, + workflowArgs ...interface{}, + ) Future + // SignalChildWorkflow intercepts // workflow.ChildWorkflowFuture.SignalChildWorkflow. // interceptor.WorkflowHeader will return a non-nil map for this context. diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index c1ae31044..426729e59 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -310,6 +310,20 @@ func (w *WorkflowOutboundInterceptorBase) SignalExternalWorkflow( return w.Next.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg) } +// SignalWithStartWorkflow implements +// WorkflowOutboundInterceptor.SignalWithStartWorkflow. +func (w *WorkflowOutboundInterceptorBase) SignalWithStartWorkflow( + ctx Context, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowType string, + workflowArgs ...interface{}, +) Future { + return w.Next.SignalWithStartWorkflow(ctx, workflowID, signalName, signalArg, options, workflowType, workflowArgs...) +} + // SignalChildWorkflow implements // WorkflowOutboundInterceptor.SignalChildWorkflow. func (w *WorkflowOutboundInterceptorBase) SignalChildWorkflow( diff --git a/internal/interceptortest/proxy.go b/internal/interceptortest/proxy.go index 125c86fcb..7a2073967 100644 --- a/internal/interceptortest/proxy.go +++ b/internal/interceptortest/proxy.go @@ -368,6 +368,21 @@ func (p *proxyWorkflowOutbound) SignalExternalWorkflow( return } +func (p *proxyWorkflowOutbound) SignalWithStartWorkflow( + ctx workflow.Context, + workflowID string, + signalName string, + signalArg interface{}, + options workflow.StartWorkflowOptions, + workflowType string, + workflowArgs ...interface{}, +) (ret workflow.Future) { + args := []interface{}{ctx, workflowID, signalName, signalArg, options, workflowType} + args = append(args, workflowArgs...) + ret, _ = p.invoke(args...)[0].Interface().(workflow.Future) + return +} + func (p *proxyWorkflowOutbound) UpsertSearchAttributes( ctx workflow.Context, attributes map[string]interface{}, diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index c2c0338b0..702c97ccf 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -19,6 +19,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/sdk/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" @@ -1727,6 +1728,10 @@ func (w *WorkflowOptions) getRunningUpdateHandles() map[string]UpdateInfo { } func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error { + return d.getWithDataConverter(ctx, valuePtr, getDataConverterFromWorkflowContext(ctx)) +} + +func (d *decodeFutureImpl) getWithDataConverter(ctx Context, valuePtr interface{}, dataConverter converter.DataConverter) error { more := d.futureImpl.channel.Receive(ctx, nil) if more { panic("not closed") @@ -1741,7 +1746,6 @@ func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error { if rf.Type().Kind() != reflect.Ptr { return errors.New("valuePtr parameter is not a pointer") } - dataConverter := getDataConverterFromWorkflowContext(ctx) err := dataConverter.FromPayloads(d.futureImpl.value.(*commonpb.Payloads), valuePtr) if err != nil { return err @@ -1749,6 +1753,14 @@ func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error { return d.futureImpl.err } +func (n *nexusOperationFutureImpl) Get(ctx Context, valuePtr interface{}) error { + dataConverter := getDataConverterFromWorkflowContext(ctx) + if systemnexus.GetTemporalNexusProtoMessage(valuePtr) != nil { + dataConverter = converter.NewSystemNexusDataConverter() + } + return n.decodeFutureImpl.getWithDataConverter(ctx, valuePtr, dataConverter) +} + // newDecodeFuture creates a new future as well as associated Settable that is used to set its value. // fn - the decoded value needs to be validated against a function. func newDecodeFuture(ctx Context, fn interface{}) (Future, Settable) { diff --git a/internal/system_nexus.go b/internal/system_nexus.go new file mode 100644 index 000000000..056570d88 --- /dev/null +++ b/internal/system_nexus.go @@ -0,0 +1,501 @@ +package internal + +import ( + "encoding/base64" + "encoding/json" + "errors" + "strconv" + "time" + + "go.temporal.io/api/temporalproto" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + sdkpb "go.temporal.io/api/sdk/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" + + "go.temporal.io/sdk/converter" +) + +func newSystemNexusSignalWithStartInputFromWorkflow( + ctx Context, + env WorkflowEnvironment, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowType string, + workflowArgs []interface{}, +) (systemnexus.SignalWithStartWorkflowExecutionRequest, error) { + if workflowID == "" { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, errWorkflowIDNotSet + } + + workflowOptionsFromCtx := getWorkflowEnvOptions(ctx) + dc := WithWorkflowContext(ctx, workflowOptionsFromCtx.DataConverter) + wfType, input, err := getValidatedWorkflowFunction(workflowType, workflowArgs, dc, env.GetRegistry()) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + signalInput, err := encodeArg(dc, signalArg) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + header, err := workflowHeaderPropagated(ctx, workflowOptionsFromCtx.ContextPropagators) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + memo, err := getWorkflowMemo(options.Memo, dc, env.TryUse(SDKFlagMemoUserDCEncode)) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + searchAttr, err := serializeSearchAttributes(options.SearchAttributes, options.TypedSearchAttributes) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + userMetadata, err := buildUserMetadata(options.StaticSummary, options.StaticDetails, dc) + if err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + var userMetadataMessage proto.Message + if userMetadata != nil { + userMetadataMessage = userMetadata + } + + return newSystemNexusSignalWithStartInput( + workflowOptionsFromCtx.Namespace, + options.requestID, + workflowID, + signalName, + wfType, + input, + signalInput, + header, + memo, + searchAttr, + userMetadataMessage, + options, + ) +} + +func newSystemNexusSignalWithStartInput( + namespace string, + requestID string, + workflowID string, + signalName string, + workflowType *WorkflowType, + input *commonpb.Payloads, + signalInput *commonpb.Payloads, + header *commonpb.Header, + memo *commonpb.Memo, + searchAttr *commonpb.SearchAttributes, + userMetadata proto.Message, + options StartWorkflowOptions, +) (systemnexus.SignalWithStartWorkflowExecutionRequest, error) { + req := systemnexus.SignalWithStartWorkflowExecutionRequest{ + Namespace: namespace, + RequestID: requestID, + WorkflowID: workflowID, + SignalName: signalName, + TaskQueue: toSystemNexusTaskQueue(options.TaskQueue), + WorkflowType: &systemnexus.WorkflowType{ + Name: workflowType.Name, + }, + WorkflowIDConflictPolicy: toSystemNexusWorkflowIDConflictPolicy(options.WorkflowIDConflictPolicy), + WorkflowIDReusePolicy: toSystemNexusWorkflowIDReusePolicy(options.WorkflowIDReusePolicy), + VersioningOverride: toSystemNexusVersioningOverride(options.VersioningOverride), + Priority: toSystemNexusPriority(options.Priority), + } + var err error + if req.RetryPolicy, err = toSystemNexusRetryPolicy(options.RetryPolicy); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + if req.Input, err = toSystemNexusInput(input); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + if req.SignalInput, err = toSystemNexusInput(signalInput); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + if req.Header, err = toSystemNexusHeader(header); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + if req.Memo, err = toSystemNexusMemo(memo); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + if req.SearchAttributes, err = toSystemNexusSearchAttributes(searchAttr); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + if req.UserMetadata, err = toSystemNexusUserMetadata(userMetadata); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } + + if executionTimeout, err := toSystemNexusDurationString(options.WorkflowExecutionTimeout); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } else { + req.WorkflowExecutionTimeout = executionTimeout + } + if runTimeout, err := toSystemNexusDurationString(options.WorkflowRunTimeout); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } else { + req.WorkflowRunTimeout = runTimeout + } + if taskTimeout, err := toSystemNexusDurationString(options.WorkflowTaskTimeout); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } else { + req.WorkflowTaskTimeout = taskTimeout + } + if startDelay, err := toSystemNexusDurationString(options.StartDelay); err != nil { + return systemnexus.SignalWithStartWorkflowExecutionRequest{}, err + } else { + req.WorkflowStartDelay = startDelay + } + + req.CronSchedule = options.CronSchedule + return req, nil +} + +func newSystemNexusSignalWithStartPayload( + namespace string, + requestID string, + workflowID string, + signalName string, + workflowType *WorkflowType, + input *commonpb.Payloads, + signalInput *commonpb.Payloads, + header *commonpb.Header, + memo *commonpb.Memo, + searchAttr *commonpb.SearchAttributes, + userMetadata proto.Message, + options StartWorkflowOptions, +) (*commonpb.Payload, error) { + req, err := newSystemNexusSignalWithStartInput( + namespace, + requestID, + workflowID, + signalName, + workflowType, + input, + signalInput, + header, + memo, + searchAttr, + userMetadata, + options, + ) + if err != nil { + return nil, err + } + return converter.NewSystemNexusDataConverter().ToPayload(req) +} + +func toSystemNexusInput(payloads *commonpb.Payloads) (*systemnexus.Payloads, error) { + if payloads == nil || len(payloads.Payloads) == 0 { + return nil, nil + } + items, err := toSystemNexusPayloads(payloads.Payloads) + if err != nil { + return nil, err + } + return &systemnexus.Payloads{Payloads: items}, nil +} + +func toSystemNexusPayloads(payloads []*commonpb.Payload) ([]systemnexus.Payload, error) { + items := make([]systemnexus.Payload, len(payloads)) + for i, payload := range payloads { + value, err := toSystemNexusPayload(payload) + if err != nil { + return nil, err + } + items[i] = value + } + return items, nil +} + +func toSystemNexusHeader(header *commonpb.Header) (*systemnexus.Header, error) { + if header == nil { + return nil, nil + } + fields := make(map[string]systemnexus.Payload, len(header.Fields)) + for key, payload := range header.Fields { + value, err := toSystemNexusPayload(payload) + if err != nil { + return nil, err + } + fields[key] = value + } + return &systemnexus.Header{Fields: fields}, nil +} + +func toSystemNexusPayload(payload *commonpb.Payload) (systemnexus.Payload, error) { + if payload == nil { + return systemnexus.Payload{}, nil + } + result := systemnexus.Payload{ + Data: base64.StdEncoding.EncodeToString(payload.GetData()), + } + if len(payload.GetMetadata()) > 0 { + result.Metadata = make(map[string]string, len(payload.GetMetadata())) + for key, value := range payload.GetMetadata() { + result.Metadata[key] = base64.StdEncoding.EncodeToString(value) + } + } + if externalPayloads := payload.GetExternalPayloads(); len(externalPayloads) > 0 { + result.ExternalPayloads = make([]systemnexus.PayloadExternalPayloadDetails, len(externalPayloads)) + for i, details := range externalPayloads { + result.ExternalPayloads[i] = systemnexus.PayloadExternalPayloadDetails{ + SizeBytes: strconv.FormatInt(details.GetSizeBytes(), 10), + } + } + } + return result, nil +} + +func toSystemNexusMemo(memo *commonpb.Memo) (*systemnexus.Memo, error) { + if memo == nil { + return nil, nil + } + fields := make(map[string]systemnexus.Payload, len(memo.Fields)) + for key, payload := range memo.Fields { + value, err := toSystemNexusPayload(payload) + if err != nil { + return nil, err + } + fields[key] = value + } + return &systemnexus.Memo{Fields: fields}, nil +} + +func toSystemNexusSearchAttributes(searchAttr *commonpb.SearchAttributes) (*systemnexus.SearchAttributes, error) { + if searchAttr == nil { + return nil, nil + } + fields := make(map[string]systemnexus.Payload, len(searchAttr.IndexedFields)) + for key, payload := range searchAttr.IndexedFields { + value, err := toSystemNexusPayload(payload) + if err != nil { + return nil, err + } + fields[key] = value + } + return &systemnexus.SearchAttributes{IndexedFields: fields}, nil +} + +func toSystemNexusUserMetadata(userMetadata proto.Message) (*systemnexus.UserMetadata, error) { + if userMetadata == nil { + return nil, nil + } + result := &systemnexus.UserMetadata{} + switch metadata := userMetadata.(type) { + case *sdkpb.UserMetadata: + if metadata.Summary != nil { + summary, err := toSystemNexusPayload(metadata.Summary) + if err != nil { + return nil, err + } + result.Summary = &summary + } + if metadata.Details != nil { + details, err := toSystemNexusPayload(metadata.Details) + if err != nil { + return nil, err + } + result.Details = &details + } + return result, nil + default: + value, err := systemNexusProtoToJSONValue(userMetadata) + if err != nil { + return nil, err + } + valueMap, ok := value.(map[string]any) + if !ok { + return nil, errors.New("system nexus user metadata JSON must be an object") + } + if rawSummary, ok := valueMap["summary"].(map[string]any); ok { + summary, err := toSystemNexusPayloadFromJSONMap(rawSummary) + if err != nil { + return nil, err + } + result.Summary = &summary + } + if rawDetails, ok := valueMap["details"].(map[string]any); ok { + details, err := toSystemNexusPayloadFromJSONMap(rawDetails) + if err != nil { + return nil, err + } + result.Details = &details + } + return result, nil + } +} + +func toSystemNexusPayloadFromJSONMap(valueMap map[string]any) (systemnexus.Payload, error) { + value, err := json.Marshal(valueMap) + if err != nil { + return systemnexus.Payload{}, err + } + var protoPayload commonpb.Payload + if err := (temporalproto.CustomJSONUnmarshalOptions{ + Metadata: map[string]interface{}{ + commonpb.EnablePayloadShorthandMetadataKey: true, + }, + }).Unmarshal(value, &protoPayload); err != nil { + return systemnexus.Payload{}, err + } + return toSystemNexusPayload(&protoPayload) +} + +func systemNexusProtoToJSONValue(message proto.Message) (any, error) { + data, err := temporalproto.CustomJSONMarshalOptions{ + Metadata: map[string]interface{}{ + commonpb.EnablePayloadShorthandMetadataKey: true, + }, + }.Marshal(message) + if err != nil { + return nil, err + } + + var value any + if err := json.Unmarshal(data, &value); err != nil { + return nil, err + } + return value, nil +} + +func toSystemNexusDurationString(d time.Duration) (string, error) { + if d == 0 { + return "", nil + } + value, err := systemNexusProtoToJSONValue(durationpb.New(d)) + if err != nil { + return "", err + } + durationValue, ok := value.(string) + if !ok { + return "", errors.New("system nexus duration JSON must be a string") + } + return durationValue, nil +} + +func toSystemNexusTaskQueue(name string) *systemnexus.TaskQueue { + if name == "" { + return nil + } + kind := systemnexus.TaskQueueKindNormal + return &systemnexus.TaskQueue{Name: name, Kind: &kind} +} + +func toSystemNexusRetryPolicy(retryPolicy *RetryPolicy) (*systemnexus.RetryPolicy, error) { + if retryPolicy == nil { + return nil, nil + } + policy := &systemnexus.RetryPolicy{ + BackoffCoefficient: retryPolicy.BackoffCoefficient, + MaximumAttempts: int64(retryPolicy.MaximumAttempts), + NonRetryableErrorTypes: retryPolicy.NonRetryableErrorTypes, + } + if retryPolicy.InitialInterval != 0 { + initialInterval, err := toSystemNexusDurationString(retryPolicy.InitialInterval) + if err != nil { + return nil, err + } + policy.InitialInterval = initialInterval + } + if retryPolicy.MaximumInterval != 0 { + maximumInterval, err := toSystemNexusDurationString(retryPolicy.MaximumInterval) + if err != nil { + return nil, err + } + policy.MaximumInterval = maximumInterval + } + return policy, nil +} + +func toSystemNexusWorkflowIDConflictPolicy(policy enumspb.WorkflowIdConflictPolicy) *systemnexus.WorkflowIDConflictPolicy { + switch policy { + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL: + value := systemnexus.WorkflowIDConflictPolicyFail + return &value + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING: + value := systemnexus.WorkflowIDConflictPolicyUseExisting + return &value + case enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING: + value := systemnexus.WorkflowIDConflictPolicyTerminateExisting + return &value + default: + return nil + } +} + +func toSystemNexusWorkflowIDReusePolicy(policy enumspb.WorkflowIdReusePolicy) *systemnexus.WorkflowIDReusePolicy { + switch policy { + case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE: + value := systemnexus.WorkflowIDReusePolicyAllowDuplicate + return &value + case enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY: + value := systemnexus.WorkflowIDReusePolicyAllowDuplicateFailedOnly + return &value + case enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE: + value := systemnexus.WorkflowIDReusePolicyRejectDuplicate + return &value + case enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING: + value := systemnexus.WorkflowIDReusePolicyTerminateIfRunning + return &value + default: + return nil + } +} + +func toSystemNexusVersioningOverride(versioningOverride VersioningOverride) *systemnexus.VersioningOverride { + if versioningOverride == nil { + return nil + } + switch v := versioningOverride.(type) { + case *PinnedVersioningOverride: + behavior := systemnexus.VersioningBehaviorPinned + pinnedBehavior := systemnexus.PinnedOverrideBehaviorPinned + return &systemnexus.VersioningOverride{ + Behavior: &behavior, + PinnedVersion: v.Version.toCanonicalString(), + Deployment: &systemnexus.Deployment{ + SeriesName: v.Version.DeploymentName, + BuildID: v.Version.BuildID, + }, + Pinned: &systemnexus.VersioningOverridePinnedOverride{ + Behavior: &pinnedBehavior, + Version: &systemnexus.WorkerDeploymentVersion{ + DeploymentName: v.Version.DeploymentName, + BuildID: v.Version.BuildID, + }, + }, + } + case *AutoUpgradeVersioningOverride: + behavior := systemnexus.VersioningBehaviorAutoUpgrade + return &systemnexus.VersioningOverride{ + Behavior: &behavior, + AutoUpgrade: true, + } + default: + return nil + } +} + +func toSystemNexusPriority(priority Priority) *systemnexus.Priority { + var defaultPriority Priority + if priority == defaultPriority { + return nil + } + return &systemnexus.Priority{ + PriorityKey: int64(priority.PriorityKey), + FairnessKey: priority.FairnessKey, + FairnessWeight: float64(priority.FairnessWeight), + } +} diff --git a/internal/system_nexus_test.go b/internal/system_nexus_test.go new file mode 100644 index 000000000..2b9634fbf --- /dev/null +++ b/internal/system_nexus_test.go @@ -0,0 +1,199 @@ +package internal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + commandpb "go.temporal.io/api/command/v1" + commonpb "go.temporal.io/api/common/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" + + "go.temporal.io/sdk/converter" + "google.golang.org/protobuf/proto" +) + +func TestSystemNexusPayloadVisitor_VisitsNestedPayloadsOnly(t *testing.T) { + storageParams, err := ExternalStorageToParams(converter.ExternalStorage{ + Drivers: []converter.StorageDriver{newTestDriver("system-nexus")}, + PayloadSizeThreshold: 1, + }) + require.NoError(t, err) + + req := systemnexus.SignalWithStartWorkflowExecutionRequest{ + Namespace: "default", + WorkflowID: "system-nexus-workflow-id", + SignalName: "test-signal", + Input: &systemnexus.Payloads{Payloads: []systemnexus.Payload{ + mustSystemNexusPayloadJSON(t, "workflow-input"), + }}, + SignalInput: &systemnexus.Payloads{Payloads: []systemnexus.Payload{ + mustSystemNexusPayloadJSON(t, "signal-input"), + }}, + Memo: &systemnexus.Memo{ + Fields: map[string]systemnexus.Payload{ + "memo-key": mustSystemNexusPayloadJSON(t, "memo-value"), + }, + }, + Header: &systemnexus.Header{ + Fields: map[string]systemnexus.Payload{ + "header-key": mustSystemNexusPayloadJSON(t, "header-value"), + }, + }, + UserMetadata: &systemnexus.UserMetadata{ + Summary: payloadPtr(mustSystemNexusPayloadJSON(t, "summary-value")), + Details: payloadPtr(mustSystemNexusPayloadJSON(t, "details-value")), + }, + SearchAttributes: &systemnexus.SearchAttributes{ + IndexedFields: map[string]systemnexus.Payload{ + "custom-key": mustSystemNexusPayloadJSON(t, "search-attribute-value"), + }, + }, + } + + systemDC := converter.NewSystemNexusDataConverter() + outerPayload, err := systemDC.ToPayload(req) + require.NoError(t, err) + + attrs := &commandpb.ScheduleNexusOperationCommandAttributes{ + Service: systemnexus.WorkflowService.ServiceName, + Operation: systemnexus.WorkflowService.SignalWithStartWorkflowExecution.Name(), + Input: outerPayload, + } + err = visitProtoPayloads(context.Background(), NewExternalStorageVisitor(storageParams), attrs) + require.NoError(t, err) + + var decoded systemnexus.SignalWithStartWorkflowExecutionRequest + require.NoError(t, systemDC.FromPayload(attrs.Input, &decoded)) + requirePayloadJSONReference(t, decoded.Input.Payloads[0]) + requirePayloadJSONReference(t, decoded.SignalInput.Payloads[0]) + requirePayloadJSONReference(t, decoded.Memo.Fields["memo-key"]) + requirePayloadJSONReference(t, decoded.Header.Fields["header-key"]) + requirePayloadJSONReference(t, decoded.UserMetadata.Summary) + requirePayloadJSONReference(t, decoded.UserMetadata.Details) + requirePayloadJSONReference(t, decoded.SearchAttributes.IndexedFields["custom-key"]) + + driver := storageParams.driverMap["system-nexus"].(*testStorageDriver) + driver.mu.Lock() + defer driver.mu.Unlock() + require.Len(t, driver.data, 6) +} + +func TestNewSystemNexusSignalWithStartInput_PreservesPreencodedPayloads(t *testing.T) { + codec := &testSignalWithStartCodec{} + dc := converter.NewCodecDataConverter(converter.GetDefaultDataConverter(), codec) + + input, err := encodeArgs(dc, []interface{}{"workflow-input"}) + require.NoError(t, err) + signalInput, err := encodeArg(dc, "signal-input") + require.NoError(t, err) + memo, err := getWorkflowMemo(map[string]interface{}{"memo-key": "memo-value"}, dc, true) + require.NoError(t, err) + userMetadata, err := buildUserMetadata("summary-value", "details-value", dc) + require.NoError(t, err) + + outerPayload, err := newSystemNexusSignalWithStartPayload( + "default", + "test-request-id", + "system-nexus-workflow-id", + "test-signal", + &WorkflowType{Name: "test-workflow"}, + input, + signalInput, + nil, + memo, + nil, + userMetadata, + StartWorkflowOptions{TaskQueue: "task-queue"}, + ) + require.NoError(t, err) + + systemDC := converter.NewSystemNexusDataConverter() + var decodedReq systemnexus.SignalWithStartWorkflowExecutionRequest + require.NoError(t, systemDC.FromPayload(outerPayload, &decodedReq)) + require.Equal(t, "test-request-id", decodedReq.RequestID) + + handler := systemnexus.GetTemporalNexusPayloadVisitor( + systemnexus.WorkflowService.ServiceName, + systemnexus.WorkflowService.SignalWithStartWorkflowExecution.Name(), + ) + require.NotNil(t, handler) + + value := handler.InputType() + require.IsType(t, &systemnexus.SignalWithStartWorkflowExecutionRequest{}, value) + require.NoError(t, systemDC.FromPayload(outerPayload, value)) + + var visitedPayloads []*commonpb.Payload + visitedValue, err := handler.Visit( + value, + func(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + visitedPayloads = append(visitedPayloads, payloads...) + return payloads, nil + }, + false, + ) + require.NoError(t, err) + require.IsType(t, &systemnexus.SignalWithStartWorkflowExecutionRequest{}, visitedValue) + require.Len(t, visitedPayloads, 4) + for _, payload := range visitedPayloads { + require.Equal(t, []byte("true"), payload.GetMetadata()["test-codec"]) + } +} + +func requirePayloadJSONReference(t *testing.T, value any, path ...string) { + t.Helper() + current := value + for _, segment := range path { + next, ok := current.(map[string]any) + require.True(t, ok) + current = next[segment] + } + switch typed := current.(type) { + case []any: + require.NotEmpty(t, typed) + requirePayloadJSONReference(t, typed[0]) + case systemnexus.Payload: + require.True(t, len(typed.ExternalPayloads) > 0 || typed.Data != "") + case *systemnexus.Payload: + require.NotNil(t, typed) + require.True(t, len(typed.ExternalPayloads) > 0 || typed.Data != "") + case map[string]any: + _, hasExternalPayloads := typed["externalPayloads"] + _, hasData := typed["data"] + require.True(t, hasExternalPayloads || hasData) + default: + require.Failf(t, "expected rewritten payload JSON", "got %T", current) + } +} + +type testSignalWithStartCodec struct{} + +func payloadPtr(payload systemnexus.Payload) *systemnexus.Payload { + return &payload +} + +func mustSystemNexusPayloadJSON(t *testing.T, value interface{}) systemnexus.Payload { + t.Helper() + payload, err := converter.GetDefaultDataConverter().ToPayload(value) + require.NoError(t, err) + result, err := toSystemNexusPayload(payload) + require.NoError(t, err) + return result +} + +func (c *testSignalWithStartCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + encoded := make([]*commonpb.Payload, len(payloads)) + for i, payload := range payloads { + cloned := proto.Clone(payload).(*commonpb.Payload) + if cloned.Metadata == nil { + cloned.Metadata = make(map[string][]byte, 1) + } + cloned.Metadata["test-codec"] = []byte("true") + encoded[i] = cloned + } + return encoded, nil +} + +func (c *testSignalWithStartCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + return payloads, nil +} diff --git a/internal/workflow.go b/internal/workflow.go index e29ece31a..bc9403a9a 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" "google.golang.org/protobuf/types/known/durationpb" @@ -17,6 +18,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal/common/metrics" @@ -42,6 +44,10 @@ const ( HandlerUnfinishedPolicyAbandon ) +// TODO: Switch this back to "__temporal_system" once the server supports reserved +// system endpoint names for Nexus endpoint registration/routing. +const systemNexusEndpoint = "temporal-system" + // VersioningBehavior specifies when existing workflows could change their Build ID. // // Exposed as: [go.temporal.io/sdk/workflow.VersioningBehavior] @@ -1757,6 +1763,95 @@ func (wc *workflowEnvironmentInterceptor) SignalExternalWorkflow(ctx Context, wo return signalExternalWorkflow(ctx, workflowID, runID, signalName, arg, childWorkflowOnly) } +// SignalWithStartWorkflow sends a signal to a running workflow. If the workflow is not +// running or not found, it starts the workflow and then sends the signal in a single +// system Nexus operation. +// +// Exposed as: [go.temporal.io/sdk/workflow.SignalWithStartWorkflow] +func SignalWithStartWorkflow( + ctx Context, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflow interface{}, + workflowArgs ...interface{}, +) Future { + assertNotInReadOnlyState(ctx) + i := getWorkflowOutboundInterceptor(ctx) + env := getWorkflowEnvironment(ctx) + + if options.ID != "" && options.ID != workflowID { + future, settable := NewFuture(ctx) + settable.Set(nil, fmt.Errorf("workflow ID from options not used, must be unset or match workflow ID parameter")) + return future + } + + options.ID = workflowID + if options.ID == "" { + options.ID = uuid.NewString() + } + + workflowType, err := getWorkflowFunctionName(env.GetRegistry(), workflow) + if err != nil { + future, settable := NewFuture(ctx) + settable.Set(nil, err) + return future + } + + ctx = workflowContextWithNewHeader(ctx) + return i.SignalWithStartWorkflow(ctx, options.ID, signalName, signalArg, options, workflowType, workflowArgs...) +} + +func (wc *workflowEnvironmentInterceptor) SignalWithStartWorkflow( + ctx Context, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflowType string, + workflowArgs ...interface{}, +) Future { + ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) + future, settable := NewFuture(ctx1) + + req, err := newSystemNexusSignalWithStartInputFromWorkflow( + ctx1, + wc.env, + workflowID, + signalName, + signalArg, + options, + workflowType, + workflowArgs, + ) + if err != nil { + settable.Set(nil, err) + return future + } + + opFuture := wc.ExecuteNexusOperation(ctx1, ExecuteNexusOperationInput{ + Client: nexusClient{ + endpoint: systemNexusEndpoint, + service: systemnexus.WorkflowService.ServiceName, + }, + Operation: systemnexus.WorkflowService.SignalWithStartWorkflowExecution.Name(), + Input: req, + Options: NexusOperationOptions{CancellationType: NexusOperationCancellationTypeWaitCompleted}, + NexusHeader: nexus.Header{}, + }) + + wc.dispatcher.NewCoroutine(ctx1, "signal-with-start-workflow", false, func(ctx Context) { + var result systemnexus.SignalWithStartWorkflowExecutionResponse + if err := opFuture.Get(ctx, &result); err != nil { + settable.Set(nil, err) + return + } + settable.Set(WorkflowExecution{ID: workflowID, RunID: result.RunID}, nil) + }) + return future +} + func (wc *workflowEnvironmentInterceptor) SignalChildWorkflow(ctx Context, workflowID, signalName string, arg interface{}) Future { const childWorkflowOnly = true // this means we are limited to child workflow // Empty run ID to indicate current one @@ -2981,7 +3076,12 @@ func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Contex return executeNexusOperationParams{}, fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string") } - payload, err := dc.ToPayload(input.Input) + payloadConverter := dc + if systemnexus.IsTemporalNexusOperation(input.Client.Service(), operationName) { + payloadConverter = converter.NewSystemNexusDataConverter() + } + + payload, err := payloadConverter.ToPayload(input.Input) if err != nil { return executeNexusOperationParams{}, err } diff --git a/test/go.mod b/test/go.mod index c992986d8..742fada3f 100644 --- a/test/go.mod +++ b/test/go.mod @@ -1,6 +1,6 @@ module go.temporal.io/sdk/test -go 1.24.0 +go 1.25.4 require ( github.com/golang/mock v1.6.0 @@ -39,6 +39,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect @@ -66,6 +67,7 @@ require ( ) replace ( + go.temporal.io/api => ../../temporal-api-go go.temporal.io/sdk => ../ go.temporal.io/sdk/contrib/opentelemetry => ../contrib/opentelemetry go.temporal.io/sdk/contrib/opentracing => ../contrib/opentracing diff --git a/test/go.sum b/test/go.sum index 4729e0af1..22c149afc 100644 --- a/test/go.sum +++ b/test/go.sum @@ -97,6 +97,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 h1:SWHt3Coj0VvF0Km1A0wlY+IjnHKsjQLgO29io84r3wY= +github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= @@ -174,8 +176,6 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I= -go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/test/nexus_system_test.go b/test/nexus_system_test.go new file mode 100644 index 000000000..3d4bcd332 --- /dev/null +++ b/test/nexus_system_test.go @@ -0,0 +1,265 @@ +package test_test + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "testing" + + "github.com/google/uuid" + "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + nexuspb "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/operatorservice/v1" + systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + ilog "go.temporal.io/sdk/internal/log" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + "google.golang.org/protobuf/proto" +) + +func TestSystemNexusDefersOuterEnvelopeEncoding(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + var suiteBase ConfigAndClientSuiteBase + require.NoError(t, suiteBase.InitConfigAndNamespace()) + if suiteBase.client != nil { + defer suiteBase.client.Close() + } + + handlerTC := newTestContext(t, ctx) + driver := newMemDriver("system-nexus") + codec := &rejectOuterSystemNexusCodec{} + + callerClient, err := client.DialContext(ctx, client.Options{ + HostPort: handlerTC.testConfig.ServiceAddr, + Namespace: handlerTC.testConfig.Namespace, + Logger: ilog.NewDefaultLogger(), + DataConverter: converter.NewCodecDataConverter( + converter.GetDefaultDataConverter(), + codec, + ), + ExternalStorage: converter.ExternalStorage{ + Drivers: []converter.StorageDriver{driver}, + PayloadSizeThreshold: 1, + }, + ConnectionOptions: client.ConnectionOptions{TLS: handlerTC.testConfig.TLS}, + WorkerHeartbeatInterval: -1, + }) + require.NoError(t, err) + defer callerClient.Close() + + handlerClient, err := client.DialContext(ctx, client.Options{ + HostPort: handlerTC.testConfig.ServiceAddr, + Namespace: handlerTC.testConfig.Namespace, + Logger: ilog.NewDefaultLogger(), + DataConverter: converter.NewSystemNexusDataConverter(), + ConnectionOptions: client.ConnectionOptions{TLS: handlerTC.testConfig.TLS}, + WorkerHeartbeatInterval: -1, + }) + require.NoError(t, err) + defer handlerClient.Close() + + callerTaskQueue := "sdk-go-system-nexus-caller-" + uuid.NewString() + callerWorker := worker.New(callerClient, callerTaskQueue, worker.Options{}) + callerWorker.RegisterWorkflow(systemNexusSignalWithStartWorkflow) + + handlerWorker := worker.New(handlerClient, handlerTC.taskQueue, worker.Options{}) + service := nexus.NewService(systemnexus.WorkflowService.ServiceName) + var receivedRequestMu sync.Mutex + var receivedRequest *systemnexus.SignalWithStartWorkflowExecutionRequest + require.NoError(t, service.Register(nexus.NewSyncOperation( + systemnexus.WorkflowService.SignalWithStartWorkflowExecution.Name(), + func( + _ context.Context, + req systemnexus.SignalWithStartWorkflowExecutionRequest, + _ nexus.StartOperationOptions, + ) (systemnexus.SignalWithStartWorkflowExecutionResponse, error) { + receivedRequestMu.Lock() + reqCopy := req + receivedRequest = &reqCopy + receivedRequestMu.Unlock() + return systemnexus.SignalWithStartWorkflowExecutionResponse{ + RunID: "system-nexus-workflow-id-run", + }, nil + }, + ))) + handlerWorker.RegisterNexusService(service) + + systemEndpointSpec := &nexuspb.EndpointSpec{ + // TODO: Switch this back to "__temporal_system" once the server supports + // reserved system endpoint names for Nexus endpoint registration/routing. + Name: "temporal-system", + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: handlerTC.testConfig.Namespace, + TaskQueue: handlerTC.taskQueue, + }, + }, + }, + } + existingEndpoints, err := handlerTC.client.OperatorService().ListNexusEndpoints(ctx, &operatorservice.ListNexusEndpointsRequest{ + Name: systemEndpointSpec.Name, + }) + require.NoError(t, err) + if len(existingEndpoints.Endpoints) == 0 { + _, err = handlerTC.client.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: systemEndpointSpec, + }) + require.NoError(t, err) + } else { + _, err = handlerTC.client.OperatorService().UpdateNexusEndpoint(ctx, &operatorservice.UpdateNexusEndpointRequest{ + Id: existingEndpoints.Endpoints[0].Id, + Version: existingEndpoints.Endpoints[0].Version, + Spec: systemEndpointSpec, + }) + require.NoError(t, err) + } + + require.NoError(t, callerWorker.Start()) + defer callerWorker.Stop() + require.NoError(t, handlerWorker.Start()) + defer handlerWorker.Stop() + + run, err := callerClient.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "system-nexus-" + uuid.NewString(), + TaskQueue: callerTaskQueue, + }, systemNexusSignalWithStartWorkflow) + require.NoError(t, err) + + var result string + require.NoError(t, run.Get(ctx, &result)) + require.Equal(t, "system-nexus-workflow-id-run", result) + require.GreaterOrEqual(t, codec.EncodeCount(), 5) + receivedRequestMu.Lock() + defer receivedRequestMu.Unlock() + require.NotNil(t, receivedRequest) + require.Equal(t, "system-nexus-workflow-id", receivedRequest.WorkflowID) + require.Equal(t, "test-signal", receivedRequest.SignalName) + requirePayloadJSONReference(t, receivedRequest.Input.Payloads[0]) + requirePayloadJSONReference(t, receivedRequest.SignalInput.Payloads[0]) + requirePayloadJSONReference(t, receivedRequest.Memo.Fields["memo-key"]) + requirePayloadJSONReference(t, receivedRequest.UserMetadata.Summary) + requirePayloadJSONReference(t, receivedRequest.UserMetadata.Details) + requirePayloadJSONReference(t, receivedRequest.SearchAttributes.IndexedFields["custom-key"]) + + driver.mu.Lock() + defer driver.mu.Unlock() + var storedPayloadData [][]byte + for _, payload := range driver.data { + storedPayloadData = append(storedPayloadData, payload.GetData()) + } + require.NotEmpty(t, storedPayloadData) + require.Contains(t, storedPayloadData, []byte(`"workflow-input"`)) + require.Contains(t, storedPayloadData, []byte(`"signal-input"`)) + require.Contains(t, storedPayloadData, []byte(`"memo-value"`)) + require.Contains(t, storedPayloadData, []byte(`"summary-value"`)) + require.Contains(t, storedPayloadData, []byte(`"details-value"`)) +} + +func systemNexusSignalWithStartWorkflow(ctx workflow.Context) (string, error) { + fut := workflow.SignalWithStartWorkflow( + ctx, + "system-nexus-workflow-id", + "test-signal", + "signal-input", + workflow.StartWorkflowOptions{ + TaskQueue: "test-task-queue", + Memo: map[string]interface{}{ + "memo-key": "memo-value", + }, + SearchAttributes: map[string]interface{}{ + "custom-key": "search-attribute-value", + }, + StaticSummary: "summary-value", + StaticDetails: "details-value", + }, + systemNexusTargetWorkflow, + "workflow-input", + ) + + var exec workflow.Execution + if err := fut.Get(ctx, &exec); err != nil { + return "", err + } + return exec.RunID, nil +} + +func systemNexusTargetWorkflow(ctx workflow.Context, input string) error { + return nil +} + +type rejectOuterSystemNexusCodec struct { + mu sync.Mutex + encodeCount int +} + +func (c *rejectOuterSystemNexusCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + encoded := make([]*commonpb.Payload, len(payloads)) + for i, payload := range payloads { + if looksLikeSystemNexusEnvelope(payload) { + return nil, fmt.Errorf("outer system nexus envelope should not be codec encoded") + } + cloned := proto.Clone(payload).(*commonpb.Payload) + if cloned.Metadata == nil { + cloned.Metadata = make(map[string][]byte, 1) + } + cloned.Metadata["test-codec"] = []byte("true") + encoded[i] = cloned + } + + c.mu.Lock() + c.encodeCount += len(payloads) + c.mu.Unlock() + return encoded, nil +} + +func (c *rejectOuterSystemNexusCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + for _, payload := range payloads { + if looksLikeSystemNexusEnvelope(payload) { + return nil, fmt.Errorf("outer system nexus envelope should not be codec decoded") + } + } + return payloads, nil +} + +func (c *rejectOuterSystemNexusCodec) EncodeCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.encodeCount +} + +func looksLikeSystemNexusEnvelope(payload *commonpb.Payload) bool { + var value map[string]any + if err := json.Unmarshal(payload.GetData(), &value); err != nil { + return false + } + _, hasNamespace := value["namespace"] + _, hasWorkflowID := value["workflowId"] + _, hasSignalName := value["signalName"] + return hasNamespace && hasWorkflowID && hasSignalName +} + +func requirePayloadJSONReference(t *testing.T, value any) { + t.Helper() + switch payload := value.(type) { + case systemnexus.Payload: + require.True(t, len(payload.ExternalPayloads) > 0 || payload.Data != "") + case *systemnexus.Payload: + require.NotNil(t, payload) + require.True(t, len(payload.ExternalPayloads) > 0 || payload.Data != "") + case map[string]any: + _, hasExternalPayloads := payload["externalPayloads"] + _, hasData := payload["data"] + require.True(t, hasExternalPayloads || hasData) + default: + require.Failf(t, "expected payload object", "got %T", value) + } +} diff --git a/workflow/workflow.go b/workflow/workflow.go index 814924194..b7a48e32e 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -121,6 +121,9 @@ type ( // ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context. ChildWorkflowOptions = internal.ChildWorkflowOptions + // StartWorkflowOptions configuration parameters for starting a workflow execution. + StartWorkflowOptions = internal.StartWorkflowOptions + // RegisterOptions consists of options for registering a workflow RegisterOptions = internal.RegisterWorkflowOptions @@ -383,6 +386,21 @@ func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a return internal.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg) } +// SignalWithStartWorkflow sends a signal to a running workflow. If the workflow is not +// running or not found, it starts the workflow and then sends the signal in a single +// system Nexus operation. The returned future resolves to a [workflow.Execution]. +func SignalWithStartWorkflow( + ctx Context, + workflowID string, + signalName string, + signalArg interface{}, + options StartWorkflowOptions, + workflow interface{}, + workflowArgs ...interface{}, +) Future { + return internal.SignalWithStartWorkflow(ctx, workflowID, signalName, signalArg, options, workflow, workflowArgs...) +} + // GetSignalChannel returns the channel corresponding to the signal name. func GetSignalChannel(ctx Context, signalName string) ReceiveChannel { return internal.GetSignalChannel(ctx, signalName)