Skip to content

Commit 0538438

Browse files
[CRE-406] (fix): WF Registry Syncer update (#17608)
* [CRE-406] (fix): WF Registry Syncer update properly deletes previous entry * [CAPPL-804] (test): Add integration test for Workflow Registry Syncer updates * [CAPPL-811] (refactor): WF Registry Syncer - Move validations after creating DB entry to avoid refetching * (test): Use SyncStrategyReconciliation setting in Capability integration test framework * (test): Amend unit test * Clean up from code review
1 parent 68aa27d commit 0538438

File tree

6 files changed

+206
-36
lines changed

6 files changed

+206
-36
lines changed

core/capabilities/integration_tests/framework/don.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
1213
"github.com/ethereum/go-ethereum/common"
1314
"github.com/stretchr/testify/require"
1415
"google.golang.org/protobuf/types/known/durationpb"
1516

1617
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
1718

18-
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/artifacts"
19-
2019
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
2120
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3"
2221
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
@@ -41,6 +40,8 @@ import (
4140
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
4241
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
4342
"github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities"
43+
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/artifacts"
44+
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
4445
"github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight"
4546
)
4647

@@ -96,6 +97,16 @@ func (c DonContext) WaitForCapabilitiesToBeExposed(t *testing.T, dons ...*DON) {
9697
}, 1*time.Minute, 1*time.Second, "timeout waiting for capabilities to be exposed")
9798
}
9899

