Skip to content

Commit 34c0961

Browse files
authored
Update storage driver store context (temporalio#2268)
1 parent 156715b commit 34c0961

10 files changed

Lines changed: 1270 additions & 24 deletions

contrib/aws/s3driver/driver.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/hex"
66
"errors"
77
"fmt"
8+
"net/url"
89

910
commonpb "go.temporal.io/api/common/v1"
1011
"go.temporal.io/sdk/converter"
@@ -144,7 +145,7 @@ func (d *s3StorageDriver) Store(
144145
g, gctx := errgroup.WithContext(ctx.Context)
145146
for i, pp := range prepared {
146147
g.Go(func() error {
147-
key := objectKey(pp.hexDigest)
148+
key := objectKey(ctx.Target, pp.hexDigest)
148149
exists, err := d.client.ObjectExists(gctx, pp.bucket, key)
149150
if err != nil {
150151
return fmt.Errorf("existence check failed [bucket=%s, key=%s]: %w", pp.bucket, key, err)
@@ -231,8 +232,33 @@ func (d *s3StorageDriver) Retrieve(
231232
return payloads, nil
232233
}
233234

234-
func objectKey(hexDigest string) string {
235-
return keyVersion + "/d/" + hashAlgorithm + "/" + hexDigest
235+
func objectKey(target converter.StorageDriverTargetInfo, hexDigest string) string {
236+
digestSegment := "/d/" + hashAlgorithm + "/" + hexDigest
237+
switch t := target.(type) {
238+
case converter.StorageDriverWorkflowInfo:
239+
return keyVersion +
240+
"/ns/" + pathEscape(t.Namespace) +
241+
"/wt/" + pathEscape(t.WorkflowType) +
242+
"/wi/" + pathEscape(t.WorkflowID) +
243+
"/ri/" + pathEscape(t.RunID) +
244+
digestSegment
245+
case converter.StorageDriverActivityInfo:
246+
return keyVersion +
247+
"/ns/" + pathEscape(t.Namespace) +
248+
"/at/" + pathEscape(t.ActivityType) +
249+
"/ai/" + pathEscape(t.ActivityID) +
250+
"/ri/" + pathEscape(t.RunID) +
251+
digestSegment
252+
default:
253+
return keyVersion + digestSegment
254+
}
255+
}
256+
257+
func pathEscape(s string) string {
258+
if s == "" {
259+
return "null"
260+
}
261+
return url.PathEscape(s)
236262
}
237263

238264
func sha256Hex(data []byte) string {

contrib/aws/s3driver/driver_test.go

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func storeCtx() converter.StorageDriverStoreContext {
8181
return converter.StorageDriverStoreContext{Context: context.Background()}
8282
}
8383

84+
func storeCtxWithTarget(target converter.StorageDriverTargetInfo) converter.StorageDriverStoreContext {
85+
return converter.StorageDriverStoreContext{Context: context.Background(), Target: target}
86+
}
87+
8488
func retrieveCtx() converter.StorageDriverRetrieveContext {
8589
return converter.StorageDriverRetrieveContext{Context: context.Background()}
8690
}
@@ -459,8 +463,72 @@ func TestRetrieve_ClaimMissingHashValue(t *testing.T) {
459463

460464
// --- Key generation tests ---
461465

462-
func TestObjectKey(t *testing.T) {
463-
assert.Equal(t, "v0/d/sha256/abc123", objectKey("abc123"))
466+
func TestObjectKey_NoTarget(t *testing.T) {
467+
assert.Equal(t, "v0/d/sha256/abc123", objectKey(nil, "abc123"))
468+
}
469+
470+
func TestObjectKey_WorkflowInfo(t *testing.T) {
471+
target := converter.StorageDriverWorkflowInfo{
472+
Namespace: "default",
473+
WorkflowType: "MyWorkflow",
474+
WorkflowID: "wf-123",
475+
RunID: "run-456",
476+
}
477+
assert.Equal(t,
478+
"v0/ns/default/wt/MyWorkflow/wi/wf-123/ri/run-456/d/sha256/abc123",
479+
objectKey(target, "abc123"),
480+
)
481+
}
482+
483+
func TestObjectKey_ActivityInfo(t *testing.T) {
484+
target := converter.StorageDriverActivityInfo{
485+
Namespace: "default",
486+
ActivityType: "MyActivity",
487+
ActivityID: "act-789",
488+
RunID: "run-abc",
489+
}
490+
assert.Equal(t,
491+
"v0/ns/default/at/MyActivity/ai/act-789/ri/run-abc/d/sha256/abc123",
492+
objectKey(target, "abc123"),
493+
)
494+
}
495+
496+
func TestObjectKey_WorkflowInfo_EmptyFields(t *testing.T) {
497+
// Empty strings should fall back to "null" in each segment.
498+
target := converter.StorageDriverWorkflowInfo{
499+
Namespace: "my-ns",
500+
// WorkflowType, WorkflowID, RunID intentionally empty
501+
}
502+
assert.Equal(t,
503+
"v0/ns/my-ns/wt/null/wi/null/ri/null/d/sha256/abc123",
504+
objectKey(target, "abc123"),
505+
)
506+
}
507+
508+
func TestObjectKey_ActivityInfo_EmptyFields(t *testing.T) {
509+
target := converter.StorageDriverActivityInfo{
510+
Namespace: "my-ns",
511+
// ActivityType, ActivityID, RunID intentionally empty
512+
}
513+
assert.Equal(t,
514+
"v0/ns/my-ns/at/null/ai/null/ri/null/d/sha256/abc123",
515+
objectKey(target, "abc123"),
516+
)
517+
}
518+
519+
func TestObjectKey_WorkflowInfo_SpecialChars(t *testing.T) {
520+
// Slashes, spaces, and other special characters must be percent-encoded.
521+
target := converter.StorageDriverWorkflowInfo{
522+
Namespace: "my namespace",
523+
WorkflowType: "my/workflow",
524+
WorkflowID: "wf id+1",
525+
RunID: "run=abc",
526+
}
527+
key := objectKey(target, "abc123")
528+
assert.Equal(t,
529+
"v0/ns/my%20namespace/wt/my%2Fworkflow/wi/wf%20id+1/ri/run=abc/d/sha256/abc123",
530+
key,
531+
)
464532
}
465533

466534
func TestSha256Hex(t *testing.T) {
@@ -469,3 +537,81 @@ func TestSha256Hex(t *testing.T) {
469537
expected := hex.EncodeToString(h[:])
470538
assert.Equal(t, expected, sha256Hex(data))
471539
}
540+
541+
// --- Store with target context tests ---
542+
543+
func TestStore_WithWorkflowTarget(t *testing.T) {
544+
mc := newMemClient()
545+
d := newDriver(t, mc)
546+
p := testPayload("workflow payload")
547+
548+
target := converter.StorageDriverWorkflowInfo{
549+
Namespace: "default",
550+
WorkflowType: "MyWorkflow",
551+
WorkflowID: "wf-123",
552+
RunID: "run-456",
553+
}
554+
claims, err := d.Store(storeCtxWithTarget(target), []*commonpb.Payload{p})
555+
require.NoError(t, err)
556+
require.Len(t, claims, 1)
557+
558+
key := claims[0].ClaimData["key"]
559+
assert.Contains(t, key, "v0/ns/default/wt/MyWorkflow/wi/wf-123/ri/run-456/d/sha256/")
560+
}
561+
562+
func TestStore_WithActivityTarget(t *testing.T) {
563+
mc := newMemClient()
564+
d := newDriver(t, mc)
565+
p := testPayload("activity payload")
566+
567+
target := converter.StorageDriverActivityInfo{
568+
Namespace: "default",
569+
ActivityType: "MyActivity",
570+
ActivityID: "act-789",
571+
RunID: "run-abc",
572+
}
573+
claims, err := d.Store(storeCtxWithTarget(target), []*commonpb.Payload{p})
574+
require.NoError(t, err)
575+
require.Len(t, claims, 1)
576+
577+
key := claims[0].ClaimData["key"]
578+
assert.Contains(t, key, "v0/ns/default/at/MyActivity/ai/act-789/ri/run-abc/d/sha256/")
579+
}
580+
581+
func TestStore_RoundTrip_WithWorkflowTarget(t *testing.T) {
582+
mc := newMemClient()
583+
d := newDriver(t, mc)
584+
original := testPayload("round-trip with target")
585+
586+
target := converter.StorageDriverWorkflowInfo{
587+
Namespace: "default",
588+
WorkflowType: "MyWorkflow",
589+
WorkflowID: "wf-123",
590+
RunID: "run-456",
591+
}
592+
claims, err := d.Store(storeCtxWithTarget(target), []*commonpb.Payload{original})
593+
require.NoError(t, err)
594+
595+
restored, err := d.Retrieve(retrieveCtx(), claims)
596+
require.NoError(t, err)
597+
require.Len(t, restored, 1)
598+
assert.True(t, proto.Equal(original, restored[0]))
599+
}
600+
601+
func TestStore_DifferentTargets_SamePayload_DifferentKeys(t *testing.T) {
602+
// The same payload stored under different targets produces different keys.
603+
mc := newMemClient()
604+
d := newDriver(t, mc)
605+
p := testPayload("shared payload")
606+
607+
wfTarget := converter.StorageDriverWorkflowInfo{Namespace: "ns", WorkflowID: "wf-1", RunID: "run-1"}
608+
actTarget := converter.StorageDriverActivityInfo{Namespace: "ns", ActivityID: "act-1", RunID: "run-1"}
609+
610+
wfClaims, err := d.Store(storeCtxWithTarget(wfTarget), []*commonpb.Payload{p})
611+
require.NoError(t, err)
612+
613+
actClaims, err := d.Store(storeCtxWithTarget(actTarget), []*commonpb.Payload{p})
614+
require.NoError(t, err)
615+
616+
assert.NotEqual(t, wfClaims[0].ClaimData["key"], actClaims[0].ClaimData["key"])
617+
}

converter/extstore.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,53 @@ import (
66
commonpb "go.temporal.io/api/common/v1"
77
)
88

9+
// StorageDriverTargetInfo identifies the workflow or activity on whose behalf
10+
// a payload is being stored. Use a type switch on [StorageDriverWorkflowInfo]
11+
// and [StorageDriverActivityInfo] to access the concrete values.
12+
//
13+
// NOTE: Experimental
14+
type StorageDriverTargetInfo interface {
15+
isStorageDriverTargetInfo()
16+
}
17+
18+
// StorageDriverWorkflowInfo carries workflow identity for a storage operation.
19+
//
20+
// NOTE: Experimental
21+
type StorageDriverWorkflowInfo struct {
22+
// Namespace is the Temporal namespace of the workflow execution.
23+
Namespace string
24+
// WorkflowType is the type name of the workflow.
25+
WorkflowType string
26+
// WorkflowID is the ID of the workflow execution.
27+
WorkflowID string
28+
// RunID is the run ID of the workflow execution.
29+
RunID string
30+
}
31+
32+
func (StorageDriverWorkflowInfo) isStorageDriverTargetInfo() {}
33+
34+
var _ StorageDriverTargetInfo = StorageDriverWorkflowInfo{}
35+
36+
// StorageDriverActivityInfo carries activity identity for a storage operation.
37+
// This is only used for standalone (non-workflow-bound) activities; activities
38+
// started by a workflow use [StorageDriverWorkflowInfo] as the target.
39+
//
40+
// NOTE: Experimental
41+
type StorageDriverActivityInfo struct {
42+
// Namespace is the Temporal namespace of the activity execution.
43+
Namespace string
44+
// ActivityType is the type name of the activity.
45+
ActivityType string
46+
// ActivityID is the ID of the activity execution.
47+
ActivityID string
48+
// RunID is the run ID of the activity execution.
49+
RunID string
50+
}
51+
52+
func (StorageDriverActivityInfo) isStorageDriverTargetInfo() {}
53+
54+
var _ StorageDriverTargetInfo = StorageDriverActivityInfo{}
55+
956
// StorageDriverStoreContext carries context passed to StorageDriver.Store and
1057
// StorageDriverSelector.SelectDriver operations.
1158
//
@@ -15,6 +62,10 @@ type StorageDriverStoreContext struct {
1562
// Drivers should use it to respect cancellation and to propagate deadlines
1663
// and trace information to downstream calls (e.g. cloud storage SDKs).
1764
Context context.Context
65+
// Target identifies the workflow or activity on whose behalf payloads are
66+
// being stored. Use a type switch on [StorageDriverWorkflowInfo] and
67+
// [StorageDriverActivityInfo] to access the concrete values.
68+
Target StorageDriverTargetInfo
1869
}
1970

2071
// StorageDriverRetrieveContext carries context passed to StorageDriver.Retrieve

internal/extstore.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type storageOperationCallback interface {
8383
}
8484

8585
const storageOperationCallbackContextKey contextKey = "storageOperationCallback"
86+
const storageTargetContextKey contextKey = "storageTarget"
8687

8788
// metadataEncodingStorageRef is the metadata encoding value used to identify
8889
// payloads that are storage references rather than actual data.
@@ -247,7 +248,11 @@ func (v *externalStorageVisitor) Visit(ctx *proxy.VisitPayloadsContext, payloads
247248
driverBatches := map[string]*driverBatch{}
248249

249250
result := make([]*commonpb.Payload, len(payloads))
250-
driverCtx := converter.StorageDriverStoreContext{Context: ctx.Context}
251+
var target converter.StorageDriverTargetInfo
252+
if t, ok := ctx.Context.Value(storageTargetContextKey).(converter.StorageDriverTargetInfo); ok {
253+
target = t
254+
}
255+
driverCtx := converter.StorageDriverStoreContext{Context: ctx.Context, Target: target}
251256

252257
for i, p := range payloads {
253258
if proto.Size(p) < v.params.payloadSizeThreshold {
@@ -287,7 +292,7 @@ func (v *externalStorageVisitor) Visit(ctx *proxy.VisitPayloadsContext, payloads
287292
// Fan out to each driver concurrently. The errgroup context is used as the
288293
// StorageDriverStoreContext so a failing driver cancels in-flight siblings.
289294
eg, egCtx := errgroup.WithContext(ctx.Context)
290-
storeDrCtx := converter.StorageDriverStoreContext{Context: egCtx}
295+
storeDrCtx := converter.StorageDriverStoreContext{Context: egCtx, Target: target}
291296
sizes := make([]int64, len(driverOrder))
292297

293298
externalCount := 0

internal/internal_activity_client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,12 @@ func (w *workflowClientInterceptor) ExecuteActivity(
587587
return nil, err
588588
}
589589

590-
if err := visitProtoPayloads(ctx, w.client.outboundPayloadVisitor, request); err != nil {
590+
storeCtx := context.WithValue(ctx, storageTargetContextKey, converter.StorageDriverActivityInfo{
591+
Namespace: w.client.namespace,
592+
ActivityID: request.ActivityId,
593+
ActivityType: in.ActivityType,
594+
})
595+
if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, request); err != nil {
591596
return nil, err
592597
}
593598

internal/internal_schedule_client.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,15 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
131131
SearchAttributes: searchAttr,
132132
}
133133

134+
storeCtx := context.WithValue(ctx, storageTargetContextKey, converter.StorageDriverWorkflowInfo{
135+
Namespace: w.client.namespace,
136+
WorkflowID: action.GetStartWorkflow().GetWorkflowId(),
137+
WorkflowType: action.GetStartWorkflow().GetWorkflowType().GetName(),
138+
})
139+
if err := visitProtoPayloads(storeCtx, w.client.outboundPayloadVisitor, startRequest); err != nil {
140+
return nil, err
141+
}
142+
134143
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
135144
defer cancel()
136145

@@ -283,15 +292,26 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
283292
}
284293
}
285294

286-
_, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, &workflowservice.UpdateScheduleRequest{
295+
updateRequest := &workflowservice.UpdateScheduleRequest{
287296
Namespace: scheduleHandle.client.namespace,
288297
ScheduleId: scheduleHandle.ID,
289298
Schedule: newSchedulePB,
290299
ConflictToken: nil,
291300
Identity: scheduleHandle.client.identity,
292301
RequestId: uuid.NewString(),
293302
SearchAttributes: newSA,
303+
}
304+
305+
storeCtx := context.WithValue(ctx, storageTargetContextKey, converter.StorageDriverWorkflowInfo{
306+
Namespace: scheduleHandle.client.namespace,
307+
WorkflowID: newSchedulePB.GetAction().GetStartWorkflow().GetWorkflowId(),
308+
WorkflowType: newSchedulePB.GetAction().GetStartWorkflow().GetWorkflowType().GetName(),
294309
})
310+
if err := visitProtoPayloads(storeCtx, scheduleHandle.client.outboundPayloadVisitor, updateRequest); err != nil {
311+
return err
312+
}
313+
314+
_, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, updateRequest)
295315
return err
296316
}
297317

0 commit comments

Comments
 (0)