Skip to content

Commit e2a632d

Browse files
[CLD-1504]: feat(changeset): update registry apply calls to accept explicit input (#860)
Instead of always relying on environment variable `DURABLE_PIPELINE_INPUT` at the lower level, we need a way to explicitly pass the actual input via method params so we can decouple the dependency on the env var. This allows us build a test runtime which accepts input instead of relying on env var and also decoupling of `DURABLE_PIPELINE_INPUT` on the changeset level in the future JIRA: https://smartcontract-it.atlassian.net/browse/CLD-1504
1 parent 1ccf1b5 commit e2a632d

4 files changed

Lines changed: 184 additions & 68 deletions

File tree

engine/cld/changeset/common.go

Lines changed: 85 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Configurations struct {
3232
type internalChangeSet interface {
3333
noop() // unexported function to prevent arbitrary structs from implementing ChangeSet.
3434
Apply(env fdeployment.Environment) (fdeployment.ChangesetOutput, error)
35+
applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error)
3536
Configurations() (Configurations, error)
3637
}
3738

@@ -77,6 +78,35 @@ type TypedJSON struct {
7778
ChainOverrides []uint64 `json:"chainOverrides"` // Optional field for chain overrides
7879
}
7980

81+
func parseTypedInput(inputStr string) (TypedJSON, error) {
82+
if inputStr == "" {
83+
return TypedJSON{}, errors.New("input is empty")
84+
}
85+
86+
var inputObject TypedJSON
87+
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
88+
return TypedJSON{}, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
89+
}
90+
if len(inputObject.Payload) == 0 {
91+
return TypedJSON{}, errors.New("'payload' field is required")
92+
}
93+
94+
return inputObject, nil
95+
}
96+
97+
func decodePayload[C any](payload json.RawMessage) (C, error) {
98+
var config C
99+
100+
payloadDecoder := json.NewDecoder(strings.NewReader(string(payload)))
101+
payloadDecoder.UseNumber()
102+
payloadDecoder.DisallowUnknownFields()
103+
if err := payloadDecoder.Decode(&config); err != nil {
104+
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
105+
}
106+
107+
return config, nil
108+
}
109+
80110
// WithJSON returns a fully configured changeset, which pairs a [fdeployment.ChangeSet] with its configuration based
81111
// a JSON input. It also allows extensions, such as a PostProcessing function.
82112
// InputStr must be a JSON object with a "payload" field that contains the actual input data for a Durable Pipeline.
@@ -92,31 +122,13 @@ type TypedJSON struct {
92122
// Note: Prefer WithEnvInput for durable_pipelines.go
93123
func (f WrappedChangeSet[C]) WithJSON(_ C, inputStr string) ConfiguredChangeSet {
94124
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
95-
var config C
96-
97-
if inputStr == "" {
98-
return config, errors.New("input is empty")
99-
}
100-
101-
var inputObject TypedJSON
102-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
103-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
104-
}
105-
106-
// If payload is null, decode it as null (which will give zero value)
107-
// If payload is missing, return an error
108-
if len(inputObject.Payload) == 0 {
109-
return config, errors.New("'payload' field is required")
110-
}
111-
112-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
113-
payloadDecoder.UseNumber()
114-
payloadDecoder.DisallowUnknownFields()
115-
if err := payloadDecoder.Decode(&config); err != nil {
116-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
125+
inputObject, err := parseTypedInput(inputStr)
126+
if err != nil {
127+
var zero C
128+
return zero, err
117129
}
118130

119-
return config, nil
131+
return decodePayload[C](inputObject.Payload)
120132
},
121133
inputChainOverrides: func() ([]uint64, error) {
122134
return loadInputChainOverrides(inputStr)
@@ -152,42 +164,35 @@ func (f WrappedChangeSet[C]) WithEnvInput(opts ...EnvInputOption[C]) ConfiguredC
152164

153165
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
154166

155-
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
156-
var config C
157-
158-
if inputStr == "" {
159-
return config, errors.New("input is empty")
160-
}
161-
162-
var inputObject TypedJSON
163-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
164-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
165-
}
167+
providerFromInput := func(rawInput string) (C, error) {
168+
var zero C
166169

167-
// If payload is null, decode it as null (which will give zero value)
168-
// If payload is missing, return an error
169-
if len(inputObject.Payload) == 0 {
170-
return config, errors.New("'payload' field is required")
170+
inputObject, err := parseTypedInput(rawInput)
171+
if err != nil {
172+
return zero, err
171173
}
172-
173-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
174-
payloadDecoder.UseNumber()
175-
payloadDecoder.DisallowUnknownFields()
176-
if err := payloadDecoder.Decode(&config); err != nil {
177-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
174+
config, err := decodePayload[C](inputObject.Payload)
175+
if err != nil {
176+
return zero, err
178177
}
179178

180179
if options.inputModifier != nil {
181-
conf, err := options.inputModifier(config)
182-
if err != nil {
183-
return conf, fmt.Errorf("failed to apply input modifier: %w", err)
180+
conf, modifierErr := options.inputModifier(config)
181+
if modifierErr != nil {
182+
return conf, fmt.Errorf("failed to apply input modifier: %w", modifierErr)
184183
}
185184

186185
return conf, nil
187186
}
188187

189188
return config, nil
190-
},
189+
}
190+
191+
return ChangeSetImpl[C]{changeset: f,
192+
configProvider: func() (C, error) {
193+
return providerFromInput(inputStr)
194+
},
195+
configProviderWithInput: providerFromInput,
191196
inputChainOverrides: func() ([]uint64, error) {
192197
return loadInputChainOverrides(inputStr)
193198
},
@@ -223,25 +228,14 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
223228
// Read input from environment variable
224229
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
225230

226-
configProvider := func() (C, error) {
231+
configProviderFromInput := func(rawInput string) (C, error) {
227232
var zero C
228233

229-
if inputStr == "" {
230-
return zero, errors.New("input is empty")
231-
}
232-
233-
// Parse JSON input
234-
var inputObject TypedJSON
235-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
234+
inputObject, err := parseTypedInput(rawInput)
235+
if err != nil {
236236
return zero, fmt.Errorf("failed to parse resolver input as JSON: %w", err)
237237
}
238238

239-
// If payload is null, pass it to the resolver (which will receive null)
240-
// If payload field is missing, return an error
241-
if len(inputObject.Payload) == 0 {
242-
return zero, errors.New("'payload' field is required")
243-
}
244-
245239
// Call resolver – automatically unmarshal into its expected input type.
246240
typedConfig, err := fresolvers.CallResolver[C](resolver, inputObject.Payload)
247241
if err != nil {
@@ -251,8 +245,12 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
251245
return typedConfig, nil
252246
}
253247

254-
return ChangeSetImpl[C]{changeset: f, configProvider: configProvider,
255-
ConfigResolver: resolver,
248+
return ChangeSetImpl[C]{changeset: f,
249+
configProvider: func() (C, error) {
250+
return configProviderFromInput(inputStr)
251+
},
252+
configProviderWithInput: configProviderFromInput,
253+
ConfigResolver: resolver,
256254
inputChainOverrides: func() ([]uint64, error) {
257255
return loadInputChainOverrides(inputStr)
258256
},
@@ -262,9 +260,10 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
262260
var _ ConfiguredChangeSet = ChangeSetImpl[any]{}
263261

264262
type ChangeSetImpl[C any] struct {
265-
changeset WrappedChangeSet[C]
266-
configProvider func() (C, error)
267-
inputChainOverrides func() ([]uint64, error)
263+
changeset WrappedChangeSet[C]
264+
configProvider func() (C, error)
265+
configProviderWithInput func(inputStr string) (C, error)
266+
inputChainOverrides func() ([]uint64, error)
268267

269268
// Present only when the changeset was wired with
270269
// Configure(...).WithConfigResolver(...)
@@ -289,6 +288,25 @@ func (ccs ChangeSetImpl[C]) Apply(env fdeployment.Environment) (fdeployment.Chan
289288
return ccs.changeset.operation.Apply(env, c)
290289
}
291290

291+
func (ccs ChangeSetImpl[C]) applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) {
292+
if inputStr == "" {
293+
return ccs.Apply(env)
294+
}
295+
if ccs.configProviderWithInput == nil {
296+
return ccs.Apply(env)
297+
}
298+
299+
c, err := ccs.configProviderWithInput(inputStr)
300+
if err != nil {
301+
return fdeployment.ChangesetOutput{}, err
302+
}
303+
if err := ccs.changeset.operation.VerifyPreconditions(env, c); err != nil {
304+
return fdeployment.ChangesetOutput{}, err
305+
}
306+
307+
return ccs.changeset.operation.Apply(env, c)
308+
}
309+
292310
func (ccs ChangeSetImpl[C]) Configurations() (Configurations, error) {
293311
var chainOverrides []uint64
294312
var err error

engine/cld/changeset/postprocess.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ func (ccs PostProcessingChangeSetImpl[C]) Apply(env fdeployment.Environment) (fd
3535
return ccs.postProcessor(env, output)
3636
}
3737

38+
func (ccs PostProcessingChangeSetImpl[C]) applyWithInput(
39+
env fdeployment.Environment, inputStr string,
40+
) (fdeployment.ChangesetOutput, error) {
41+
env.Logger.Debugf("Post-processing ChangesetOutput from %T", ccs.changeset.changeset.operation)
42+
output, err := ccs.changeset.applyWithInput(env, inputStr)
43+
if err != nil {
44+
return output, err
45+
}
46+
47+
return ccs.postProcessor(env, output)
48+
}
49+
3850
func (ccs PostProcessingChangeSetImpl[C]) Configurations() (Configurations, error) {
3951
return ccs.changeset.Configurations()
4052
}

engine/cld/changeset/registry.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,19 @@ func (r *ChangesetsRegistry) AddGlobalPostHooks(hooks ...PostHook) {
172172
// a failed Apply are logged but never mask the Apply error.
173173
func (r *ChangesetsRegistry) Apply(
174174
key string, e fdeployment.Environment,
175+
) (fdeployment.ChangesetOutput, error) {
176+
return r.applyWithInput(key, e, "")
177+
}
178+
179+
// ApplyWithInput applies a changeset with explicit input string for this apply invocation.
180+
func (r *ChangesetsRegistry) ApplyWithInput(
181+
key string, e fdeployment.Environment, inputStr string,
182+
) (fdeployment.ChangesetOutput, error) {
183+
return r.applyWithInput(key, e, inputStr)
184+
}
185+
186+
func (r *ChangesetsRegistry) applyWithInput(
187+
key string, e fdeployment.Environment, inputStr string,
175188
) (fdeployment.ChangesetOutput, error) {
176189
entry, globalPre, globalPost, err := r.getApplySnapshot(key)
177190
if err != nil {
@@ -204,7 +217,9 @@ func (r *ChangesetsRegistry) Apply(
204217
}
205218
}
206219

207-
output, applyErr := entry.changeset.Apply(e)
220+
var output fdeployment.ChangesetOutput
221+
var applyErr error
222+
output, applyErr = entry.changeset.applyWithInput(e, inputStr)
208223

209224
postParams := PostHookParams{
210225
Env: hookEnv,

engine/cld/changeset/registry_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (noopChangeset) Apply(e fdeployment.Environment) (fdeployment.ChangesetOutp
2525
return fdeployment.ChangesetOutput{}, nil
2626
}
2727

28+
func (n noopChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
29+
return n.Apply(e)
30+
}
31+
2832
func (n noopChangeset) Configurations() (Configurations, error) {
2933
return Configurations{
3034
InputChainOverrides: n.chainOverrides,
@@ -45,6 +49,10 @@ func (r *recordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.Chang
4549
return r.output, r.err
4650
}
4751

52+
func (r *recordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
53+
return r.Apply(e)
54+
}
55+
4856
func (*recordingChangeset) Configurations() (Configurations, error) {
4957
return Configurations{}, nil
5058
}
@@ -61,6 +69,10 @@ func (o *orderRecordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.
6169
return fdeployment.ChangesetOutput{}, nil
6270
}
6371

72+
func (o *orderRecordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
73+
return o.Apply(e)
74+
}
75+
6476
func (*orderRecordingChangeset) Configurations() (Configurations, error) {
6577
return Configurations{}, nil
6678
}
@@ -148,6 +160,65 @@ func Test_Changesets_Apply(t *testing.T) {
148160
}
149161
}
150162

163+
//nolint:paralleltest
164+
func Test_Changesets_ApplyWithInput_WithEnvConfiguredChangeset(t *testing.T) {
165+
type inputConfig struct {
166+
Value int `json:"value"`
167+
}
168+
169+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"value":999}}`)
170+
171+
var received int
172+
cs := fdeployment.CreateChangeSet(
173+
func(_ fdeployment.Environment, cfg inputConfig) (fdeployment.ChangesetOutput, error) {
174+
received = cfg.Value
175+
return fdeployment.ChangesetOutput{}, nil
176+
},
177+
func(_ fdeployment.Environment, _ inputConfig) error { return nil },
178+
)
179+
180+
r := NewChangesetsRegistry()
181+
r.Add("0001_test", Configure(cs).WithEnvInput())
182+
183+
// overrides the input set by the env var
184+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"value":1}}`)
185+
require.NoError(t, err)
186+
require.Equal(t, 1, received)
187+
}
188+
189+
//nolint:paralleltest // Uses process environment for fallback behavior assertions.
190+
func Test_Changesets_ApplyWithInput_WithResolverConfiguredChangeset(t *testing.T) {
191+
type resolverInput struct {
192+
Base int `json:"base"`
193+
}
194+
type resolverOutput struct {
195+
Value int `json:"value"`
196+
}
197+
198+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"base":100}}`)
199+
200+
resolver := func(input resolverInput) (resolverOutput, error) {
201+
return resolverOutput{Value: input.Base + 10}, nil
202+
}
203+
204+
var received int
205+
cs := fdeployment.CreateChangeSet(
206+
func(_ fdeployment.Environment, cfg resolverOutput) (fdeployment.ChangesetOutput, error) {
207+
received = cfg.Value
208+
return fdeployment.ChangesetOutput{}, nil
209+
},
210+
func(_ fdeployment.Environment, _ resolverOutput) error { return nil },
211+
)
212+
213+
r := NewChangesetsRegistry()
214+
r.Add("0001_test", Configure(cs).WithConfigResolver(resolver))
215+
216+
// overrides the input set by the env var
217+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"base":7}}`)
218+
require.NoError(t, err)
219+
require.Equal(t, 17, received)
220+
}
221+
151222
func Test_Changesets_Add(t *testing.T) {
152223
t.Parallel()
153224

0 commit comments

Comments
 (0)