Skip to content
110 changes: 110 additions & 0 deletions converter/system_nexus_data_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package converter

import (
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json"
)

// SystemNexusDataConverter converts annotated generated system Nexus envelopes
// through their backing proto messages. It is intentionally narrow and only
// supports annotated generated system Nexus request/response types.
type SystemNexusDataConverter struct {
protoJSON *ProtoJSONPayloadConverter
}

// NewSystemNexusDataConverter returns a data converter for annotated generated
// system Nexus outer envelopes.
func NewSystemNexusDataConverter() DataConverter {
return &SystemNexusDataConverter{
protoJSON: NewProtoJSONPayloadConverter(),
}
}

func (c *SystemNexusDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
if value == nil || systemnexus.GetTemporalNexusProtoMessage(value) == nil {
return nil, errors.New("system nexus data converter only supports annotated generated types")
}
message, err := systemnexus.ToTemporalNexusProto(value)
if err != nil {
return nil, err
}
return c.protoJSON.ToPayload(message)
}

func (c *SystemNexusDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
if payload == nil {
return nil
}
if valuePtr == nil {
return errors.New("system nexus data converter requires a destination value")
}
message := systemnexus.GetTemporalNexusProtoMessage(valuePtr)
if message == nil {
return errors.New("system nexus data converter only supports annotated generated types")
}
if err := c.protoJSON.FromPayload(payload, message); err != nil {
return err
}
return systemnexus.FromTemporalNexusProto(message, valuePtr)
}

func (c *SystemNexusDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
if len(values) == 0 {
return nil, nil
}

result := &commonpb.Payloads{}
for i, value := range values {
rawValue, ok := value.(RawValue)
if ok {
result.Payloads = append(result.Payloads, rawValue.Payload())
continue
}
payload, err := c.ToPayload(value)
if err != nil {
return nil, fmt.Errorf("values[%d]: %w", i, err)
}
result.Payloads = append(result.Payloads, payload)
}
return result, nil
}

func (c *SystemNexusDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
if payloads == nil {
return nil
}
for i, payload := range payloads.GetPayloads() {
if i >= len(valuePtrs) {
break
}
rawValue, ok := valuePtrs[i].(*RawValue)
if ok {
*rawValue = NewRawValue(payload)
continue
}
if err := c.FromPayload(payload, valuePtrs[i]); err != nil {
return fmt.Errorf("payload item %d: %w", i, err)
}
}
return nil
}

func (c *SystemNexusDataConverter) ToString(input *commonpb.Payload) string {
return c.protoJSON.ToString(input)
}

func (c *SystemNexusDataConverter) ToStrings(input *commonpb.Payloads) []string {
if input == nil {
return nil
}
result := make([]string, len(input.Payloads))
for i, payload := range input.Payloads {
result[i] = c.ToString(payload)
}
return result
}

var _ DataConverter = (*SystemNexusDataConverter)(nil)
83 changes: 83 additions & 0 deletions converter/system_nexus_data_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package converter

import (
"testing"

"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
workflowservicev1 "go.temporal.io/api/workflowservice/v1"
systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json"
"google.golang.org/protobuf/proto"
)

func TestSystemNexusDataConverter(t *testing.T) {
t.Parallel()

dc := NewSystemNexusDataConverter()
value := systemnexus.SignalWithStartWorkflowExecutionRequest{
Namespace: "default",
WorkflowID: "workflow-id",
SignalName: "signal-name",
Input: &systemnexus.Payloads{
Payloads: []systemnexus.Payload{{
Data: "d29ya2Zsb3ctaW5wdXQ=",
Metadata: map[string]string{
"encoding": "anNvbi9wbGFpbg==",
},
}},
},
}

payload, err := dc.ToPayload(value)
require.NoError(t, err)
require.Equal(t, []byte(MetadataEncodingProtoJSON), payload.Metadata[MetadataEncoding])
require.Equal(
t,
[]byte((&workflowservicev1.SignalWithStartWorkflowExecutionRequest{}).ProtoReflect().Descriptor().FullName()),
payload.Metadata[MetadataMessageType],
)

var decoded systemnexus.SignalWithStartWorkflowExecutionRequest
require.NoError(t, dc.FromPayload(payload, &decoded))
require.Equal(t, value, decoded)

protoMessage, err := systemnexus.ToTemporalNexusProto(value)
require.NoError(t, err)
decodedProto := &workflowservicev1.SignalWithStartWorkflowExecutionRequest{}
require.NoError(t, GetDefaultDataConverter().FromPayload(payload, decodedProto))
require.True(t, proto.Equal(protoMessage, decodedProto))
}

