diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 912fc80ee..94e6cc221 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,6 +77,7 @@ jobs: working-directory: ./internal/cmd/build env: WORKFLOW_CACHE_SIZE: "0" + DISABLE_STANDALONE_NEXUS_TESTS: "1" integration-test-with-cache: strategy: @@ -108,6 +109,8 @@ jobs: - name: Integration tests (with cache) run: go run . integration-test -dev-server working-directory: ./internal/cmd/build + env: + DISABLE_STANDALONE_NEXUS_TESTS: "1" docker-compose-test: runs-on: ubuntu-latest @@ -146,6 +149,7 @@ jobs: DISABLE_SERVER_1_27_TESTS: "1" DISABLE_PRIORITY_TESTS: "1" DISABLE_STANDALONE_ACTIVITY_TESTS: "1" + DISABLE_STANDALONE_NEXUS_TESTS: "1" DISABLE_NEXUS_CALLER_TIMEOUT_TESTS: "1" DISABLE_NEW_NEXUS_ERROR_FORMAT_TESTS: "1" working-directory: ./internal/cmd/build diff --git a/client/client.go b/client/client.go index a4a57ed74..06e1a0c66 100644 --- a/client/client.go +++ b/client/client.go @@ -985,6 +985,92 @@ type ( // NOTE: Experimental TerminateActivityOptions = internal.ClientTerminateActivityOptions + // StartNexusOperationOptions contains configuration parameters for starting a Nexus operation execution. + // + // NOTE: Experimental + StartNexusOperationOptions = internal.ClientStartNexusOperationOptions + + // NexusClientOptions contains options for creating a NexusClient. + // + // NOTE: Experimental + NexusClientOptions = internal.ClientNexusClientOptions + + // NexusClient is the client for starting Nexus operations bound to a specific endpoint and service. + // This is for standalone Nexus operations outside of workflow context. + // For Nexus operations within workflows, use workflow.NexusClient. + // + // NOTE: Experimental + NexusClient = internal.ClientNexusClient + + // NexusOperationHandle represents a running or completed standalone Nexus operation execution. + // It can be used to get the result, describe, cancel, or terminate the operation. + // + // NOTE: Experimental + NexusOperationHandle = internal.ClientNexusOperationHandle + + // NexusOperationMetadata contains information about a Nexus operation execution. + // This is returned by ListNexusOperations and embedded in NexusOperationExecutionDescription. + // + // NOTE: Experimental + NexusOperationMetadata = internal.ClientNexusOperationMetadata + + // NexusOperationExecutionDescription contains detailed information about a Nexus operation execution. + // This is returned by NexusOperationHandle.Describe. + // + // NOTE: Experimental + NexusOperationExecutionDescription = internal.ClientNexusOperationExecutionDescription + + // NexusOperationCancellationInfo contains cancellation information for a Nexus operation. + // + // NOTE: Experimental + NexusOperationCancellationInfo = internal.ClientNexusOperationCancellationInfo + + // DescribeNexusOperationOptions contains options for NexusOperationHandle.Describe call. + // + // NOTE: Experimental + DescribeNexusOperationOptions = internal.ClientDescribeNexusOperationOptions + + // CancelNexusOperationOptions contains options for NexusOperationHandle.Cancel call. + // + // NOTE: Experimental + CancelNexusOperationOptions = internal.ClientCancelNexusOperationOptions + + // TerminateNexusOperationOptions contains options for NexusOperationHandle.Terminate call. + // + // NOTE: Experimental + TerminateNexusOperationOptions = internal.ClientTerminateNexusOperationOptions + + // ListNexusOperationsOptions contains input for ListNexusOperations call. + // + // NOTE: Experimental + ListNexusOperationsOptions = internal.ClientListNexusOperationsOptions + + // CountNexusOperationsOptions contains input for CountNexusOperations call. + // + // NOTE: Experimental + CountNexusOperationsOptions = internal.ClientCountNexusOperationsOptions + + // CountNexusOperationsResult contains the result of the CountNexusOperations call. + // + // NOTE: Experimental + CountNexusOperationsResult = internal.ClientCountNexusOperationsResult + + // CountNexusOperationsAggregationGroup contains groups of Nexus operations if + // CountNexusOperationExecutions is grouped by a field. + // + // NOTE: Experimental + CountNexusOperationsAggregationGroup = internal.ClientCountNexusOperationsAggregationGroup + + // ListNexusOperationsResult contains the result of the ListNexusOperations call. + // + // NOTE: Experimental + ListNexusOperationsResult = internal.ClientListNexusOperationsResult + + // GetNexusOperationHandleOptions contains input for GetNexusOperationHandle call. + // + // NOTE: Experimental + GetNexusOperationHandleOptions = internal.ClientGetNexusOperationHandleOptions + // Client is the client for starting and getting information about a workflow executions as well as // completing activities asynchronously. Client interface { @@ -1499,6 +1585,30 @@ type ( // NOTE: Experimental CountActivities(ctx context.Context, options CountActivitiesOptions) (*CountActivitiesResult, error) + // NewNexusClient creates a new Nexus client bound to the given endpoint and service. + // This is for standalone Nexus operations outside of workflow context. + // For Nexus operations within workflows, use workflow.NexusClient instead. + // + // NOTE: Experimental + NewNexusClient(options NexusClientOptions) (NexusClient, error) + + // GetNexusOperationHandle creates a handle to the referenced Nexus operation. + // No network call is made. The handle can be used to poll, describe, cancel, or terminate. + // + // NOTE: Experimental + GetNexusOperationHandle(options GetNexusOperationHandleOptions) NexusOperationHandle + + // ListNexusOperations lists Nexus operation executions based on query. + // Currently, all errors are returned in the iterator and not the base level error. + // + // NOTE: Experimental + ListNexusOperations(ctx context.Context, options ListNexusOperationsOptions) (ListNexusOperationsResult, error) + + // CountNexusOperations counts Nexus operation executions based on query. + // + // NOTE: Experimental + CountNexusOperations(ctx context.Context, options CountNexusOperationsOptions) (*CountNexusOperationsResult, error) + // WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases // that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the // service are not configured with internal semantics such as automatic retries. diff --git a/contrib/aws/lambdaworker/go.mod b/contrib/aws/lambdaworker/go.mod index 08309dc18..8ece78efc 100644 --- a/contrib/aws/lambdaworker/go.mod +++ b/contrib/aws/lambdaworker/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/aws/lambdaworker/go.sum b/contrib/aws/lambdaworker/go.sum index f4f460614..6ccf7a526 100644 --- a/contrib/aws/lambdaworker/go.sum +++ b/contrib/aws/lambdaworker/go.sum @@ -59,8 +59,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/lambdaworker/otel/go.mod b/contrib/aws/lambdaworker/otel/go.mod index 4fc834740..2674e0df6 100644 --- a/contrib/aws/lambdaworker/otel/go.mod +++ b/contrib/aws/lambdaworker/otel/go.mod @@ -35,7 +35,7 @@ require ( go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/contrib/aws/lambdaworker/otel/go.sum b/contrib/aws/lambdaworker/otel/go.sum index 1248cddf5..c00ba17c2 100644 --- a/contrib/aws/lambdaworker/otel/go.sum +++ b/contrib/aws/lambdaworker/otel/go.sum @@ -68,8 +68,8 @@ go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4Len go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/aws/s3driver/awssdkv2/go.mod b/contrib/aws/s3driver/awssdkv2/go.mod index 7c13915b6..9b3bebbab 100644 --- a/contrib/aws/s3driver/awssdkv2/go.mod +++ b/contrib/aws/s3driver/awssdkv2/go.mod @@ -8,7 +8,7 @@ require ( github.com/aws/smithy-go v1.22.4 github.com/johannesboyne/gofakes3 v0.0.0-20260208201424-4c385a1f6a73 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 go.temporal.io/sdk v1.25.1 go.temporal.io/sdk/contrib/aws/s3driver v0.0.0 ) diff --git a/contrib/aws/s3driver/awssdkv2/go.sum b/contrib/aws/s3driver/awssdkv2/go.sum index 6b8d3f36d..694a4886b 100644 --- a/contrib/aws/s3driver/awssdkv2/go.sum +++ b/contrib/aws/s3driver/awssdkv2/go.sum @@ -80,8 +80,8 @@ go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d h1:Ns9kd1Rwzw7t0BR8XMphenji4SmIoNZPn8zhYmaVKP8= go.shabbyrobe.org/gocovmerge v0.0.0-20230507111327-fa4f82cfbf4d/go.mod h1:92Uoe3l++MlthCm+koNi0tcUCX3anayogF0Pa/sp24k= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/aws/s3driver/go.mod b/contrib/aws/s3driver/go.mod index 50f80a7b8..fe082085a 100644 --- a/contrib/aws/s3driver/go.mod +++ b/contrib/aws/s3driver/go.mod @@ -4,7 +4,7 @@ go 1.24.0 require ( github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 go.temporal.io/sdk v1.25.1 golang.org/x/sync v0.19.0 google.golang.org/protobuf v1.36.11 diff --git a/contrib/aws/s3driver/go.sum b/contrib/aws/s3driver/go.sum index 5b22527db..54fc9095d 100644 --- a/contrib/aws/s3driver/go.sum +++ b/contrib/aws/s3driver/go.sum @@ -42,8 +42,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index 7aa6f69bf..574019dd9 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.40.0 // indirect go.opentelemetry.io/otel/sdk v1.40.0 // indirect go.opentelemetry.io/otel/trace v1.40.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index f947e6eec..ddc3c4684 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -233,8 +233,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 8406a6a90..85d97ca85 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -20,7 +20,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index 2449f41cc..635c49069 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -57,8 +57,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index d73004604..1889bbb43 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.40.0 go.opentelemetry.io/otel/sdk/metric v1.40.0 - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.33.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index 1e5b392ed..e8522cd4a 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -56,8 +56,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 4dc61aa05..cd472e836 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -20,7 +20,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 9bf54514b..a28a2b4b1 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -60,8 +60,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/sysinfo/go.mod b/contrib/sysinfo/go.mod index add397a54..9246c036a 100644 --- a/contrib/sysinfo/go.mod +++ b/contrib/sysinfo/go.mod @@ -33,7 +33,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/sysinfo/go.sum b/contrib/sysinfo/go.sum index 395d61b7a..b2d87d6fe 100644 --- a/contrib/sysinfo/go.sum +++ b/contrib/sysinfo/go.sum @@ -91,8 +91,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index eeff2666d..bb44982d2 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -21,7 +21,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 2f9917acd..8281498e7 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -139,8 +139,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index 9f2602792..31735fd1c 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 88250bb40..a66aa91c0 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index 55f497f67..318aceca7 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -244,6 +244,54 @@ type ClientPollActivityResultInput = internal.ClientPollActivityResultInput // NOTE: Experimental type ClientPollActivityResultOutput = internal.ClientPollActivityResultOutput +// ClientExecuteNexusOperationInput is the input to +// ClientOutboundInterceptor.ExecuteNexusOperation. +// +// NOTE: Experimental +type ClientExecuteNexusOperationInput = internal.ClientExecuteNexusOperationInput + +// ClientGetNexusOperationHandleInput is the input to +// ClientOutboundInterceptor.GetNexusOperationHandle. +// +// NOTE: Experimental +type ClientGetNexusOperationHandleInput = internal.ClientGetNexusOperationHandleInput + +// ClientCancelNexusOperationInput is the input to +// ClientOutboundInterceptor.CancelNexusOperation. +// +// NOTE: Experimental +type ClientCancelNexusOperationInput = internal.ClientCancelNexusOperationInput + +// ClientTerminateNexusOperationInput is the input to +// ClientOutboundInterceptor.TerminateNexusOperation. +// +// NOTE: Experimental +type ClientTerminateNexusOperationInput = internal.ClientTerminateNexusOperationInput + +// ClientDescribeNexusOperationInput is the input to +// ClientOutboundInterceptor.DescribeNexusOperation. +// +// NOTE: Experimental +type ClientDescribeNexusOperationInput = internal.ClientDescribeNexusOperationInput + +// ClientDescribeNexusOperationOutput is the output of +// ClientOutboundInterceptor.DescribeNexusOperation. +// +// NOTE: Experimental +type ClientDescribeNexusOperationOutput = internal.ClientDescribeNexusOperationOutput + +// ClientPollNexusOperationResultInput is the input to +// ClientOutboundInterceptor.PollNexusOperationResult. +// +// NOTE: Experimental +type ClientPollNexusOperationResultInput = internal.ClientPollNexusOperationResultInput + +// ClientPollNexusOperationResultOutput is the output of +// ClientOutboundInterceptor.PollNexusOperationResult. +// +// NOTE: Experimental +type ClientPollNexusOperationResultOutput = internal.ClientPollNexusOperationResultOutput + // ScheduleClientCreateInput is input for // ScheduleClientInterceptor.CreateSchedule. type ScheduleClientCreateInput = internal.ScheduleClientCreateInput diff --git a/internal/client.go b/internal/client.go index a97694f47..8331c140e 100644 --- a/internal/client.go +++ b/internal/client.go @@ -533,6 +533,30 @@ type ( // NOTE: Experimental CountActivities(ctx context.Context, options ClientCountActivitiesOptions) (*ClientCountActivitiesResult, error) + // NewNexusClient creates a new Nexus client bound to the given endpoint and service. + // This is for standalone Nexus operations outside of workflow context. + // For Nexus operations within workflows, use workflow.NewNexusClient instead. + // + // NOTE: Experimental + NewNexusClient(options ClientNexusClientOptions) (ClientNexusClient, error) + + // GetNexusOperationHandle creates a handle to the referenced Nexus operation. + // No network call is made. The handle can be used to poll, describe, cancel, or terminate. + // + // NOTE: Experimental + GetNexusOperationHandle(options ClientGetNexusOperationHandleOptions) ClientNexusOperationHandle + + // ListNexusOperations lists Nexus operation executions based on query. + // Currently, all errors are returned in the iterator and not the base level error. + // + // NOTE: Experimental + ListNexusOperations(ctx context.Context, options ClientListNexusOperationsOptions) (ClientListNexusOperationsResult, error) + + // CountNexusOperations counts Nexus operation executions based on query. + // + // NOTE: Experimental + CountNexusOperations(ctx context.Context, options ClientCountNexusOperationsOptions) (*ClientCountNexusOperationsResult, error) + // WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases // that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the // service are not configured with internal semantics such as automatic retries. diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index d7d4205d5..a76aacce6 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -22,7 +22,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.62.7 // indirect + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.49.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index eb114eacb..893b8089a 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -59,8 +59,8 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/interceptor.go b/internal/interceptor.go index dbccfb945..2a05d117a 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -450,6 +450,36 @@ type ClientOutboundInterceptor interface { // NOTE: Experimental PollActivityResult(context.Context, *ClientPollActivityResultInput) (*ClientPollActivityResultOutput, error) + // ExecuteNexusOperation intercepts NexusClient.ExecuteOperation. + // + // NOTE: Experimental + ExecuteNexusOperation(context.Context, *ClientExecuteNexusOperationInput) (ClientNexusOperationHandle, error) + + // GetNexusOperationHandle intercepts client.Client.GetNexusOperationHandle. + // + // NOTE: Experimental + GetNexusOperationHandle(*ClientGetNexusOperationHandleInput) ClientNexusOperationHandle + + // CancelNexusOperation intercepts NexusOperationHandle.Cancel. + // + // NOTE: Experimental + CancelNexusOperation(context.Context, *ClientCancelNexusOperationInput) error + + // TerminateNexusOperation intercepts NexusOperationHandle.Terminate. + // + // NOTE: Experimental + TerminateNexusOperation(context.Context, *ClientTerminateNexusOperationInput) error + + // DescribeNexusOperation intercepts NexusOperationHandle.Describe. + // + // NOTE: Experimental + DescribeNexusOperation(context.Context, *ClientDescribeNexusOperationInput) (*ClientDescribeNexusOperationOutput, error) + + // PollNexusOperationResult intercepts NexusOperationHandle.Get. + // + // NOTE: Experimental + PollNexusOperationResult(context.Context, *ClientPollNexusOperationResultInput) (*ClientPollNexusOperationResultOutput, error) + mustEmbedClientOutboundInterceptorBase() } @@ -670,6 +700,100 @@ type ClientPollActivityResultOutput struct { Error error } +// ClientExecuteNexusOperationInput is the input to +// ClientOutboundInterceptor.ExecuteNexusOperation. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteNexusOperationInput] +type ClientExecuteNexusOperationInput struct { + Options *ClientStartNexusOperationOptions + Endpoint string + Service string + OperationType string + Input interface{} +} + +// ClientGetNexusOperationHandleInput is the input to +// ClientOutboundInterceptor.GetNexusOperationHandle. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientGetNexusOperationHandleInput] +type ClientGetNexusOperationHandleInput struct { + OperationID string + RunID string +} + +// ClientCancelNexusOperationInput is the input to +// ClientOutboundInterceptor.CancelNexusOperation. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelNexusOperationInput] +type ClientCancelNexusOperationInput struct { + OperationID string + RunID string + Reason string +} + +// ClientTerminateNexusOperationInput is the input to +// ClientOutboundInterceptor.TerminateNexusOperation. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateNexusOperationInput] +type ClientTerminateNexusOperationInput struct { + OperationID string + RunID string + Reason string +} + +// ClientDescribeNexusOperationInput is the input to +// ClientOutboundInterceptor.DescribeNexusOperation. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientDescribeNexusOperationInput] +type ClientDescribeNexusOperationInput struct { + OperationID string + RunID string +} + +// ClientDescribeNexusOperationOutput is the output of +// ClientOutboundInterceptor.DescribeNexusOperation. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientDescribeNexusOperationOutput] +type ClientDescribeNexusOperationOutput struct { + Description *ClientNexusOperationExecutionDescription +} + +// ClientPollNexusOperationResultInput is the input to +// ClientOutboundInterceptor.PollNexusOperationResult. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientPollNexusOperationResultInput] +type ClientPollNexusOperationResultInput struct { + OperationID string + RunID string +} + +// ClientPollNexusOperationResultOutput is the output of +// ClientOutboundInterceptor.PollNexusOperationResult. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientPollNexusOperationResultOutput] +type ClientPollNexusOperationResultOutput struct { + // Result is the result of the operation, if it has completed successfully. + Result converter.EncodedValue + // Error is the result of a failed operation. + Error error +} + // NexusOutboundInterceptor intercepts Nexus operation method invocations. See documentation in the interceptor package // for more details. // diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index c1ae31044..487760093 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -635,6 +635,65 @@ func (c *ClientOutboundInterceptorBase) PollActivityResult( return c.Next.PollActivityResult(ctx, in) } +// ExecuteNexusOperation implements ClientOutboundInterceptor.ExecuteNexusOperation. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) ExecuteNexusOperation( + ctx context.Context, + in *ClientExecuteNexusOperationInput, +) (ClientNexusOperationHandle, error) { + return c.Next.ExecuteNexusOperation(ctx, in) +} + +// GetNexusOperationHandle implements ClientOutboundInterceptor.GetNexusOperationHandle. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) GetNexusOperationHandle( + in *ClientGetNexusOperationHandleInput, +) ClientNexusOperationHandle { + return c.Next.GetNexusOperationHandle(in) +} + +// CancelNexusOperation implements ClientOutboundInterceptor.CancelNexusOperation. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) CancelNexusOperation( + ctx context.Context, + in *ClientCancelNexusOperationInput, +) error { + return c.Next.CancelNexusOperation(ctx, in) +} + +// TerminateNexusOperation implements ClientOutboundInterceptor.TerminateNexusOperation. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) TerminateNexusOperation( + ctx context.Context, + in *ClientTerminateNexusOperationInput, +) error { + return c.Next.TerminateNexusOperation(ctx, in) +} + +// DescribeNexusOperation implements ClientOutboundInterceptor.DescribeNexusOperation. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) DescribeNexusOperation( + ctx context.Context, + in *ClientDescribeNexusOperationInput, +) (*ClientDescribeNexusOperationOutput, error) { + return c.Next.DescribeNexusOperation(ctx, in) +} + +// PollNexusOperationResult implements ClientOutboundInterceptor.PollNexusOperationResult. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) PollNexusOperationResult( + ctx context.Context, + in *ClientPollNexusOperationResultInput, +) (*ClientPollNexusOperationResultOutput, error) { + return c.Next.PollNexusOperationResult(ctx, in) +} + func (*ClientOutboundInterceptorBase) mustEmbedClientOutboundInterceptorBase() {} // NexusOperationInboundInterceptorBase is a default implementation of [NexusOperationInboundInterceptor] that diff --git a/internal/internal_nexus_client.go b/internal/internal_nexus_client.go new file mode 100644 index 000000000..5bc78d565 --- /dev/null +++ b/internal/internal_nexus_client.go @@ -0,0 +1,881 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "iter" + "reflect" + "time" + + "github.com/google/uuid" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + failurepb "go.temporal.io/api/failure/v1" + nexuspb "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" + "google.golang.org/protobuf/types/known/durationpb" +) + +const pollNexusOperationTimeout = 60 * time.Second + +type ( + // ClientStartNexusOperationOptions contains configuration parameters for starting a Nexus operation execution. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.StartNexusOperationOptions] + ClientStartNexusOperationOptions struct { + // ID - The business identifier of the operation. + // + // Mandatory: No default. + ID string + // ScheduleToCloseTimeout - Total time that the operation is allowed to run. + // + // Optional: Defaults to unlimited. + ScheduleToCloseTimeout time.Duration + // IDConflictPolicy - Defines how to resolve an operation id conflict with a running operation. + // + // Optional: Defaults to NEXUS_OPERATION_ID_CONFLICT_POLICY_FAIL. + IDConflictPolicy enumspb.NexusOperationIdConflictPolicy + // IDReusePolicy - Defines whether to allow re-using an operation ID from a previously closed operation. + // + // Optional: Defaults to NEXUS_OPERATION_ID_REUSE_POLICY_ALLOW_DUPLICATE. + IDReusePolicy enumspb.NexusOperationIdReusePolicy + // SearchAttributes - Specifies Search Attributes that will be attached to the operation. + // + // Optional: default to none. + SearchAttributes SearchAttributes + // Summary is a single-line summary for this operation that will appear in UI/CLI. + // + // Optional: defaults to none/empty. + Summary string + } + + // ClientNexusClientOptions contains options for creating a NexusClient. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusClientOptions] + ClientNexusClientOptions struct { + // Endpoint - The Nexus endpoint name. + // + // Mandatory: No default. + Endpoint string + // Service - The Nexus service name. + // + // Mandatory: No default. + Service string + } + + // ClientGetNexusOperationHandleOptions contains input for GetNexusOperationHandle call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.GetNexusOperationHandleOptions] + ClientGetNexusOperationHandleOptions struct { + // OperationID - The operation ID. + // + // Mandatory: No default. + OperationID string + // RunID - The run ID. Can be empty to target the latest run. + // + // Optional: defaults to empty. + RunID string + } + + // ClientDescribeNexusOperationOptions contains options for ClientNexusOperationHandle.Describe call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.DescribeNexusOperationOptions] + ClientDescribeNexusOperationOptions struct { + } + + // ClientCancelNexusOperationOptions contains options for ClientNexusOperationHandle.Cancel call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CancelNexusOperationOptions] + ClientCancelNexusOperationOptions struct { + // Reason is optional description of the reason for cancellation. + Reason string + } + + // ClientTerminateNexusOperationOptions contains options for ClientNexusOperationHandle.Terminate call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.TerminateNexusOperationOptions] + ClientTerminateNexusOperationOptions struct { + // Reason is optional description of the reason for termination. + Reason string + } + + // ClientListNexusOperationsOptions contains input for ListNexusOperations call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ListNexusOperationsOptions] + ClientListNexusOperationsOptions struct { + // Query is a visibility query for listing Nexus operations. + // See https://docs.temporal.io/list-filter for the syntax. + Query string + } + + // ClientCountNexusOperationsOptions contains input for CountNexusOperations call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountNexusOperationsOptions] + ClientCountNexusOperationsOptions struct { + // Query is a visibility query for counting Nexus operations. + // See https://docs.temporal.io/list-filter for the syntax. + Query string + } + + // ClientNexusOperationMetadata contains information about a Nexus operation execution. + // This is returned by ListNexusOperations and embedded in ClientNexusOperationExecutionDescription. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusOperationMetadata] + ClientNexusOperationMetadata struct { + // RawExecutionListInfo is the raw PB message this struct was built from. This field is nil + // in the result of ClientNexusOperationHandle.Describe call - use + // ClientNexusOperationExecutionDescription.RawInfo instead. + RawExecutionListInfo *nexuspb.NexusOperationExecutionListInfo + // OperationID is the unique identifier of this operation within its namespace. + OperationID string + // OperationRunID is the run ID of the operation. + OperationRunID string + // Endpoint is the Nexus endpoint name. + Endpoint string + // Service is the Nexus service name. + Service string + // Operation is the Nexus operation name. + Operation string + // ScheduledTime is the time when the operation was originally scheduled. + ScheduledTime time.Time + // CloseTime is the time when the operation transitioned to a terminal state. + CloseTime time.Time + // Status is the current execution status of the operation. + Status enumspb.NexusOperationExecutionStatus + // SearchAttributes are the search attributes attached to this operation. + SearchAttributes SearchAttributes + // StateTransitionCount is incremented each time the operation state is mutated. + StateTransitionCount int64 + // ExecutionDuration is the difference between close time and scheduled time. + // Only populated if the operation is closed. + ExecutionDuration time.Duration + } + + // ClientNexusOperationExecutionDescription contains detailed information about a Nexus operation execution. + // This is returned by ClientNexusOperationHandle.Describe. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusOperationExecutionDescription] + ClientNexusOperationExecutionDescription struct { + ClientNexusOperationMetadata + // RawInfo is the raw PB message this struct was built from. + RawInfo *nexuspb.NexusOperationExecutionInfo + // State is a more detailed breakdown of the running status. + State enumspb.PendingNexusOperationState + // ScheduleToCloseTimeout is the schedule-to-close timeout for this operation. + ScheduleToCloseTimeout time.Duration + // ScheduleToStartTimeout is the schedule-to-start timeout for this operation. + // May not be populated by all server versions. + ScheduleToStartTimeout time.Duration + // StartToCloseTimeout is the start-to-close timeout for this operation. + // May not be populated by all server versions. + StartToCloseTimeout time.Duration + // Attempt is the number of attempts made to start/deliver the operation request. + Attempt int32 + // ExpirationTime is the scheduled time plus schedule-to-close timeout. + ExpirationTime time.Time + // LastAttemptCompleteTime is the time when the last attempt completed. + LastAttemptCompleteTime time.Time + // NextAttemptScheduleTime is the time when the next attempt is scheduled. + NextAttemptScheduleTime time.Time + // LastAttemptFailure is the last attempt's failure, if any. + LastAttemptFailure *failurepb.Failure + // BlockedReason provides additional information if the state is BLOCKED. + BlockedReason string + // OperationToken is only set for asynchronous operations after a successful StartOperation call. + OperationToken string + // Identity is the identity of the client who started this operation. + Identity string + // CancellationInfo contains cancellation information if cancellation has been requested. + CancellationInfo *ClientNexusOperationCancellationInfo + dc converter.DataConverter + failureConverter converter.FailureConverter + inboundPayloadVisitor PayloadVisitor + } + + // ClientNexusOperationCancellationInfo contains cancellation information for a Nexus operation. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusOperationCancellationInfo] + ClientNexusOperationCancellationInfo struct { + // RawInfo is the raw PB message this struct was built from. + RawInfo *nexuspb.NexusOperationExecutionCancellationInfo + // RequestedTime is the time when cancellation was requested. + RequestedTime time.Time + // State is the current state of the cancellation request. + State enumspb.NexusOperationCancellationState + // Attempt is the number of attempts made to deliver the cancel operation request. + Attempt int32 + // LastAttemptCompleteTime is the time when the last cancellation attempt completed. + LastAttemptCompleteTime time.Time + // NextAttemptScheduleTime is the time when the next cancellation attempt is scheduled. + NextAttemptScheduleTime time.Time + // BlockedReason provides additional information if the cancellation state is BLOCKED. + BlockedReason string + // Reason is the reason specified in the cancellation request. + Reason string + lastAttemptFailure *failurepb.Failure + failureConverter converter.FailureConverter + inboundPayloadVisitor PayloadVisitor + } + + // ClientCountNexusOperationsResult contains the result of the CountNexusOperations call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountNexusOperationsResult] + ClientCountNexusOperationsResult struct { + // Count is the approximate number of operations matching the query. + Count int64 + // Groups contains aggregation groups if the query includes a GROUP BY clause. + Groups []ClientCountNexusOperationsAggregationGroup + } + + // ClientCountNexusOperationsAggregationGroup contains groups of Nexus operations if + // CountNexusOperationExecutions is grouped by a field. + // The list might not be complete, and the counts of each group is approximate. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountNexusOperationsAggregationGroup] + ClientCountNexusOperationsAggregationGroup struct { + // GroupValues contains the group-by field values for this group. + GroupValues []any + // Count is the approximate number of operations in this group. + Count int64 + } + + // ClientListNexusOperationsResult contains the result of the ListNexusOperations call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ListNexusOperationsResult] + ClientListNexusOperationsResult struct { + // Results is an iterator over Nexus operation metadata entries. + Results iter.Seq2[*ClientNexusOperationMetadata, error] + } + + // ClientNexusClient is the client for starting Nexus operations bound to a specific endpoint and service. + // This is for standalone Nexus operations outside of workflow context. + // For Nexus operations within workflows, use workflow.NexusClient instead. + // + // Methods may be added to this interface; implementing it directly is discouraged. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusClient] + ClientNexusClient interface { + // ExecuteOperation starts a Nexus operation and returns a handle to it. + // + // NOTE: Experimental + ExecuteOperation(ctx context.Context, operation any, input any, options ClientStartNexusOperationOptions) (ClientNexusOperationHandle, error) + } + + // ClientNexusOperationHandle represents a running or completed standalone Nexus operation execution. + // It can be used to get the result, describe, cancel, or terminate the operation. + // + // Methods may be added to this interface; implementing it directly is discouraged. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.NexusOperationHandle] + ClientNexusOperationHandle interface { + // GetID returns the ID of the operation this handle points to. + // + // NOTE: Experimental + GetID() string + // GetRunID returns the run ID that this handle was created with. + // + // NOTE: Experimental + GetRunID() string + // Get waits until the operation finishes and gets its result. If the operation completes + // successfully, the result is written to valuePtr and nil is returned. If the operation + // failed, the failure is returned as an error. + // + // NOTE: Experimental + Get(ctx context.Context, valuePtr any) error + // Describe returns detailed information about current state of the operation execution. + // + // NOTE: Experimental + Describe(ctx context.Context, options ClientDescribeNexusOperationOptions) (*ClientNexusOperationExecutionDescription, error) + // Cancel requests cancellation of the operation. + // + // NOTE: Experimental + Cancel(ctx context.Context, options ClientCancelNexusOperationOptions) error + // Terminate terminates the operation. + // + // NOTE: Experimental + Terminate(ctx context.Context, options ClientTerminateNexusOperationOptions) error + } + + // nexusClientImpl is the default implementation of ClientNexusClient. + nexusClientImpl struct { + client *WorkflowClient + endpoint string + service string + } + + // clientNexusOperationHandleImpl is the default implementation of ClientNexusOperationHandle. + clientNexusOperationHandleImpl struct { + client *WorkflowClient + id string + runID string + result *ClientPollNexusOperationResultOutput + } +) + +var ( + _ ClientNexusClient = &nexusClientImpl{} + _ ClientNexusOperationHandle = &clientNexusOperationHandleImpl{} +) + +// GetSummary returns summary of the operation. See ClientStartNexusOperationOptions.Summary. +// Returns empty string if there is no summary. +// Uses the data converter of the client used to make the Describe call. Returns error if data conversion fails. +// +// NOTE: Experimental +func (d *ClientNexusOperationExecutionDescription) GetSummary() (string, error) { + payload := d.RawInfo.GetUserMetadata().GetSummary() + if payload == nil { + return "", nil + } + var err error + if payload, err = visitPayload(context.Background(), d.inboundPayloadVisitor, payload); err != nil { + return "", err + } + var summary string + err = d.dc.FromPayload(payload, &summary) + if err != nil { + return "", err + } + return summary, nil +} + +// GetLastAttemptFailure returns the last attempt failure of the operation, using the failure +// converter of the client used to make the Describe call. Returns nil if there was no failure. +// +// NOTE: Experimental +func (d *ClientNexusOperationExecutionDescription) GetLastAttemptFailure() error { + failure := d.LastAttemptFailure + if failure == nil { + return nil + } + if err := visitProtoPayloads(context.Background(), d.inboundPayloadVisitor, failure); err != nil { + return err + } + return d.failureConverter.FailureToError(failure) +} + +// GetLastAttemptFailure returns the last attempt failure of the cancellation info. +// Returns nil if there was no failure. +// +// NOTE: Experimental +func (c *ClientNexusOperationCancellationInfo) GetLastAttemptFailure() error { + if c.lastAttemptFailure == nil { + return nil + } + if err := visitProtoPayloads(context.Background(), c.inboundPayloadVisitor, c.lastAttemptFailure); err != nil { + return err + } + return c.failureConverter.FailureToError(c.lastAttemptFailure) +} + +func (nc *nexusClientImpl) ExecuteOperation(ctx context.Context, operation any, input any, options ClientStartNexusOperationOptions) (ClientNexusOperationHandle, error) { + if err := nc.client.ensureInitialized(ctx); err != nil { + return nil, err + } + + // Resolve operation name from the operation parameter + operationName, err := resolveNexusOperationName(operation, input) + if err != nil { + return nil, err + } + + // Set header before interceptor run so interceptors can access it + ctx = contextWithNewHeader(ctx) + + return nc.client.interceptor.ExecuteNexusOperation(ctx, &ClientExecuteNexusOperationInput{ + Options: &options, + Endpoint: nc.endpoint, + Service: nc.service, + OperationType: operationName, + Input: input, + }) +} + +// resolveNexusOperationName resolves a Nexus operation name from the given value. +// It accepts a string name or a typed operation reference (with Name() and InputType() methods). +// This matches the resolution logic used in workflow context (see prepareNexusOperationParams). +func resolveNexusOperationName(operation any, input any) (string, error) { + if name, ok := operation.(string); ok { + if name == "" { + return "", fmt.Errorf("operation name must not be empty") + } + return name, nil + } + if regOp, ok := operation.(interface { + Name() string + InputType() reflect.Type + }); ok { + operationName := regOp.Name() + inputType := reflect.TypeOf(input) + if inputType != nil && !inputType.AssignableTo(regOp.InputType()) { + return "", fmt.Errorf("cannot assign argument of type %q to type %q for operation %q", inputType, regOp.InputType(), operationName) + } + return operationName, nil + } + return "", fmt.Errorf("invalid 'operation' parameter, must be an OperationReference or a string") +} + +func (h *clientNexusOperationHandleImpl) GetID() string { + return h.id +} + +func (h *clientNexusOperationHandleImpl) GetRunID() string { + return h.runID +} + +func (h *clientNexusOperationHandleImpl) Get(ctx context.Context, valuePtr any) error { + if h.result != nil { + if h.result.Error != nil { + return h.result.Error + } + if h.result.Result != nil { + if valuePtr == nil { + return nil + } + return h.result.Result.Get(valuePtr) + } + } + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + + // repeatedly poll, the loop repeats until there's an outcome + for { + resp, err := h.client.interceptor.PollNexusOperationResult(ctx, &ClientPollNexusOperationResultInput{ + OperationID: h.id, + RunID: h.runID, + }) + if err != nil { + return err + } + if resp.Error != nil { + h.result = &ClientPollNexusOperationResultOutput{Error: resp.Error} + return resp.Error + } + if resp.Result != nil { + if valuePtr == nil { + return nil + } + h.result = &ClientPollNexusOperationResultOutput{Result: resp.Result} + return resp.Result.Get(valuePtr) + } + } +} + +func (h *clientNexusOperationHandleImpl) Describe(ctx context.Context, options ClientDescribeNexusOperationOptions) (*ClientNexusOperationExecutionDescription, error) { + if err := h.client.ensureInitialized(ctx); err != nil { + return nil, err + } + out, err := h.client.interceptor.DescribeNexusOperation(ctx, &ClientDescribeNexusOperationInput{ + OperationID: h.id, + RunID: h.runID, + }) + if err != nil { + return nil, err + } + return out.Description, nil +} + +func (h *clientNexusOperationHandleImpl) Cancel(ctx context.Context, options ClientCancelNexusOperationOptions) error { + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + return h.client.interceptor.CancelNexusOperation(ctx, &ClientCancelNexusOperationInput{ + OperationID: h.id, + RunID: h.runID, + Reason: options.Reason, + }) +} + +func (h *clientNexusOperationHandleImpl) Terminate(ctx context.Context, options ClientTerminateNexusOperationOptions) error { + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + return h.client.interceptor.TerminateNexusOperation(ctx, &ClientTerminateNexusOperationInput{ + OperationID: h.id, + RunID: h.runID, + Reason: options.Reason, + }) +} + +// WorkflowClient methods for Nexus operations + +func (wc *WorkflowClient) NewNexusClient(options ClientNexusClientOptions) (ClientNexusClient, error) { + if options.Endpoint == "" { + return nil, errors.New("endpoint is required") + } + if options.Service == "" { + return nil, errors.New("service is required") + } + return &nexusClientImpl{client: wc, endpoint: options.Endpoint, service: options.Service}, nil +} + +func (wc *WorkflowClient) GetNexusOperationHandle(options ClientGetNexusOperationHandleOptions) ClientNexusOperationHandle { + return wc.interceptor.GetNexusOperationHandle(&ClientGetNexusOperationHandleInput{ + OperationID: options.OperationID, + RunID: options.RunID, + }) +} + +// ListNexusOperations does not go through the interceptor chain, consistent with ListActivities. +func (wc *WorkflowClient) ListNexusOperations(ctx context.Context, options ClientListNexusOperationsOptions) (ClientListNexusOperationsResult, error) { + return ClientListNexusOperationsResult{ + Results: func(yield func(*ClientNexusOperationMetadata, error) bool) { + if err := wc.ensureInitialized(ctx); err != nil { + yield(nil, err) + return + } + + request := &workflowservice.ListNexusOperationExecutionsRequest{ + Namespace: wc.namespace, + Query: options.Query, + } + + for { + resp, err := wc.getListNexusOperationsPage(ctx, request) + if err != nil { + yield(nil, err) + return + } + + for _, op := range resp.Operations { + if !yield(&ClientNexusOperationMetadata{ + RawExecutionListInfo: op, + OperationID: op.OperationId, + OperationRunID: op.RunId, + Endpoint: op.Endpoint, + Service: op.Service, + Operation: op.Operation, + ScheduledTime: op.ScheduleTime.AsTime(), + CloseTime: op.CloseTime.AsTime(), + Status: op.Status, + SearchAttributes: convertToTypedSearchAttributes(wc.logger, op.SearchAttributes.GetIndexedFields()), + StateTransitionCount: op.StateTransitionCount, + ExecutionDuration: op.ExecutionDuration.AsDuration(), + }, nil) { + return + } + } + + if resp.NextPageToken != nil { + request.NextPageToken = resp.NextPageToken + } else { + return + } + } + }, + }, nil +} + +func (wc *WorkflowClient) getListNexusOperationsPage(ctx context.Context, request *workflowservice.ListNexusOperationExecutionsRequest) (*workflowservice.ListNexusOperationExecutionsResponse, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + return wc.WorkflowService().ListNexusOperationExecutions(grpcCtx, request) +} + +// CountNexusOperations does not go through the interceptor chain, consistent with CountActivities. +func (wc *WorkflowClient) CountNexusOperations(ctx context.Context, options ClientCountNexusOperationsOptions) (*ClientCountNexusOperationsResult, error) { + if err := wc.ensureInitialized(ctx); err != nil { + return nil, err + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.CountNexusOperationExecutionsRequest{ + Namespace: wc.namespace, + Query: options.Query, + } + resp, err := wc.WorkflowService().CountNexusOperationExecutions(grpcCtx, request) + if err != nil { + return nil, err + } + + groups := make([]ClientCountNexusOperationsAggregationGroup, len(resp.Groups)) + for i, group := range resp.Groups { + groupValues := make([]any, len(group.GroupValues)) + for j, groupValue := range group.GroupValues { + // should never fail, and if it does, leaving nil behind + _ = converter.GetDefaultDataConverter().FromPayload(groupValue, &groupValues[j]) + } + groups[i] = ClientCountNexusOperationsAggregationGroup{ + GroupValues: groupValues, + Count: group.Count, + } + } + + return &ClientCountNexusOperationsResult{ + Count: resp.Count, + Groups: groups, + }, nil +} + +// workflowClientInterceptor implementations for Nexus operations + +func (w *workflowClientInterceptor) ExecuteNexusOperation( + ctx context.Context, + in *ClientExecuteNexusOperationInput, +) (ClientNexusOperationHandle, error) { + dataConverter := WithContext(ctx, w.client.dataConverter) + if dataConverter == nil { + dataConverter = converter.GetDefaultDataConverter() + } + + if in.Options.ID == "" { + return nil, errors.New("operation ID is required") + } + if in.Options.ScheduleToCloseTimeout < 0 { + return nil, errors.New("ScheduleToCloseTimeout must not be negative") + } + + // Encode input as a single Payload (not Payloads) + var inputPayload *commonpb.Payload + if in.Input != nil { + var err error + inputPayload, err = dataConverter.ToPayload(in.Input) + if err != nil { + return nil, err + } + } + + searchAttrs, err := serializeTypedSearchAttributes(in.Options.SearchAttributes.GetUntypedValues()) + if err != nil { + return nil, err + } + + userMetadata, err := buildUserMetadata(in.Options.Summary, "", dataConverter) + if err != nil { + return nil, err + } + + request := &workflowservice.StartNexusOperationExecutionRequest{ + Namespace: w.client.namespace, + Identity: w.client.identity, + RequestId: uuid.NewString(), + OperationId: in.Options.ID, + Endpoint: in.Endpoint, + Service: in.Service, + Operation: in.OperationType, + Input: inputPayload, + IdReusePolicy: in.Options.IDReusePolicy, + IdConflictPolicy: in.Options.IDConflictPolicy, + SearchAttributes: searchAttrs, + UserMetadata: userMetadata, + } + if in.Options.ScheduleToCloseTimeout > 0 { + request.ScheduleToCloseTimeout = durationpb.New(in.Options.ScheduleToCloseTimeout) + } + if err := visitProtoPayloads(ctx, w.client.outboundPayloadVisitor, request); err != nil { + return nil, err + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := w.client.WorkflowService().StartNexusOperationExecution(grpcCtx, request) + if err != nil { + return nil, err + } + + return &clientNexusOperationHandleImpl{ + client: w.client, + id: in.Options.ID, + runID: resp.RunId, + }, nil +} + +func (w *workflowClientInterceptor) GetNexusOperationHandle( + in *ClientGetNexusOperationHandleInput, +) ClientNexusOperationHandle { + return &clientNexusOperationHandleImpl{ + client: w.client, + id: in.OperationID, + runID: in.RunID, + } +} + +func (w *workflowClientInterceptor) PollNexusOperationResult( + ctx context.Context, + in *ClientPollNexusOperationResultInput, +) (*ClientPollNexusOperationResultOutput, error) { + request := &workflowservice.PollNexusOperationExecutionRequest{ + Namespace: w.client.namespace, + OperationId: in.OperationID, + RunId: in.RunID, + WaitStage: enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + } + + var resp *workflowservice.PollNexusOperationExecutionResponse + for resp.GetOutcome() == nil { + grpcCtx, cancel := newGRPCContext(ctx, grpcLongPoll(true), grpcTimeout(pollNexusOperationTimeout), defaultGrpcRetryParameters(ctx)) + var err error + resp, err = w.client.WorkflowService().PollNexusOperationExecution(grpcCtx, request) + cancel() + if err != nil { + return nil, err + } + } + + if err := visitProtoPayloads(ctx, w.client.inboundPayloadVisitor, resp); err != nil { + return nil, err + } + + switch v := resp.GetOutcome().(type) { + case *workflowservice.PollNexusOperationExecutionResponse_Result: + // Wrap single Payload in Payloads for EncodedValue compatibility + payloads := &commonpb.Payloads{Payloads: []*commonpb.Payload{v.Result}} + return &ClientPollNexusOperationResultOutput{Result: newEncodedValue(payloads, w.client.dataConverter)}, nil + case *workflowservice.PollNexusOperationExecutionResponse_Failure: + return &ClientPollNexusOperationResultOutput{Error: w.client.failureConverter.FailureToError(v.Failure)}, nil + default: + return nil, fmt.Errorf("unexpected nexus operation outcome type: %T", v) + } +} + +func (w *workflowClientInterceptor) DescribeNexusOperation( + ctx context.Context, + in *ClientDescribeNexusOperationInput, +) (*ClientDescribeNexusOperationOutput, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.DescribeNexusOperationExecutionRequest{ + Namespace: w.client.namespace, + OperationId: in.OperationID, + RunId: in.RunID, + } + resp, err := w.client.WorkflowService().DescribeNexusOperationExecution(grpcCtx, request) + if err != nil { + return nil, err + } + info := resp.GetInfo() + if info == nil { + return nil, errors.New("DescribeNexusOperationExecution response doesn't contain info") + } + + var cancellationInfo *ClientNexusOperationCancellationInfo + if info.CancellationInfo != nil { + cancellationInfo = &ClientNexusOperationCancellationInfo{ + RawInfo: info.CancellationInfo, + RequestedTime: info.CancellationInfo.RequestedTime.AsTime(), + State: info.CancellationInfo.State, + Attempt: info.CancellationInfo.Attempt, + LastAttemptCompleteTime: info.CancellationInfo.LastAttemptCompleteTime.AsTime(), + NextAttemptScheduleTime: info.CancellationInfo.NextAttemptScheduleTime.AsTime(), + BlockedReason: info.CancellationInfo.BlockedReason, + Reason: info.CancellationInfo.Reason, + lastAttemptFailure: info.CancellationInfo.LastAttemptFailure, + failureConverter: w.client.failureConverter, + inboundPayloadVisitor: w.client.inboundPayloadVisitor, + } + } + + return &ClientDescribeNexusOperationOutput{ + Description: &ClientNexusOperationExecutionDescription{ + ClientNexusOperationMetadata: ClientNexusOperationMetadata{ + RawExecutionListInfo: nil, + OperationID: info.OperationId, + OperationRunID: info.RunId, + Endpoint: info.Endpoint, + Service: info.Service, + Operation: info.Operation, + ScheduledTime: info.ScheduleTime.AsTime(), + CloseTime: info.CloseTime.AsTime(), + Status: info.Status, + SearchAttributes: convertToTypedSearchAttributes(w.client.logger, info.SearchAttributes.GetIndexedFields()), + StateTransitionCount: info.StateTransitionCount, + ExecutionDuration: info.ExecutionDuration.AsDuration(), + }, + RawInfo: info, + State: info.State, + ScheduleToCloseTimeout: info.ScheduleToCloseTimeout.AsDuration(), + ScheduleToStartTimeout: info.ScheduleToStartTimeout.AsDuration(), + StartToCloseTimeout: info.StartToCloseTimeout.AsDuration(), + Attempt: info.Attempt, + ExpirationTime: info.ExpirationTime.AsTime(), + LastAttemptCompleteTime: info.LastAttemptCompleteTime.AsTime(), + NextAttemptScheduleTime: info.NextAttemptScheduleTime.AsTime(), + LastAttemptFailure: info.LastAttemptFailure, + BlockedReason: info.BlockedReason, + OperationToken: info.OperationToken, + Identity: info.Identity, + CancellationInfo: cancellationInfo, + dc: WithContext(ctx, w.client.dataConverter), + failureConverter: w.client.failureConverter, + inboundPayloadVisitor: w.client.inboundPayloadVisitor, + }, + }, nil +} + +func (w *workflowClientInterceptor) CancelNexusOperation( + ctx context.Context, + in *ClientCancelNexusOperationInput, +) error { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.RequestCancelNexusOperationExecutionRequest{ + Namespace: w.client.namespace, + OperationId: in.OperationID, + RunId: in.RunID, + Identity: w.client.identity, + RequestId: uuid.NewString(), + Reason: in.Reason, + } + _, err := w.client.WorkflowService().RequestCancelNexusOperationExecution(grpcCtx, request) + return err +} + +func (w *workflowClientInterceptor) TerminateNexusOperation( + ctx context.Context, + in *ClientTerminateNexusOperationInput, +) error { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.TerminateNexusOperationExecutionRequest{ + Namespace: w.client.namespace, + OperationId: in.OperationID, + RunId: in.RunID, + Identity: w.client.identity, + RequestId: uuid.NewString(), + Reason: in.Reason, + } + _, err := w.client.WorkflowService().TerminateNexusOperationExecution(grpcCtx, request) + return err +} diff --git a/internal/internal_nexus_client_test.go b/internal/internal_nexus_client_test.go new file mode 100644 index 000000000..e4fd9df3c --- /dev/null +++ b/internal/internal_nexus_client_test.go @@ -0,0 +1,110 @@ +package internal + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/api/workflowservice/v1" +) + +// nexusHeaderCheckInterceptor is a ClientInterceptor that verifies the header is +// present on the context when ExecuteNexusOperation is called. This ensures that +// contextWithNewHeader is called before the interceptor chain runs, so +// interceptors (like the tracing interceptor) can read/write headers. +type nexusHeaderCheckInterceptor struct { + ClientInterceptorBase + headerWasPresent bool +} + +func (h *nexusHeaderCheckInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &nexusHeaderCheckOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + parent: h, + } +} + +type nexusHeaderCheckOutbound struct { + ClientOutboundInterceptorBase + parent *nexusHeaderCheckInterceptor +} + +func (h *nexusHeaderCheckOutbound) ExecuteNexusOperation( + ctx context.Context, + in *ClientExecuteNexusOperationInput, +) (ClientNexusOperationHandle, error) { + h.parent.headerWasPresent = Header(ctx) != nil + // Return an error to short-circuit the rest of the chain (avoids needing a + // real gRPC connection for the base interceptor). + return nil, fmt.Errorf("short-circuit") +} + +func TestExecuteNexusOperationHeaderAvailableToInterceptors(t *testing.T) { + interceptor := &nexusHeaderCheckInterceptor{} + + client := NewServiceClient(nil, nil, ClientOptions{ + Interceptors: []ClientInterceptor{interceptor}, + }) + // Pre-set capabilities so ensureInitialized doesn't make a gRPC call. + client.capabilities = &workflowservice.GetSystemInfoResponse_Capabilities{} + + nexusClient, err := client.NewNexusClient(ClientNexusClientOptions{ + Endpoint: "test-endpoint", + Service: "test-service", + }) + require.NoError(t, err) + + _, err = nexusClient.ExecuteOperation(context.Background(), "test-op", "test-input", ClientStartNexusOperationOptions{ + ID: "test-op-id", + }) + // We expect the short-circuit error from our interceptor. + require.ErrorContains(t, err, "short-circuit") + require.True(t, interceptor.headerWasPresent, + "Header should be set on context before interceptor chain runs") +} + +func TestNexusClientValidation(t *testing.T) { + client := NewServiceClient(nil, nil, ClientOptions{}) + + _, err := client.NewNexusClient(ClientNexusClientOptions{}) + require.ErrorContains(t, err, "endpoint is required") + + _, err = client.NewNexusClient(ClientNexusClientOptions{Endpoint: "ep"}) + require.ErrorContains(t, err, "service is required") + + nc, err := client.NewNexusClient(ClientNexusClientOptions{Endpoint: "ep", Service: "svc"}) + require.NoError(t, err) + require.NotNil(t, nc) +} + +// mockOperationReference implements the Name()/InputType() interface used by resolveNexusOperationName. +type mockOperationReference struct { + name string + inputType reflect.Type +} + +func (m mockOperationReference) Name() string { return m.name } +func (m mockOperationReference) InputType() reflect.Type { return m.inputType } + +func TestResolveNexusOperationName(t *testing.T) { + // String name + name, err := resolveNexusOperationName("my-op", nil) + require.NoError(t, err) + require.Equal(t, "my-op", name) + + // Typed operation reference with correct input type + op := mockOperationReference{name: "typed-op", inputType: reflect.TypeOf("")} + name, err = resolveNexusOperationName(op, "hello") + require.NoError(t, err) + require.Equal(t, "typed-op", name) + + // Typed operation reference with wrong input type + _, err = resolveNexusOperationName(op, 123) + require.ErrorContains(t, err, "cannot assign argument of type") + + // Invalid type + _, err = resolveNexusOperationName(123, nil) + require.ErrorContains(t, err, "invalid 'operation' parameter") +} diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index c6961ae29..28f50d177 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -792,6 +792,22 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkflowExecutionOptions(ctx c panic("not implemented in the test environment") } +func (t *testSuiteClientForNexusOperations) NewNexusClient(options ClientNexusClientOptions) (ClientNexusClient, error) { + panic("not implemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) GetNexusOperationHandle(options ClientGetNexusOperationHandleOptions) ClientNexusOperationHandle { + panic("not implemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) ListNexusOperations(ctx context.Context, options ClientListNexusOperationsOptions) (ClientListNexusOperationsResult, error) { + panic("not implemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) CountNexusOperations(ctx context.Context, options ClientCountNexusOperationsOptions) (*ClientCountNexusOperationsResult, error) { + panic("not implemented in the test environment") +} + var _ Client = &testSuiteClientForNexusOperations{} // testEnvWorkflowRunForNexusOperations is a partial [WorkflowRun] implementation for the test workflow environment used diff --git a/mocks/Client.go b/mocks/Client.go index 48c441fe7..5149b0149 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -1294,6 +1294,104 @@ func (_m *Client) CountActivities(ctx context.Context, options client.CountActiv return r0, r1 } +// NewNexusClient provides a mock function with given fields: options +func (_m *Client) NewNexusClient(options client.NexusClientOptions) (client.NexusClient, error) { + ret := _m.Called(options) + + if len(ret) == 0 { + panic("no return value specified for NewNexusClient") + } + + var r0 client.NexusClient + var r1 error + if rf, ok := ret.Get(0).(func(client.NexusClientOptions) (client.NexusClient, error)); ok { + return rf(options) + } + if rf, ok := ret.Get(0).(func(client.NexusClientOptions) client.NexusClient); ok { + r0 = rf(options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.NexusClient) + } + } + + if rf, ok := ret.Get(1).(func(client.NexusClientOptions) error); ok { + r1 = rf(options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetNexusOperationHandle provides a mock function with given fields: options +func (_m *Client) GetNexusOperationHandle(options client.GetNexusOperationHandleOptions) client.NexusOperationHandle { + ret := _m.Called(options) + + if len(ret) == 0 { + panic("no return value specified for GetNexusOperationHandle") + } + + var r0 client.NexusOperationHandle + if rf, ok := ret.Get(0).(func(client.GetNexusOperationHandleOptions) client.NexusOperationHandle); ok { + r0 = rf(options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.NexusOperationHandle) + } + } + + return r0 +} + +// ListNexusOperations provides a mock function with given fields: ctx, options +func (_m *Client) ListNexusOperations(ctx context.Context, options client.ListNexusOperationsOptions) (client.ListNexusOperationsResult, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for ListNexusOperations") + } + + var r0 client.ListNexusOperationsResult + if rf, ok := ret.Get(0).(func(context.Context, client.ListNexusOperationsOptions) client.ListNexusOperationsResult); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.ListNexusOperationsResult) + } + + return r0, nil +} + +// CountNexusOperations provides a mock function with given fields: ctx, options +func (_m *Client) CountNexusOperations(ctx context.Context, options client.CountNexusOperationsOptions) (*client.CountNexusOperationsResult, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for CountNexusOperations") + } + + var r0 *client.CountNexusOperationsResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.CountNexusOperationsOptions) (*client.CountNexusOperationsResult, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.CountNexusOperationsOptions) *client.CountNexusOperationsResult); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*client.CountNexusOperationsResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.CountNexusOperationsOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // WorkerDeploymentClient provides a mock function with given fields: func (_m *Client) WorkerDeploymentClient() client.WorkerDeploymentClient { ret := _m.Called() diff --git a/test/go.mod b/test/go.mod index 4cf7bbe2e..92ebac4f1 100644 --- a/test/go.mod +++ b/test/go.mod @@ -13,7 +13,7 @@ require ( go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/sdk v1.40.0 go.opentelemetry.io/otel/trace v1.40.0 - go.temporal.io/api v1.62.7 + go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 6d569e2b1..05e77afdd 100644 --- a/test/go.sum +++ b/test/go.sum @@ -176,8 +176,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.temporal.io/api v1.62.7 h1:joCtF30Dr+ynzrFJySewZsWbyf4AETZpuizHhFIyj/o= -go.temporal.io/api v1.62.7/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96 h1:o1LvmIVZuroFdl2jNp9adY0cvbzItM3PxbZwrTAvvVc= +go.temporal.io/api v1.62.7-0.20260406221216-94affc798a96/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/test/integration_test.go b/test/integration_test.go index 07af77117..62c317eb2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/google/uuid" + "github.com/nexus-rpc/sdk-go/nexus" "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,6 +29,8 @@ import ( "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + nexuspb "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" @@ -53,6 +56,7 @@ import ( "go.temporal.io/sdk/internal/interceptortest" ilog "go.temporal.io/sdk/internal/log" "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/temporalnexus" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -9095,3 +9099,194 @@ func (ts *IntegrationTestSuite) TestPanicWithDeferredYield() { err = run.Get(ctx, nil) ts.NoError(err) } + +func (ts *IntegrationTestSuite) TestExecuteNexusOperationSuite() { + if os.Getenv("DISABLE_STANDALONE_NEXUS_TESTS") != "" { + ts.T().SkipNow() + } + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + // Create a Nexus endpoint targeting our task queue. + endpoint := "sdk-go-nexus-standalone-test-ep-" + uuid.NewString() + _, err := ts.client.OperatorService().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpoint, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: ts.config.Namespace, + TaskQueue: ts.taskQueueName, + }, + }, + }, + }, + }) + ts.NoError(err) + + // Register Nexus operations on the worker. + service := nexus.NewService("test-standalone-service") + syncOp := nexus.NewSyncOperation("echo-op", func(ctx context.Context, input string, opts nexus.StartOperationOptions) (string, error) { + return input, nil + }) + blockForeverWf := func(ctx workflow.Context, input string) (string, error) { + return "", workflow.Await(ctx, func() bool { return false }) + } + ts.worker.RegisterWorkflowWithOptions(blockForeverWf, workflow.RegisterOptions{Name: "block-forever-wf"}) + asyncOp := temporalnexus.NewWorkflowRunOperation( + "async-op", + blockForeverWf, + func(ctx context.Context, input string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + return client.StartWorkflowOptions{ID: "nexus-async-" + uuid.NewString()}, nil + }, + ) + ts.NoError(service.Register(syncOp, asyncOp)) + ts.worker.RegisterNexusService(service) + + nexusClient, err := ts.client.NewNexusClient(client.NexusClientOptions{ + Endpoint: endpoint, + Service: "test-standalone-service", + }) + ts.NoError(err) + + // executeNexusOpWithRetry retries ExecuteOperation until the endpoint has propagated. + // The endpoint registry is eventually consistent and may take a few attempts. + executeNexusOpWithRetry := func( + opName string, + input string, + options client.StartNexusOperationOptions, + ) client.NexusOperationHandle { + ts.T().Helper() + var handle client.NexusOperationHandle + require.Eventually(ts.T(), func() bool { + var execErr error + handle, execErr = nexusClient.ExecuteOperation(ctx, opName, input, options) + return execErr == nil + }, 10*time.Second, 100*time.Millisecond, "timed out waiting for endpoint to propagate") + return handle + } + + ts.Run("Execute and Get result", func() { + input := "hello-nexus" + handle := executeNexusOpWithRetry("echo-op", input, client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 10 * time.Second, + }) + ts.NotEmpty(handle.GetID()) + + var result string + err := handle.Get(ctx, &result) + ts.NoError(err) + ts.Equal(input, result) + }) + + ts.Run("Describe operation", func() { + handle := executeNexusOpWithRetry("echo-op", "describe-test", client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 10 * time.Second, + }) + + // Wait for operation to complete. + err := handle.Get(ctx, nil) + ts.NoError(err) + + description, err := handle.Describe(ctx, client.DescribeNexusOperationOptions{}) + ts.NoError(err) + ts.Equal(handle.GetID(), description.OperationID) + ts.NotNil(description.RawInfo) + }) + + ts.Run("GetNexusOperationHandle", func() { + operationID := uuid.NewString() + handle := executeNexusOpWithRetry("echo-op", "handle-test", client.StartNexusOperationOptions{ + ID: operationID, + ScheduleToCloseTimeout: 10 * time.Second, + }) + + // Wait for operation to complete. + err := handle.Get(ctx, nil) + ts.NoError(err) + + // Get a handle to the same operation. + handle2 := ts.client.GetNexusOperationHandle(client.GetNexusOperationHandleOptions{ + OperationID: operationID, + RunID: handle.GetRunID(), + }) + ts.Equal(operationID, handle2.GetID()) + + var result string + err = handle2.Get(ctx, &result) + ts.NoError(err) + ts.Equal("handle-test", result) + }) + + ts.Run("Cancel operation", func() { + handle := executeNexusOpWithRetry("async-op", "cancel-test", client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 30 * time.Second, + }) + ts.NotEmpty(handle.GetID()) + + // Operation record exists on the server after ExecuteOperation returns successfully; + // cancel targets the record by ID so no waiting is needed. + err := handle.Cancel(ctx, client.CancelNexusOperationOptions{Reason: "test cancellation"}) + ts.NoError(err) + }) + + ts.Run("Terminate operation", func() { + handle := executeNexusOpWithRetry("async-op", "terminate-test", client.StartNexusOperationOptions{ + ID: uuid.NewString(), + ScheduleToCloseTimeout: 30 * time.Second, + }) + ts.NotEmpty(handle.GetID()) + + // Operation record exists on the server after ExecuteOperation returns successfully; + // terminate targets the record by ID so no waiting is needed. + err := handle.Terminate(ctx, client.TerminateNexusOperationOptions{Reason: "test termination"}) + ts.NoError(err) + }) + + ts.Run("Count operations", func() { + // Visibility is eventually consistent; poll until operations appear. + require.Eventually(ts.T(), func() bool { + result, err := ts.client.CountNexusOperations(ctx, client.CountNexusOperationsOptions{ + Query: "Endpoint = '" + endpoint + "'", + }) + return err == nil && result.Count > 0 + }, 10*time.Second, 200*time.Millisecond, "timed out waiting for operations to appear in count") + }) + + ts.Run("List operations", func() { + // Visibility is eventually consistent; poll until operations appear. + require.Eventually(ts.T(), func() bool { + listResult, err := ts.client.ListNexusOperations(ctx, client.ListNexusOperationsOptions{ + Query: "Endpoint = '" + endpoint + "'", + }) + if err != nil { + return false + } + count := 0 + for metadata, iterErr := range listResult.Results { + if iterErr != nil { + return false + } + if metadata.OperationID == "" || metadata.Endpoint != endpoint { + return false + } + count++ + } + return count > 0 + }, 10*time.Second, 200*time.Millisecond, "timed out waiting for operations to appear in list") + }) + + ts.Run("NexusClient creation validation", func() { + _, err := ts.client.NewNexusClient(client.NexusClientOptions{}) + ts.Error(err) + ts.Contains(err.Error(), "endpoint is required") + + _, err = ts.client.NewNexusClient(client.NexusClientOptions{Endpoint: "ep"}) + ts.Error(err) + ts.Contains(err.Error(), "service is required") + }) +}