diff --git a/converter/payload_handler_test.go b/converter/payload_handler_test.go index 7078a50ea..b0b3ff2b3 100644 --- a/converter/payload_handler_test.go +++ b/converter/payload_handler_test.go @@ -31,11 +31,14 @@ type appendCodec struct { func (c *appendCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { result := make([]*commonpb.Payload, len(payloads)) for i, p := range payloads { - enc := string(p.GetMetadata()[converter.MetadataEncoding]) + c.encodingSuffix - data := append(append([]byte(nil), p.GetData()...), c.marker) + meta := make(map[string][]byte, len(p.GetMetadata())) + for k, v := range p.GetMetadata() { + meta[k] = v + } + meta[converter.MetadataEncoding] = []byte(string(p.GetMetadata()[converter.MetadataEncoding]) + c.encodingSuffix) result[i] = &commonpb.Payload{ - Metadata: map[string][]byte{converter.MetadataEncoding: []byte(enc)}, - Data: data, + Metadata: meta, + Data: append(append([]byte(nil), p.GetData()...), c.marker), } } return result, nil @@ -52,8 +55,13 @@ func (c *appendCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, if len(data) == 0 || data[len(data)-1] != c.marker { return nil, fmt.Errorf("appendCodec.Decode: expected trailing marker byte %d", c.marker) } + meta := make(map[string][]byte, len(p.GetMetadata())) + for k, v := range p.GetMetadata() { + meta[k] = v + } + meta[converter.MetadataEncoding] = []byte(strings.TrimSuffix(enc, c.encodingSuffix)) result[i] = &commonpb.Payload{ - Metadata: map[string][]byte{converter.MetadataEncoding: []byte(strings.TrimSuffix(enc, c.encodingSuffix))}, + Metadata: meta, Data: data[:len(data)-1], } } @@ -234,52 +242,25 @@ func TestDecode_AppliesPostThenPreCodecs(t *testing.T) { require.Equal(t, originalEncoding, encoding(result[0])) } -func TestDecode_StorageRefReturnedAsIs(t *testing.T) { - // appendCodec.Decode would error on a storage reference because it requires - // the ".pre" encoding suffix — confirms pre-storage codecs are skipped. - preCodec := &appendCodec{encodingSuffix: ".pre", marker: 'P'} - postCodec := &appendCodec{encodingSuffix: ".post", marker: 'O'} - h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{ - PreStorageCodecs: []converter.PayloadCodec{preCodec}, - PostStorageCodecs: []converter.PayloadCodec{postCodec}, - }) +func TestDecode_NoDrivers_StorageRefFails(t *testing.T) { + h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{}) require.NoError(t, err) ref := makeStorageRef(t, "drv", "k1") - postEncodedRef, err := postCodec.Encode([]*commonpb.Payload{ref}) - require.NoError(t, err) - - rr := servePost(t, h, "/decode", createRequest(t, postEncodedRef[0])) - result := getPayloads(t, rr) - require.Len(t, result, 1) - require.True(t, extstore.IsStorageReference(result[0])) + rr := servePost(t, h, "/decode", createRequest(t, ref)) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "no storage driver is configured") } -func TestDecode_MixedBatch(t *testing.T) { - preCodec := &appendCodec{encodingSuffix: ".pre", marker: 'P'} - postCodec := &appendCodec{encodingSuffix: ".post", marker: 'O'} - h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{ - PreStorageCodecs: []converter.PayloadCodec{preCodec}, - PostStorageCodecs: []converter.PayloadCodec{postCodec}, - }) +func TestDecode_NoDrivers_MixedBatch_StorageRefFails(t *testing.T) { + h, err := converter.NewPayloadHTTPHandler(converter.PayloadHTTPHandlerOptions{}) require.NoError(t, err) regular := makePayload(t, "data") - originalEncoding := encoding(regular) - preEncoded, err := preCodec.Encode([]*commonpb.Payload{regular}) - require.NoError(t, err) - postEncoded, err := postCodec.Encode(preEncoded) - require.NoError(t, err) - ref := makeStorageRef(t, "drv", "k1") - postEncodedRef, err := postCodec.Encode([]*commonpb.Payload{ref}) - require.NoError(t, err) - - rr := servePost(t, h, "/decode", createRequest(t, postEncoded[0], postEncodedRef[0])) - result := getPayloads(t, rr) - require.Len(t, result, 2) - require.Equal(t, originalEncoding, encoding(result[0])) - require.True(t, extstore.IsStorageReference(result[1])) + rr := servePost(t, h, "/decode", createRequest(t, regular, ref)) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "no storage driver is configured") } func TestDecode_RoundTrip(t *testing.T) { @@ -507,9 +488,8 @@ func TestDownload_NoDrivers(t *testing.T) { ref := makeStorageRef(t, "drv", "k1") rr := servePost(t, h, "/download", createRequest(t, ref)) - result := getPayloads(t, rr) - require.Len(t, result, 1) - require.True(t, extstore.IsStorageReference(result[0])) + require.Equal(t, http.StatusBadRequest, rr.Code) + require.Contains(t, rr.Body.String(), "no storage driver is configured") } func TestDownload_UnknownDriver(t *testing.T) { diff --git a/internal/extstore/extstore_test.go b/internal/extstore/extstore_test.go index d25ced9ca..cc89d6d3d 100644 --- a/internal/extstore/extstore_test.go +++ b/internal/extstore/extstore_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/proxy" + sdkpb "go.temporal.io/sdk/internal/temporalapi/sdk/v1" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -210,20 +211,20 @@ func TestExternalStorageToParams_SingleDriverSynthesizesSelector(t *testing.T) { // --------------------------------------------------------------------------- func TestStorageReferenceRoundTrip(t *testing.T) { - ref := storageReference{ - DriverName: "mydriver", - DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"key": "abc123"}}, + ref := &sdkpb.ExternalStorageReference{ + DriverName: "mydriver", + ClaimData: map[string]string{"key": "abc123"}, } p, err := storageReferenceToPayload(ref, 512) require.NoError(t, err) - require.Equal(t, metadataEncodingStorageRef, string(p.Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(p.Metadata[metadataEncoding])) require.Len(t, p.ExternalPayloads, 1) require.Equal(t, int64(512), p.ExternalPayloads[0].SizeBytes) decoded, err := payloadToStorageReference(p) require.NoError(t, err) require.Equal(t, ref.DriverName, decoded.DriverName) - require.Equal(t, ref.DriverClaim.ClaimData, decoded.DriverClaim.ClaimData) + require.Equal(t, ref.ClaimData, decoded.ClaimData) } func TestPayloadToStorageReference_WrongEncoding(t *testing.T) { @@ -237,13 +238,92 @@ func TestPayloadToStorageReference_WrongEncoding(t *testing.T) { func TestPayloadToStorageReference_CorruptJSON(t *testing.T) { p := &commonpb.Payload{ - Metadata: map[string][]byte{metadataEncoding: []byte(metadataEncodingStorageRef)}, + Metadata: map[string][]byte{metadataEncoding: []byte(metadataEncodingStorageRefLegacy)}, Data: []byte(`not json`), } _, err := payloadToStorageReference(p) require.Error(t, err) } +func TestPayloadToStorageReference_ProtoJSONFormat(t *testing.T) { + want := &sdkpb.ExternalStorageReference{ + DriverName: "mydriver", + ClaimData: map[string]string{"bucket": "b", "key": "k"}, + } + p, err := storageReferenceToPayload(want, 0) + require.NoError(t, err) + require.Equal(t, metadataEncodingProtoJSON, string(p.Metadata[metadataEncoding])) + require.Equal(t, externalStorageReferenceMessageType, string(p.Metadata[metadataMessageType])) + + got, err := payloadToStorageReference(p) + require.NoError(t, err) + require.Equal(t, want.DriverName, got.DriverName) + require.Equal(t, want.ClaimData, got.ClaimData) +} + +func TestPayloadToStorageReference_LegacyFormat(t *testing.T) { + legacyData, err := json.Marshal(legacyStorageReference{ + DriverName: "mydriver", + DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"bucket": "b", "key": "k"}}, + }) + require.NoError(t, err) + p := &commonpb.Payload{ + Metadata: map[string][]byte{metadataEncoding: []byte(metadataEncodingStorageRefLegacy)}, + Data: legacyData, + } + + got, err := payloadToStorageReference(p) + require.NoError(t, err) + require.Equal(t, "mydriver", got.DriverName) + require.Equal(t, map[string]string{"bucket": "b", "key": "k"}, got.ClaimData) +} + +func TestPayloadToStorageReference_ProtoJSON_WrongMessageType(t *testing.T) { + p := &commonpb.Payload{ + Metadata: map[string][]byte{ + metadataEncoding: []byte(metadataEncodingProtoJSON), + metadataMessageType: []byte("some.other.MessageType"), + }, + Data: []byte(`{}`), + } + _, err := payloadToStorageReference(p) + require.Error(t, err) + require.Contains(t, err.Error(), "some.other.MessageType") +} + +// --------------------------------------------------------------------------- +// IsStorageReference +// --------------------------------------------------------------------------- + +func TestIsStorageReference_ProtoJSONFormat(t *testing.T) { + p, err := storageReferenceToPayload(&sdkpb.ExternalStorageReference{DriverName: "d"}, 0) + require.NoError(t, err) + require.True(t, IsStorageReference(p)) +} + +func TestIsStorageReference_LegacyFormat(t *testing.T) { + p := &commonpb.Payload{ + Metadata: map[string][]byte{metadataEncoding: []byte(metadataEncodingStorageRefLegacy)}, + Data: []byte(`{"driver_name":"d","driver_claim":{}}`), + } + require.True(t, IsStorageReference(p)) +} + +func TestIsStorageReference_ProtoJSON_WrongMessageType(t *testing.T) { + p := &commonpb.Payload{ + Metadata: map[string][]byte{ + metadataEncoding: []byte(metadataEncodingProtoJSON), + metadataMessageType: []byte("some.other.MessageType"), + }, + } + require.False(t, IsStorageReference(p)) +} + +func TestIsStorageReference_NotStorageReference(t *testing.T) { + require.False(t, IsStorageReference(makePayload(t, "hello"))) + require.False(t, IsStorageReference(&commonpb.Payload{})) +} + // --------------------------------------------------------------------------- // storageStoreVisitor // --------------------------------------------------------------------------- @@ -291,7 +371,7 @@ func TestStoreVisitor_AtThreshold_Stored(t *testing.T) { result, err := visitPayloads(context.Background(), visitor, []*commonpb.Payload{p}) require.NoError(t, err) - require.Equal(t, metadataEncodingStorageRef, string(result[0].Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(result[0].Metadata[metadataEncoding])) require.Equal(t, 1, driver.storeCount) } @@ -308,7 +388,7 @@ func TestStoreVisitor_AboveThreshold_Stored(t *testing.T) { p := makeOversizedPayload(t, threshold+1) result, err := visitPayloads(context.Background(), visitor, []*commonpb.Payload{p}) require.NoError(t, err) - require.Equal(t, metadataEncodingStorageRef, string(result[0].Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(result[0].Metadata[metadataEncoding])) require.Equal(t, 1, driver.storeCount) } @@ -329,8 +409,8 @@ func TestStoreVisitor_MultiplePayloads_Batched(t *testing.T) { result, err := visitPayloads(context.Background(), visitor, []*commonpb.Payload{big1, small, big2}) require.NoError(t, err) require.Len(t, result, 3) - require.Equal(t, metadataEncodingStorageRef, string(result[0].Metadata[metadataEncoding])) - require.Equal(t, metadataEncodingStorageRef, string(result[2].Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(result[0].Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(result[2].Metadata[metadataEncoding])) // small payload is inline require.Empty(t, result[1].ExternalPayloads) // both big payloads batched in a single Store call @@ -663,9 +743,9 @@ func TestRetrievalVisitor_UnknownDriver(t *testing.T) { require.NoError(t, err) visitor := NewExternalRetrievalVisitor(params) - ref, err := storageReferenceToPayload(storageReference{ - DriverName: "unregistered-driver", - DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"key": "k"}}, + ref, err := storageReferenceToPayload(&sdkpb.ExternalStorageReference{ + DriverName: "unregistered-driver", + ClaimData: map[string]string{"key": "k"}, }, 10) require.NoError(t, err) @@ -702,9 +782,9 @@ func TestRetrievalVisitor_RetrievePanic(t *testing.T) { require.NoError(t, err) visitor := NewExternalRetrievalVisitor(params) - ref, err := storageReferenceToPayload(storageReference{ - DriverName: "my-panic-retrieve-driver", - DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"key": "k"}}, + ref, err := storageReferenceToPayload(&sdkpb.ExternalStorageReference{ + DriverName: "my-panic-retrieve-driver", + ClaimData: map[string]string{"key": "k"}, }, 10) require.NoError(t, err) @@ -732,9 +812,9 @@ func TestRetrievalVisitor_CancelOnError(t *testing.T) { require.NoError(t, err) errRef := refs[0] - blockRef, err := storageReferenceToPayload(storageReference{ - DriverName: "block-driver", - DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"key": "k"}}, + blockRef, err := storageReferenceToPayload(&sdkpb.ExternalStorageReference{ + DriverName: "block-driver", + ClaimData: map[string]string{"key": "k"}, }, 10) require.NoError(t, err) @@ -766,9 +846,9 @@ func TestRetrievalVisitor_WrongPayloadCount(t *testing.T) { require.NoError(t, err) visitor := NewExternalRetrievalVisitor(params) - ref, err := storageReferenceToPayload(storageReference{ - DriverName: "my-bad-count-driver", - DriverClaim: StorageDriverClaim{ClaimData: map[string]string{"key": "k"}}, + ref, err := storageReferenceToPayload(&sdkpb.ExternalStorageReference{ + DriverName: "my-bad-count-driver", + ClaimData: map[string]string{"key": "k"}, }, 10) require.NoError(t, err) @@ -825,13 +905,52 @@ func TestRetrievalVisitor_Callback_ExternalCountOnly(t *testing.T) { } // --------------------------------------------------------------------------- -// Claim Compatibility: fixed claim JSON produced by another SDK +// Claim Compatibility: legacy-format reference payload // --------------------------------------------------------------------------- -// TestClaimDeserialization verifies that a full proto-JSON payload -// produced by another language SDK (e.g. Python) is correctly parsed by +// TestRetrievalVisitor_LegacyFormat verifies that the retrieval visitor can +// resolve a payload written in the legacy json/external-storage-reference +// format (as written by earlier prerelease SDK versions). +func TestRetrievalVisitor_LegacyFormat(t *testing.T) { + driver := newTestDriver("d") + params, err := ExternalStorageToParams(ExternalStorage{ + Drivers: []StorageDriver{driver}, + PayloadSizeThreshold: 1, + }) + require.NoError(t, err) + + // Store a payload to get a real claim key in the driver. + original := makePayload(t, "legacy-compat-value") + refs, err := visitPayloads(context.Background(), NewExternalStorageVisitor(params), []*commonpb.Payload{original}) + require.NoError(t, err) + + // Extract the claim data from the new-format reference and rebuild it as a + // legacy-format payload (encoding=json/external-storage-reference, old JSON structure). + newRef, err := payloadToStorageReference(refs[0]) + require.NoError(t, err) + legacyData, err := json.Marshal(legacyStorageReference{ + DriverName: newRef.DriverName, + DriverClaim: StorageDriverClaim{ClaimData: newRef.ClaimData}, + }) + require.NoError(t, err) + legacyPayload := &commonpb.Payload{ + Metadata: map[string][]byte{metadataEncoding: []byte(metadataEncodingStorageRefLegacy)}, + Data: legacyData, + } + + result, err := visitPayloads(context.Background(), NewExternalRetrievalVisitor(params), []*commonpb.Payload{legacyPayload}) + require.NoError(t, err) + require.True(t, proto.Equal(original, result[0])) +} + +// --------------------------------------------------------------------------- +// Claim Compatibility: legacy-format fixed claim JSON produced by another SDK +// --------------------------------------------------------------------------- + +// TestClaimDeserialization_PlainJSON_OtherSdk verifies that a full plain JSON +// payload produced by another language SDK (e.g. Python) is correctly parsed by // the Go SDK's payloadToStorageReference function. -func TestClaimDeserialization(t *testing.T) { +func TestClaimDeserialization_PlainJSON_OtherSdk(t *testing.T) { // Full proto-JSON representation of a storage-reference payload as another // SDK would serialize it onto the wire. const rawPayloadJSON = `{ @@ -851,12 +970,48 @@ func TestClaimDeserialization(t *testing.T) { ref, err := payloadToStorageReference(refPayload) require.NoError(t, err) - require.Equal(t, metadataEncodingStorageRef, string(refPayload.GetMetadata()[metadataEncoding])) + require.Equal(t, metadataEncodingStorageRefLegacy, string(refPayload.GetMetadata()[metadataEncoding])) require.Equal(t, 1, len(refPayload.ExternalPayloads)) require.Equal(t, int64(385), refPayload.ExternalPayloads[0].SizeBytes) require.Equal(t, "temporalio:driver:s3", ref.DriverName) - require.Equal(t, "test-bucket", ref.DriverClaim.ClaimData["bucket"]) - require.Equal(t, "/ns/default/wi/13f3d9cf-1705-4ce1-b3cb-370974a482c7/d/sha256/6ca22c34560cf35ac24427dc7619c9ab472a82cf18f286f27871649a2b5608c8", ref.DriverClaim.ClaimData["key"]) + require.Equal(t, "test-bucket", ref.ClaimData["bucket"]) + require.Equal(t, "/ns/default/wi/13f3d9cf-1705-4ce1-b3cb-370974a482c7/d/sha256/6ca22c34560cf35ac24427dc7619c9ab472a82cf18f286f27871649a2b5608c8", ref.ClaimData["key"]) +} + +// --------------------------------------------------------------------------- +// Claim Compatibility: current-format fixed claim JSON produced by another SDK +// --------------------------------------------------------------------------- + +// TestClaimDeserialization_OtherSdk_ProtoJSON verifies that a full proto-JSON +// payload produced by another language SDK (e.g. Python) is correctly parsed by +// the Go SDK's payloadToStorageReference function. +func TestClaimDeserialization_OtherSdk_ProtoJSON(t *testing.T) { + // Full proto-JSON representation of a storage-reference payload as another + // SDK would serialize it onto the wire. + const rawPayloadJSON = `{ + "metadata": { + "encoding": "anNvbi9wcm90b2J1Zg==", + "messageType": "dGVtcG9yYWwuYXBpLnNkay52MS5FeHRlcm5hbFN0b3JhZ2VSZWZlcmVuY2U=" + }, + "data": "eyJjbGFpbURhdGEiOnsiYnVja2V0IjoidGVzdC1idWNrZXQiLCJoYXNoX2FsZ29yaXRobSI6InNoYTI1NiIsImhhc2hfdmFsdWUiOiI2Y2EyMmMzNDU2MGNmMzVhYzI0NDI3ZGM3NjE5YzlhYjQ3MmE4MmNmMThmMjg2ZjI3ODcxNjQ5YTJiNTYwOGM4Iiwia2V5IjoidjAvbnMvZGVmYXVsdC93dC9MYXJnZUlPV29ya2Zsb3cvd2kvZjFkMmE0YWMtZjhjYi00NWQzLTkwOGMtOTNhMGYzM2FiMjQ1L3JpL251bGwvZC9zaGEyNTYvNmNhMjJjMzQ1NjBjZjM1YWMyNDQyN2RjNzYxOWM5YWI0NzJhODJjZjE4ZjI4NmYyNzg3MTY0OWEyYjU2MDhjOCJ9LCJkcml2ZXJOYW1lIjoiYXdzLnMzZHJpdmVyIn0=", + "externalPayloads": [ + { + "sizeBytes": "385" + } + ] +}` + + refPayload := &commonpb.Payload{} + require.NoError(t, protojson.Unmarshal([]byte(rawPayloadJSON), refPayload)) + + ref, err := payloadToStorageReference(refPayload) + require.NoError(t, err) + require.Equal(t, metadataEncodingProtoJSON, string(refPayload.GetMetadata()[metadataEncoding])) + require.Equal(t, externalStorageReferenceMessageType, string(refPayload.GetMetadata()[metadataMessageType])) + require.Equal(t, 1, len(refPayload.ExternalPayloads)) + require.Equal(t, int64(385), refPayload.ExternalPayloads[0].SizeBytes) + require.Equal(t, "aws.s3driver", ref.DriverName) + require.Equal(t, "v0/ns/default/wt/LargeIOWorkflow/wi/f1d2a4ac-f8cb-45d3-908c-93a0f33ab245/ri/null/d/sha256/6ca22c34560cf35ac24427dc7619c9ab472a82cf18f286f27871649a2b5608c8", ref.ClaimData["key"]) } // --------------------------------------------------------------------------- @@ -876,7 +1031,7 @@ func TestStoreRetrieveRoundTrip_Single(t *testing.T) { original := makePayload(t, "round-trip value") refs, err := visitPayloads(context.Background(), storeVisitor, []*commonpb.Payload{original}) require.NoError(t, err) - require.Equal(t, metadataEncodingStorageRef, string(refs[0].Metadata[metadataEncoding])) + require.Equal(t, metadataEncodingProtoJSON, string(refs[0].Metadata[metadataEncoding])) restored, err := visitPayloads(context.Background(), retrieveVisitor, refs) require.NoError(t, err) @@ -1075,11 +1230,10 @@ func newValDriver(name string) StorageDriver { } type testCallback struct { - mu sync.Mutex - count int - size int64 - duration time.Duration - unconfiguredCount int + mu sync.Mutex + count int + size int64 + duration time.Duration } func (c *testCallback) PayloadBatchCompleted(count int, size int64, duration time.Duration, _ []string) { @@ -1089,9 +1243,3 @@ func (c *testCallback) PayloadBatchCompleted(count int, size int64, duration tim c.size = size c.duration = duration } - -func (c *testCallback) UnconfiguredStorageReference() { - c.mu.Lock() - defer c.mu.Unlock() - c.unconfiguredCount++ -} diff --git a/internal/extstore/internal_extstore.go b/internal/extstore/internal_extstore.go index 9c767a1d3..238998020 100644 --- a/internal/extstore/internal_extstore.go +++ b/internal/extstore/internal_extstore.go @@ -8,7 +8,9 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/proxy" + sdkpb "go.temporal.io/sdk/internal/temporalapi/sdk/v1" "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -22,10 +24,18 @@ type StorageParameters struct { payloadSizeThreshold int } -// IsStorageReference reports whether p is an external-storage reference payload -// (i.e. its encoding metadata equals the internal storage-reference marker). +// IsStorageReference reports whether p is an external-storage reference payload. +// It recognizes both the current protojson format (encoding=json/protobuf, +// messageType=temporal.api.sdk.v1.ExternalStorageReference) and the legacy +// format (encoding=json/external-storage-reference) written by earlier releases. func IsStorageReference(p *commonpb.Payload) bool { - return string(p.GetMetadata()[metadataEncoding]) == metadataEncodingStorageRef + switch string(p.GetMetadata()[metadataEncoding]) { + case metadataEncodingProtoJSON: + return string(p.GetMetadata()[metadataMessageType]) == externalStorageReferenceMessageType + case metadataEncodingStorageRefLegacy: + return true + } + return false } func ExternalStorageToParams(options ExternalStorage) (StorageParameters, error) { @@ -86,7 +96,6 @@ func driversEqual(a, b StorageDriver) (equal bool) { type StorageOperationCallback interface { PayloadBatchCompleted(count int, size int64, duration time.Duration, driverNames []string) - UnconfiguredStorageReference() } type contextKey string @@ -112,23 +121,48 @@ func StorageTargetFromContext(ctx context.Context) StorageDriverTargetInfo { // format. Mirrors converter.MetadataEncoding without importing converter package. const metadataEncoding = "encoding" -// metadataEncodingStorageRef is the metadata encoding value used to identify -// payloads that are storage references rather than actual data. -const metadataEncodingStorageRef = "json/external-storage-reference" +// metadataMessageType is the key used in payload metadata to identify the proto +// message type. Mirrors converter.MetadataMessageType without importing converter package. +const metadataMessageType = "messageType" + +// metadataEncodingProtoJSON is the standard protojson encoding value, shared with +// ProtoJSONPayloadConverter. Mirrors converter.MetadataEncodingProtoJSON. +const metadataEncodingProtoJSON = "json/protobuf" -type storageReference struct { +// metadataEncodingStorageRefLegacy is the encoding written by earlier prerelease +// SDK versions. Retained solely for backward-compatible reads. +const metadataEncodingStorageRefLegacy = "json/external-storage-reference" + +// legacyStorageReference is the old wire format retained for backward compatibility +// with payloads written by earlier prerelease SDK versions. +type legacyStorageReference struct { DriverName string `json:"driver_name"` DriverClaim StorageDriverClaim `json:"driver_claim"` } -func storageReferenceToPayload(ref storageReference, storedSizeBytes int64) (*commonpb.Payload, error) { - data, err := json.Marshal(ref) +var ( + // compile-time assertion that ExternalStorageReference implements proto.Message. + _ proto.Message = (*sdkpb.ExternalStorageReference)(nil) + + // externalStorageReferenceMessageType is the fully-qualified proto message name, + // derived from the descriptor so it stays in sync with the generated code. + externalStorageReferenceMessageType = string((*sdkpb.ExternalStorageReference)(nil).ProtoReflect().Descriptor().FullName()) + + protoMarshalOptions = protojson.MarshalOptions{} + protoUnmarshalOptions = protojson.UnmarshalOptions{ + DiscardUnknown: true, + } +) + +func storageReferenceToPayload(ref *sdkpb.ExternalStorageReference, storedSizeBytes int64) (*commonpb.Payload, error) { + data, err := protoMarshalOptions.Marshal(ref) if err != nil { return nil, fmt.Errorf("failed to marshal storage reference: %w", err) } return &commonpb.Payload{ Metadata: map[string][]byte{ - metadataEncoding: []byte(metadataEncodingStorageRef), + metadataEncoding: []byte(metadataEncodingProtoJSON), + metadataMessageType: []byte(externalStorageReferenceMessageType), }, Data: data, ExternalPayloads: []*commonpb.Payload_ExternalPayloadDetails{ @@ -138,15 +172,31 @@ func storageReferenceToPayload(ref storageReference, storedSizeBytes int64) (*co } // payloadToStorageReference decodes a storage reference from a payload. -func payloadToStorageReference(p *commonpb.Payload) (storageReference, error) { - if string(p.GetMetadata()[metadataEncoding]) != metadataEncodingStorageRef { - return storageReference{}, fmt.Errorf("payload is not a storage reference: unexpected encoding %q", string(p.GetMetadata()[metadataEncoding])) - } - var ref storageReference - if err := json.Unmarshal(p.Data, &ref); err != nil { - return storageReference{}, fmt.Errorf("failed to unmarshal storage reference: %w", err) +// The current format uses encoding=json/protobuf with the ExternalStorageReference +// message type. The legacy format uses encoding=json/external-storage-reference. +func payloadToStorageReference(p *commonpb.Payload) (*sdkpb.ExternalStorageReference, error) { + switch string(p.GetMetadata()[metadataEncoding]) { + case metadataEncodingProtoJSON: + if string(p.GetMetadata()[metadataMessageType]) != externalStorageReferenceMessageType { + return nil, fmt.Errorf("payload is not a storage reference: unexpected message type %q", string(p.GetMetadata()[metadataMessageType])) + } + ref := &sdkpb.ExternalStorageReference{} + if err := protoUnmarshalOptions.Unmarshal(p.GetData(), ref); err != nil { + return nil, fmt.Errorf("failed to unmarshal storage reference: %w", err) + } + return ref, nil + case metadataEncodingStorageRefLegacy: + var legacy legacyStorageReference + if err := json.Unmarshal(p.Data, &legacy); err != nil { + return nil, fmt.Errorf("failed to unmarshal storage reference: %w", err) + } + return &sdkpb.ExternalStorageReference{ + DriverName: legacy.DriverName, + ClaimData: legacy.DriverClaim.ClaimData, + }, nil + default: + return nil, fmt.Errorf("payload is not a storage reference: unexpected encoding %q", string(p.GetMetadata()[metadataEncoding])) } - return ref, nil } type externalRetrievalVisitor struct { @@ -168,19 +218,15 @@ func (v *externalRetrievalVisitor) Visit(ctx *proxy.VisitPayloadsContext, payloa result := make([]*commonpb.Payload, len(payloads)) for i, p := range payloads { - if string(p.GetMetadata()[metadataEncoding]) != metadataEncodingStorageRef { + if !IsStorageReference(p) { result[i] = p continue } - // No storage drivers configured at all. Notify the caller and leave the - // payload unresolved so downstream code can surface a clear error. + // No storage drivers configured at all — fail immediately with a clear error + // rather than passing through an unresolved reference. if len(v.params.driverMap) == 0 { - if cb, ok := ctx.Value(storageOperationCallbackContextKey).(StorageOperationCallback); ok { - cb.UnconfiguredStorageReference() - } - result[i] = p - continue + return nil, fmt.Errorf("externally stored payload encountered but no storage driver is configured") } ref, err := payloadToStorageReference(p) @@ -200,7 +246,7 @@ func (v *externalRetrievalVisitor) Visit(ctx *proxy.VisitPayloadsContext, payloa driverOrder = append(driverOrder, ref.DriverName) } batch.indices = append(batch.indices, i) - batch.claims = append(batch.claims, ref.DriverClaim) + batch.claims = append(batch.claims, StorageDriverClaim{ClaimData: ref.ClaimData}) } // Fan out to each driver concurrently. The errgroup context is used as the @@ -333,9 +379,9 @@ func (v *externalStorageVisitor) Visit(ctx *proxy.VisitPayloadsContext, payloads } var batchSize int64 for j, claim := range claims { - ref := storageReference{ - DriverName: name, - DriverClaim: claim, + ref := &sdkpb.ExternalStorageReference{ + DriverName: name, + ClaimData: claim.ClaimData, } storedSize := int64(batch.payloads[j].Size()) batchSize += storedSize diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 3fe4ce7c7..72a067926 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -466,7 +466,7 @@ func (wtp *workflowTaskProcessor) processWorkflowTask(task *workflowTask) (retEr // close doneCh so local activity worker won't get blocked forever when trying to send back result to laResultCh. defer close(doneCh) - downloadPayloadMetrics := &workflowTaskStorageMetrics{logger: wtp.logger} + downloadPayloadMetrics := &workflowTaskStorageMetrics{} ctx := extstore.WithStorageOperationCallback(context.Background(), downloadPayloadMetrics) var taskErr error @@ -874,13 +874,11 @@ func (wtp *workflowTaskProcessor) errorToFailWorkflowTaskWithCause(taskToken []b } type workflowTaskStorageMetrics struct { - mu sync.Mutex - payloadCount int - totalSize int64 - totalDuration time.Duration - driverNames map[string]struct{} - logger log.Logger - warnedUnconfigured bool + mu sync.Mutex + payloadCount int + totalSize int64 + totalDuration time.Duration + driverNames map[string]struct{} } func (callback *workflowTaskStorageMetrics) PayloadBatchCompleted(count int, size int64, duration time.Duration, driverNames []string) { @@ -906,14 +904,6 @@ func (callback *workflowTaskStorageMetrics) GetDriverNames() []string { return names } -func (callback *workflowTaskStorageMetrics) UnconfiguredStorageReference() { - callback.mu.Lock() - defer callback.mu.Unlock() - if !callback.warnedUnconfigured && callback.logger != nil { - callback.logger.Warn("[TMPRL1105] Detected externally stored payload(s) but no storage driver is configured.") - callback.warnedUnconfigured = true - } -} func newLocalActivityPoller( params workerExecutionParameters, diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 1fc4b2a49..057408f1f 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1604,25 +1604,6 @@ func (aw *AggregatedWorker) activeTaskQueueTypes() []enumspb.TaskQueueType { return types } -// replayStorageMetrics is a storageOperationCallback used by WorkflowReplayer. -// It logs a warning once when storage references are encountered but no driver -// is configured, and is otherwise a no-op. -type replayStorageMetrics struct { - mu sync.Mutex - logger log.Logger - warnedUnconfigured bool -} - -func (c *replayStorageMetrics) PayloadBatchCompleted(_ int, _ int64, _ time.Duration, _ []string) {} - -func (c *replayStorageMetrics) UnconfiguredStorageReference() { - c.mu.Lock() - defer c.mu.Unlock() - if !c.warnedUnconfigured && c.logger != nil { - c.logger.Warn("[TMPRL1105] Detected externally stored payload(s) but no storage driver is configured.") - c.warnedUnconfigured = true - } -} // WorkflowReplayer is used to replay workflow code from an event history type WorkflowReplayer struct { @@ -1990,9 +1971,7 @@ func (aw *WorkflowReplayer) replayWorkflowHistoryRoot( } // Resolve externally stored payloads in the history before passing to the // task handler. This mirrors what processWorkflowTask does for live workers. - replayStorageCb := &replayStorageMetrics{logger: logger} - inboundPayloadVisitorCtx := extstore.WithStorageOperationCallback(context.Background(), replayStorageCb) - if err := visitProtoPayloads(inboundPayloadVisitorCtx, aw.inboundPayloadVisitor, task, 0); err != nil { + if err := visitProtoPayloads(context.Background(), aw.inboundPayloadVisitor, task, 0); err != nil { return err } diff --git a/internal/temporalapi/Makefile b/internal/temporalapi/Makefile new file mode 100644 index 000000000..bf9f831a2 --- /dev/null +++ b/internal/temporalapi/Makefile @@ -0,0 +1,9 @@ +.PHONY: proto install-buf + +# Regenerate protobuf Go code from .proto files. +# Requires buf to be installed (run `make install-buf` first). +proto: + buf generate + +install-buf: + go install github.com/bufbuild/buf/cmd/buf@v1.27.0 diff --git a/internal/temporalapi/README.md b/internal/temporalapi/README.md new file mode 100644 index 000000000..d25af8c8d --- /dev/null +++ b/internal/temporalapi/README.md @@ -0,0 +1,39 @@ +# internal/temporalapi + +Temporary home for protobuf definitions that belong in `go.temporal.io/api` but are not yet published there. Once the upstream package ships these types, the generated Go files will be removed and import paths updated to point to `go.temporal.io/api`. + +## Structure + +Proto source files and their generated Go counterparts live side by side under `sdk/v1/`. + +## Regenerating protobuf code + +### Prerequisites + +**buf** is required to generate Go code from the `.proto` files: + +```sh +make install-buf +``` + +This installs `buf` v1.27.0 via `go install`. If `~/go/bin` is not on your `PATH`, add it: + +```sh +export PATH="$PATH:$(go env GOPATH)/bin" +``` + +### Generate + +From this directory (`internal/temporalapi/`): + +```sh +make proto +``` + +Or from the repo root: + +```sh +make -C internal/temporalapi proto +``` + +The generated `.pb.go` files are committed to the repository, so regeneration is only needed when a `.proto` file changes. diff --git a/internal/temporalapi/buf.gen.yaml b/internal/temporalapi/buf.gen.yaml new file mode 100644 index 000000000..819b4f0f0 --- /dev/null +++ b/internal/temporalapi/buf.gen.yaml @@ -0,0 +1,6 @@ +version: v1 +plugins: + - plugin: buf.build/protocolbuffers/go:v1.36.5 + out: . + opt: + - paths=source_relative diff --git a/internal/temporalapi/buf.yaml b/internal/temporalapi/buf.yaml new file mode 100644 index 000000000..c126332f3 --- /dev/null +++ b/internal/temporalapi/buf.yaml @@ -0,0 +1 @@ +version: v1 diff --git a/internal/temporalapi/sdk/v1/external_storage.pb.go b/internal/temporalapi/sdk/v1/external_storage.pb.go new file mode 100644 index 000000000..d58900385 --- /dev/null +++ b/internal/temporalapi/sdk/v1/external_storage.pb.go @@ -0,0 +1,155 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.5 +// protoc (unknown) +// source: sdk/v1/external_storage.proto + +package sdk + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// ExternalStorageReference identifies a payload stored in an external storage system. +// It is used as a claim-check token, allowing the actual payload data to be retrieved +// from the named driver using the provided claim data. +type ExternalStorageReference struct { + state protoimpl.MessageState `protogen:"open.v1"` + // The name of the storage driver responsible for retrieving the payload. + DriverName string `protobuf:"bytes,1,opt,name=driver_name,json=driverName,proto3" json:"driver_name,omitempty"` + // Driver-specific key-value pairs that identify and provide access to the stored payload. + ClaimData map[string]string `protobuf:"bytes,2,rep,name=claim_data,json=claimData,proto3" json:"claim_data,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ExternalStorageReference) Reset() { + *x = ExternalStorageReference{} + mi := &file_sdk_v1_external_storage_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ExternalStorageReference) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExternalStorageReference) ProtoMessage() {} + +func (x *ExternalStorageReference) ProtoReflect() protoreflect.Message { + mi := &file_sdk_v1_external_storage_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExternalStorageReference.ProtoReflect.Descriptor instead. +func (*ExternalStorageReference) Descriptor() ([]byte, []int) { + return file_sdk_v1_external_storage_proto_rawDescGZIP(), []int{0} +} + +func (x *ExternalStorageReference) GetDriverName() string { + if x != nil { + return x.DriverName + } + return "" +} + +func (x *ExternalStorageReference) GetClaimData() map[string]string { + if x != nil { + return x.ClaimData + } + return nil +} + +var File_sdk_v1_external_storage_proto protoreflect.FileDescriptor + +var file_sdk_v1_external_storage_proto_rawDesc = string([]byte{ + 0x0a, 0x1d, 0x73, 0x64, 0x6b, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x13, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x64, + 0x6b, 0x2e, 0x76, 0x31, 0x22, 0xd6, 0x01, 0x0a, 0x18, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, + 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x5b, 0x0a, 0x0a, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x5f, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3c, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, + 0x6c, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x65, 0x66, 0x65, + 0x72, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x09, 0x63, 0x6c, 0x61, 0x69, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x1a, + 0x3c, 0x0a, 0x0e, 0x43, 0x6c, 0x61, 0x69, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x34, 0x5a, + 0x32, 0x67, 0x6f, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x69, 0x6f, 0x2f, + 0x73, 0x64, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x74, 0x65, 0x6d, + 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x76, 0x31, 0x3b, + 0x73, 0x64, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +}) + +var ( + file_sdk_v1_external_storage_proto_rawDescOnce sync.Once + file_sdk_v1_external_storage_proto_rawDescData []byte +) + +func file_sdk_v1_external_storage_proto_rawDescGZIP() []byte { + file_sdk_v1_external_storage_proto_rawDescOnce.Do(func() { + file_sdk_v1_external_storage_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_sdk_v1_external_storage_proto_rawDesc), len(file_sdk_v1_external_storage_proto_rawDesc))) + }) + return file_sdk_v1_external_storage_proto_rawDescData +} + +var file_sdk_v1_external_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_sdk_v1_external_storage_proto_goTypes = []any{ + (*ExternalStorageReference)(nil), // 0: temporal.api.sdk.v1.ExternalStorageReference + nil, // 1: temporal.api.sdk.v1.ExternalStorageReference.ClaimDataEntry +} +var file_sdk_v1_external_storage_proto_depIdxs = []int32{ + 1, // 0: temporal.api.sdk.v1.ExternalStorageReference.claim_data:type_name -> temporal.api.sdk.v1.ExternalStorageReference.ClaimDataEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_sdk_v1_external_storage_proto_init() } +func file_sdk_v1_external_storage_proto_init() { + if File_sdk_v1_external_storage_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_sdk_v1_external_storage_proto_rawDesc), len(file_sdk_v1_external_storage_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_sdk_v1_external_storage_proto_goTypes, + DependencyIndexes: file_sdk_v1_external_storage_proto_depIdxs, + MessageInfos: file_sdk_v1_external_storage_proto_msgTypes, + }.Build() + File_sdk_v1_external_storage_proto = out.File + file_sdk_v1_external_storage_proto_goTypes = nil + file_sdk_v1_external_storage_proto_depIdxs = nil +} diff --git a/internal/temporalapi/sdk/v1/external_storage.proto b/internal/temporalapi/sdk/v1/external_storage.proto new file mode 100644 index 000000000..9de82367b --- /dev/null +++ b/internal/temporalapi/sdk/v1/external_storage.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package temporal.api.sdk.v1; + +option go_package = "go.temporal.io/sdk/internal/temporalapi/sdk/v1;sdk"; + +// ExternalStorageReference identifies a payload stored in an external storage system. +// It is used as a claim-check token, allowing the actual payload data to be retrieved +// from the named driver using the provided claim data. +message ExternalStorageReference { + // The name of the storage driver responsible for retrieving the payload. + string driver_name = 1; + // Driver-specific key-value pairs that identify and provide access to the stored payload. + map claim_data = 2; +} diff --git a/test/external_storage_test.go b/test/external_storage_test.go index 0a4f63264..185d5df4a 100644 --- a/test/external_storage_test.go +++ b/test/external_storage_test.go @@ -493,6 +493,58 @@ func (s *ExternalStorageTestSuite) TestRetrieveFailure() { s.Greater(storeCount, 0, "client should have stored the input") } +func (s *ExternalStorageTestSuite) TestWorkerWithoutExternalStorageFails() { + // Build a client and worker with no ExternalStorage. s.client still has the + // driver, so it can store the oversized input; the worker below cannot retrieve it. + noStorageClient, err := client.Dial(client.Options{ + HostPort: s.config.ServiceAddr, + Namespace: s.config.Namespace, + Logger: ilog.NewDefaultLogger(), + ConnectionOptions: client.ConnectionOptions{TLS: s.config.TLS}, + WorkerHeartbeatInterval: -1, + }) + s.NoError(err) + defer noStorageClient.Close() + + noStorageWorker := worker.New(noStorageClient, s.taskQueueName, worker.Options{ + WorkflowPanicPolicy: worker.FailWorkflow, + }) + noStorageWorker.RegisterWorkflow(extStoreEchoWorkflow) + s.NoError(noStorageWorker.Start()) + defer noStorageWorker.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + wfID := "ext-no-driver-" + uuid.NewString() + _, err = s.client.ExecuteWorkflow(ctx, s.startOpts(wfID), extStoreEchoWorkflow, oversized(72)) + s.NoError(err) + + storeCount, _ := s.driver.getStoreCounts() + s.Greater(storeCount, 0, "client should have stored the large input") + + // Poll history until the first WorkflowTaskFailed event appears, then + // terminate rather than waiting for the execution timeout to expire. + var failureMsg string + s.Eventually(func() bool { + iter := s.client.GetWorkflowHistory(ctx, wfID, "", false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + for iter.HasNext() { + event, err := iter.Next() + if err != nil { + return false + } + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED { + failureMsg = event.GetWorkflowTaskFailedEventAttributes().GetFailure().GetMessage() + return true + } + } + return false + }, 10*time.Second, 200*time.Millisecond, "expected a WorkflowTaskFailed event") + + s.Contains(failureMsg, "externally stored payload encountered but no storage driver is configured") + s.NoError(s.client.TerminateWorkflow(ctx, wfID, "", "test complete")) +} + // --------------------------------------------------------------------------- // TestNoStorageWhenBelowThreshold — sanity check that the driver is never // called when all payloads are below the threshold.