func TestSystemNexusDataConverterToPayloads(t *testing.T) {
t.Parallel()

dc := NewSystemNexusDataConverter()
payloads, err := dc.ToPayloads(
systemnexus.SignalWithStartWorkflowExecutionResponse{RunID: "run-id"},
)
require.NoError(t, err)
require.Len(t, payloads.Payloads, 1)
require.Equal(t, []byte(MetadataEncodingProtoJSON), payloads.Payloads[0].Metadata[MetadataEncoding])

var response systemnexus.SignalWithStartWorkflowExecutionResponse
require.NoError(t, dc.FromPayloads(&commonpb.Payloads{Payloads: payloads.Payloads}, &response))
require.Equal(t, "run-id", response.RunID)
}

func TestSystemNexusDataConverterRejectsUnannotatedTypes(t *testing.T) {
t.Parallel()

dc := NewSystemNexusDataConverter()

_, err := dc.ToPayload("plain-value")
require.Error(t, err)
require.ErrorContains(t, err, "annotated generated types")

payload, err := GetDefaultDataConverter().ToPayload("plain-value")
require.NoError(t, err)

var plain string
err = dc.FromPayload(payload, &plain)
require.Error(t, err)
require.ErrorContains(t, err, "annotated generated types")
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module go.temporal.io/sdk

go 1.23.0

toolchain go1.23.6
go 1.25.4

require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
Expand All @@ -25,6 +23,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
golang.org/x/net v0.39.0 // indirect
Expand All @@ -33,3 +32,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.temporal.io/api => ../temporal-api-go
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84 h1:SWHt3Coj0VvF0Km1A0wlY+IjnHKsjQLgO29io84r3wY=
github.com/nexus-rpc/nexus-proto-annotations v0.0.0-20260330194009-e558d6edaf84/go.mod h1:n3UjF1bPCW8llR8tHvbxJ+27yPWrhpo8w/Yg1IOuY0Y=
github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo=
github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -35,8 +37,6 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.temporal.io/api v1.62.5 h1:9R/9CeyM7xqHSlsNt+QIvapQLcRxCZ38bnXQx4mCN6I=
go.temporal.io/api v1.62.5/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
22 changes: 22 additions & 0 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,28 @@ func (t *tracingWorkflowOutboundInterceptor) SignalExternalWorkflow(
return t.Next.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg)
}

func (t *tracingWorkflowOutboundInterceptor) SignalWithStartWorkflow(
ctx workflow.Context,
workflowID string,
signalName string,
signalArg interface{},
options workflow.StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) workflow.Future {
if !t.root.options.DisableSignalTracing {
var span TracerSpan
var futErr workflow.Future
span, ctx, futErr = t.startNonReplaySpan(ctx, "SignalWithStartWorkflow", signalName, false, t.root.workflowHeaderWriter(ctx))
if futErr != nil {
return futErr
}
defer span.Finish(&TracerFinishSpanOptions{})
}

return t.Next.SignalWithStartWorkflow(ctx, workflowID, signalName, signalArg, options, workflowType, workflowArgs...)
}