100+
func (c DonContext) WaitForWorkflowRegistryMetadata(t *testing.T, workflowName string, owner string, workflowID [32]byte) {
101+
require.Eventually(t, func() bool {
102+
wf, err := c.workflowRegistry.contract.GetWorkflowMetadata(&bind.CallOpts{}, common.HexToAddress(owner), workflowName)
103+
if err != nil {
104+
return false
105+
}
106+
return wf.WorkflowID == workflowID
107+
}, 1*time.Minute, 5*time.Second, "timeout waiting for workflow")
108+
}
109+
99110
type capabilityNode struct {
100111
*cltest.TestApplication
101112
registry *capabilities.Registry
@@ -389,6 +400,24 @@ func (d *DON) AddWorkflow(workflow Workflow) error {
389400
return nil
390401
}
391402

403+
func (d *DON) UpdateWorkflow(workflow UpdatedWorkflow) error {
404+
if !d.config.AcceptsWorkflows {
405+
return errors.New("cannot add workflow to non-workflow DON")
406+
}
407+
408+
if !d.initialised {
409+
return errors.New("cannot add workflow to non-initialised DON")
410+
}
411+
412+
d.workflowRegistry.UpdateWorkflow(workflow, *d.id)
413+
414+
return nil
415+
}
416+
417+
func (d *DON) ComputeHashKey(owner string, field string) [32]byte {
418+
return d.workflowRegistry.ComputeHashKey(owner, field)
419+
}
420+
392421
type TriggerFactory interface {
393422
CreateNewTrigger(t *testing.T) commoncap.TriggerCapability
394423
GetTriggerID() string
@@ -419,6 +448,7 @@ func startNewNode(ctx context.Context,
419448
c.Capabilities.ExternalRegistry.ChainID = ptr(fmt.Sprintf("%d", testutils.SimulatedChainID))
420449
c.Capabilities.ExternalRegistry.Address = ptr(capRegistryAddr.String())
421450
c.Capabilities.Peering.V2.Enabled = ptr(true)
451+
c.Capabilities.WorkflowRegistry.SyncStrategy = ptr(syncer.SyncStrategyReconciliation)
422452
c.Feature.FeedsManager = ptr(false)
423453
c.Feature.LogPoller = ptr(true)
424454

core/capabilities/integration_tests/framework/workflow_registry.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ func (r *WorkflowRegistry) RegisterWorkflow(input Workflow, donID uint32) {
3939
registerWorkflow(r.t, r.backend, r.contract, input, donID)
4040
}
4141

42+
func (r *WorkflowRegistry) UpdateWorkflow(input UpdatedWorkflow, donID uint32) {
43+
updateWorkflow(r.t, r.backend, r.contract, input)
44+
}
45+
46+
func (r *WorkflowRegistry) ComputeHashKey(owner string, field string) [32]byte {
47+
return computeHashKey(r.t, r.contract, owner, field)
48+
}
49+
4250
func updateAuthorizedAddress(
4351
t *testing.T,
4452
th *EthBlockchain,
@@ -103,3 +111,37 @@ func registerWorkflow(
103111
th.Backend.Commit()
104112
th.Backend.Commit()
105113
}
114+
115+
type UpdatedWorkflow struct {
116+
WorkflowKey [32]byte
117+
ID [32]byte
118+
BinaryURL string
119+
ConfigURL string
120+
SecretsURL string
121+
}
122+
123+
func updateWorkflow(
124+
t *testing.T,
125+
th *EthBlockchain,
126+
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
127+
input UpdatedWorkflow,
128+
) {
129+
t.Helper()
130+
_, err := wfRegC.UpdateWorkflow(th.transactionOpts, input.WorkflowKey, input.ID, input.BinaryURL, input.ConfigURL, input.SecretsURL)
131+
require.NoError(t, err, "failed to update workflow")
132+
th.Backend.Commit()
133+
th.Backend.Commit()
134+
th.Backend.Commit()
135+
}
136+
137+
func computeHashKey(
138+
t *testing.T,
139+
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
140+
owner string,
141+
field string,
142+
) [32]byte {
143+
t.Helper()
144+
hashKey, err := wfRegC.ComputeHashKey(&bind.CallOpts{}, common.HexToAddress(owner), field)
145+
require.NoError(t, err, "failed to compute hash key")
146+
return hashKey
147+
}

core/capabilities/integration_tests/por/fetchtrueusd_test.go

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func Test_FetchTrueUSDWorkflow(t *testing.T) {
6868
framework.CreateWasmBinary(t, mainFile, wasmFile)
6969

7070
triggerSink := framework.NewTriggerSink(t, "cron-trigger", "1.0.0")
71-
dataFeedsCacheContract := setupDons(ctx, t, lggr, wasmFile, "*/5 * * * * *", triggerSink)
71+
dataFeedsCacheContract, thenCallback := setupDons(ctx, t, lggr, wasmFile, "*/5 * * * * *", "*/4 * * * * *", triggerSink)
7272

7373
bundleReceived := make(chan *data_feeds_cache.DataFeedsCacheBundleReportUpdated, 1000)
7474
bundleSub, err := dataFeedsCacheContract.WatchBundleReportUpdated(&bind.WatchOpts{}, bundleReceived, nil, nil)
@@ -82,17 +82,39 @@ func Test_FetchTrueUSDWorkflow(t *testing.T) {
8282
require.NoError(t, err)
8383
triggerSink.SendOutput(wrappedMap, uuid.New().String())
8484

85-
loop:
86-
for {
85+
endTime, ok := ctxWithTimeout.Deadline()
86+
if !ok {
87+
endTime = time.Now().Add(time.Minute)
88+
}
89+
require.Eventually(t, func() bool {
8790
select {
8891
case <-ctxWithTimeout.Done():
8992
t.Fatalf("timed out waiting for bundle")
9093
case err := <-bundleSub.Err():
9194
require.NoError(t, err)
9295
case <-bundleReceived:
93-
break loop
96+
return true
9497
}
95-
}
98+
return false
99+
}, time.Until(endTime), time.Second, "timed out waiting for first bundle")
100+
101+
// Update workflow
102+
thenCallback()
103+
104+
time.Sleep(time.Second * 30)
105+
triggerSink.SendOutput(wrappedMap, uuid.New().String())
106+
107+
require.Eventually(t, func() bool {
108+
select {
109+
case <-ctxWithTimeout.Done():
110+
t.Fatalf("timed out waiting for bundle")
111+
case err := <-bundleSub.Err():
112+
require.NoError(t, err)
113+
case <-bundleReceived:
114+
return true
115+
}
116+
return false
117+
}, time.Until(endTime), time.Second, "timed out waiting for second bundle")
96118
}
97119

98120
func generateRandomReservesResponse() string {
@@ -147,12 +169,16 @@ func (n ComputeFetcherFactory) NewFetcher(log commonlogger.Logger, emitter custm
147169
}
148170
}
149171

150-
func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, workflowURL string, cronSchedule string,
151-
triggerFactory framework.TriggerFactory) *data_feeds_cache.DataFeedsCache {
172+
func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, workflowURL string, cronSchedule string, cronSchedule2 string,
173+
triggerFactory framework.TriggerFactory) (*data_feeds_cache.DataFeedsCache, func()) {
152174
configURL := "workflow-config.json"
153175
workflowConfig := fetchTrueUSDConfig{
154176
CronSchedule: cronSchedule,
155177
}
178+
configURL2 := "workflow-config2.json"
179+
workflowConfig2 := fetchTrueUSDConfig{
180+
CronSchedule: cronSchedule2,
181+
}
156182

157183
compressedBinary, base64EncodedCompressedBinary := framework.GetCompressedWorkflowWasm(t, workflowURL)
158184

@@ -165,6 +191,10 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor
165191
configBytes, err := json.Marshal(workflowConfig)
166192
require.NoError(t, err)
167193
return configBytes, nil
194+
case configURL2:
195+
configBytes2, err := json.Marshal(workflowConfig2)
196+
require.NoError(t, err)
197+
return configBytes2, nil
168198
}
169199

170200
return nil, fmt.Errorf("unknown url: %s", url)
@@ -205,11 +235,13 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor
205235
workflows.HashTruncateName(workflowName))
206236

207237
workflowConfig.ConsumerAddress = dataFeedsCacheAddr.String()
238+
workflowConfig2.ConsumerAddress = dataFeedsCacheAddr.String()
208239

209240
// Setup Write capability DON
210241
writeTargetCapabilityID, err := writeCapabilityDon.AddPublishedEthereumWriteTargetNonStandardCapability(forwarderAddr)
211242
require.NoError(t, err)
212243
workflowConfig.WriteTargetCapabilityID = writeTargetCapabilityID
244+
workflowConfig2.WriteTargetCapabilityID = writeTargetCapabilityID
213245

214246
writeCapabilityDon.Initialise()
215247
servicetest.Run(t, writeCapabilityDon)
@@ -222,7 +254,21 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor
222254

223255
registerWorkflow(t, donContext, workflowName, compressedBinary, "", workflowDon,
224256
workflowURL, configURL, workflowConfigBytes)
225-
return dataFeedsCache
257+
258+
workflowConfigBytes2, err := json.Marshal(workflowConfig2)
259+
require.NoError(t, err)
260+
261+
then := func() {
262+
newWfID := updateWorkflow(t, donContext, workflowName, compressedBinary, "", workflowDon,
263+
workflowURL, configURL2, workflowConfigBytes2, workflowOwner)
264+
265+
// Wait for workflow to be added to the Workflow Registry
266+
donContext.WaitForWorkflowRegistryMetadata(t, workflowName, workflowOwner, newWfID)
267+
268+
donContext.WaitForCapabilitiesToBeExposed(t, writeCapabilityDon, workflowDon)
269+
}
270+
271+
return dataFeedsCache, then
226272
}
227273

228274
func SetupDataFeedsCacheContract(t *testing.T, backend *framework.EthBlockchain,
@@ -274,3 +320,20 @@ func registerWorkflow(t *testing.T, donContext framework.DonContext, workflowNam
274320
})
275321
require.NoError(t, err)
276322
}
323+
324+
func updateWorkflow(t *testing.T, donContext framework.DonContext, workflowName string, compressedBinary []byte,
325+
secretsURL string, workflowDon *framework.DON, binaryURL string, configURL string, configBytes []byte, owner string) [32]byte {
326+
workflowID, err := workflows.GenerateWorkflowID(donContext.EthBlockchain.TransactionOpts().From[:], workflowName, compressedBinary, configBytes, secretsURL)
327+
require.NoError(t, err)
328+
329+
err = workflowDon.UpdateWorkflow(framework.UpdatedWorkflow{
330+
WorkflowKey: workflowDon.ComputeHashKey(owner, workflowName),
331+
ID: workflowID,
332+
BinaryURL: binaryURL,
333+
ConfigURL: configURL,
334+
SecretsURL: secretsURL,
335+
})
336+
require.NoError(t, err)
337+
338+
return workflowID
339+
}

core/services/workflows/artifacts/store.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,26 @@ func (h *Store) GetSecrets(ctx context.Context, secretsURL string, workflowID [3
234234
return nil, fmt.Errorf("failed to fetch secrets from %s : %w", secretsURL, fetchErr)
235235
}
236236

237-
// sanity check by decoding the secrets
238-
_, decryptErr := h.decryptSecrets(fetchedSecrets, hex.EncodeToString(workflowOwner))
237+
return fetchedSecrets, nil
238+
}
239+
240+
func (h *Store) ValidateSecrets(ctx context.Context, workflowID, workflowOwner string) error {
241+
_, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
242+
if err != nil {
243+
// The workflow record was found, but secrets_id was empty.
244+
if errors.Is(err, ErrEmptySecrets) {
245+
return nil
246+
}
247+
248+
return fmt.Errorf("failed to retrieve secrets by workflow ID: %w", err)
249+
}
250+
251+
_, decryptErr := h.decryptSecrets([]byte(secretsPayload), workflowOwner)
239252
if decryptErr != nil {
240-
return nil, fmt.Errorf("failed to decrypt secrets %s: %w", secretsURL, decryptErr)
253+
return fmt.Errorf("failed to decrypt secrets: %w", decryptErr)
241254
}
242255

243-
return fetchedSecrets, nil
256+
return nil
244257
}
245258

246259
func (h *Store) ForceUpdateSecrets(

0 commit comments

Comments
 (0)