func (t *tracingWorkflowOutboundInterceptor) SignalChildWorkflow(
ctx workflow.Context,
workflowID string,
Expand Down
12 changes: 12 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ type WorkflowOutboundInterceptor interface {
// interceptor.WorkflowHeader will return a non-nil map for this context.
SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future

// SignalWithStartWorkflow intercepts workflow.SignalWithStartWorkflow.
// interceptor.WorkflowHeader will return a non-nil map for this context.
SignalWithStartWorkflow(
ctx Context,
workflowID string,
signalName string,
signalArg interface{},
options StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) Future

// SignalChildWorkflow intercepts
// workflow.ChildWorkflowFuture.SignalChildWorkflow.
// interceptor.WorkflowHeader will return a non-nil map for this context.
Expand Down
14 changes: 14 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,20 @@ func (w *WorkflowOutboundInterceptorBase) SignalExternalWorkflow(
return w.Next.SignalExternalWorkflow(ctx, workflowID, runID, signalName, arg)
}

// SignalWithStartWorkflow implements
// WorkflowOutboundInterceptor.SignalWithStartWorkflow.
func (w *WorkflowOutboundInterceptorBase) SignalWithStartWorkflow(
ctx Context,
workflowID string,
signalName string,
signalArg interface{},
options StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) Future {
return w.Next.SignalWithStartWorkflow(ctx, workflowID, signalName, signalArg, options, workflowType, workflowArgs...)
}

// SignalChildWorkflow implements
// WorkflowOutboundInterceptor.SignalChildWorkflow.
func (w *WorkflowOutboundInterceptorBase) SignalChildWorkflow(
Expand Down
15 changes: 15 additions & 0 deletions internal/interceptortest/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,21 @@ func (p *proxyWorkflowOutbound) SignalExternalWorkflow(
return
}

func (p *proxyWorkflowOutbound) SignalWithStartWorkflow(
ctx workflow.Context,
workflowID string,
signalName string,
signalArg interface{},
options workflow.StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) (ret workflow.Future) {
args := []interface{}{ctx, workflowID, signalName, signalArg, options, workflowType}
args = append(args, workflowArgs...)
ret, _ = p.invoke(args...)[0].Interface().(workflow.Future)
return
}

func (p *proxyWorkflowOutbound) UpsertSearchAttributes(
ctx workflow.Context,
attributes map[string]interface{},
Expand Down
14 changes: 13 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/sdk/v1"
systemnexus "go.temporal.io/api/workflowservice/v1/workflowservicenexus/json"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
Expand Down Expand Up @@ -1727,6 +1728,10 @@ func (w *WorkflowOptions) getRunningUpdateHandles() map[string]UpdateInfo {
}

func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error {
return d.getWithDataConverter(ctx, valuePtr, getDataConverterFromWorkflowContext(ctx))
}

func (d *decodeFutureImpl) getWithDataConverter(ctx Context, valuePtr interface{}, dataConverter converter.DataConverter) error {
more := d.futureImpl.channel.Receive(ctx, nil)
if more {
panic("not closed")
Expand All @@ -1741,14 +1746,21 @@ func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error {
if rf.Type().Kind() != reflect.Ptr {
return errors.New("valuePtr parameter is not a pointer")
}
dataConverter := getDataConverterFromWorkflowContext(ctx)
err := dataConverter.FromPayloads(d.futureImpl.value.(*commonpb.Payloads), valuePtr)
if err != nil {
return err
}
return d.futureImpl.err
}

func (n *nexusOperationFutureImpl) Get(ctx Context, valuePtr interface{}) error {
dataConverter := getDataConverterFromWorkflowContext(ctx)
if systemnexus.GetTemporalNexusProtoMessage(valuePtr) != nil {
dataConverter = converter.NewSystemNexusDataConverter()
}
return n.decodeFutureImpl.getWithDataConverter(ctx, valuePtr, dataConverter)
}

// newDecodeFuture creates a new future as well as associated Settable that is used to set its value.
// fn - the decoded value needs to be validated against a function.
func newDecodeFuture(ctx Context, fn interface{}) (Future, Settable) {
Expand Down
Loading