Skip to content

[CRE-406] (fix): WF Registry Syncer update #17608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions core/capabilities/integration_tests/framework/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows/artifacts"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
Expand All @@ -41,6 +40,8 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/artifacts"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight"
)

Expand Down Expand Up @@ -96,6 +97,16 @@ func (c DonContext) WaitForCapabilitiesToBeExposed(t *testing.T, dons ...*DON) {
}, 1*time.Minute, 1*time.Second, "timeout waiting for capabilities to be exposed")
}

func (c DonContext) WaitForWorkflowRegistryMetadata(t *testing.T, workflowName string, owner string, workflowID [32]byte) {
require.Eventually(t, func() bool {
wf, err := c.workflowRegistry.contract.GetWorkflowMetadata(&bind.CallOpts{}, common.HexToAddress(owner), workflowName)
if err != nil {
return false
}
return wf.WorkflowID == workflowID
}, 1*time.Minute, 5*time.Second, "timeout waiting for workflow")
}

type capabilityNode struct {
*cltest.TestApplication
registry *capabilities.Registry
Expand Down Expand Up @@ -389,6 +400,24 @@ func (d *DON) AddWorkflow(workflow Workflow) error {
return nil
}

func (d *DON) UpdateWorkflow(workflow UpdatedWorkflow) error {
if !d.config.AcceptsWorkflows {
return errors.New("cannot add workflow to non-workflow DON")
}

if !d.initialised {
return errors.New("cannot add workflow to non-initialised DON")
}

d.workflowRegistry.UpdateWorkflow(workflow, *d.id)

return nil
}

func (d *DON) ComputeHashKey(owner string, field string) [32]byte {
return d.workflowRegistry.ComputeHashKey(owner, field)
}

type TriggerFactory interface {
CreateNewTrigger(t *testing.T) commoncap.TriggerCapability
GetTriggerID() string
Expand Down Expand Up @@ -419,6 +448,7 @@ func startNewNode(ctx context.Context,
c.Capabilities.ExternalRegistry.ChainID = ptr(fmt.Sprintf("%d", testutils.SimulatedChainID))
c.Capabilities.ExternalRegistry.Address = ptr(capRegistryAddr.String())
c.Capabilities.Peering.V2.Enabled = ptr(true)
c.Capabilities.WorkflowRegistry.SyncStrategy = ptr(syncer.SyncStrategyReconciliation)
c.Feature.FeedsManager = ptr(false)
c.Feature.LogPoller = ptr(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ func (r *WorkflowRegistry) RegisterWorkflow(input Workflow, donID uint32) {
registerWorkflow(r.t, r.backend, r.contract, input, donID)
}

func (r *WorkflowRegistry) UpdateWorkflow(input UpdatedWorkflow, donID uint32) {
updateWorkflow(r.t, r.backend, r.contract, input)
}

func (r *WorkflowRegistry) ComputeHashKey(owner string, field string) [32]byte {
return computeHashKey(r.t, r.contract, owner, field)
}

func updateAuthorizedAddress(
t *testing.T,
th *EthBlockchain,
Expand Down Expand Up @@ -103,3 +111,37 @@ func registerWorkflow(
th.Backend.Commit()
th.Backend.Commit()
}

type UpdatedWorkflow struct {
WorkflowKey [32]byte
ID [32]byte
BinaryURL string
ConfigURL string
SecretsURL string
}

func updateWorkflow(
t *testing.T,
th *EthBlockchain,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
input UpdatedWorkflow,
) {
t.Helper()
_, err := wfRegC.UpdateWorkflow(th.transactionOpts, input.WorkflowKey, input.ID, input.BinaryURL, input.ConfigURL, input.SecretsURL)
require.NoError(t, err, "failed to update workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func computeHashKey(
t *testing.T,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
owner string,
field string,
) [32]byte {
t.Helper()
hashKey, err := wfRegC.ComputeHashKey(&bind.CallOpts{}, common.HexToAddress(owner), field)
require.NoError(t, err, "failed to compute hash key")
return hashKey
}
79 changes: 71 additions & 8 deletions core/capabilities/integration_tests/por/fetchtrueusd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func Test_FetchTrueUSDWorkflow(t *testing.T) {
framework.CreateWasmBinary(t, mainFile, wasmFile)

triggerSink := framework.NewTriggerSink(t, "cron-trigger", "1.0.0")
dataFeedsCacheContract := setupDons(ctx, t, lggr, wasmFile, "*/5 * * * * *", triggerSink)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if I understand the existing POR test has been 'hijacked' to test workflow update, fine for the hot fix, but could this be followed up with a test to explicity test workflow update that is seperate from the POR test? (reusing where necessary)

dataFeedsCacheContract, thenCallback := setupDons(ctx, t, lggr, wasmFile, "*/5 * * * * *", "*/4 * * * * *", triggerSink)

bundleReceived := make(chan *data_feeds_cache.DataFeedsCacheBundleReportUpdated, 1000)
bundleSub, err := dataFeedsCacheContract.WatchBundleReportUpdated(&bind.WatchOpts{}, bundleReceived, nil, nil)
Expand All @@ -82,17 +82,39 @@ func Test_FetchTrueUSDWorkflow(t *testing.T) {
require.NoError(t, err)
triggerSink.SendOutput(wrappedMap, uuid.New().String())

loop:
for {
endTime, ok := ctxWithTimeout.Deadline()
if !ok {
endTime = time.Now().Add(time.Minute)
}
require.Eventually(t, func() bool {
select {
case <-ctxWithTimeout.Done():
t.Fatalf("timed out waiting for bundle")
case err := <-bundleSub.Err():
require.NoError(t, err)
case <-bundleReceived:
break loop
return true
}
}
return false
}, time.Until(endTime), time.Second, "timed out waiting for first bundle")

// Update workflow
thenCallback()

time.Sleep(time.Second * 30)
triggerSink.SendOutput(wrappedMap, uuid.New().String())

require.Eventually(t, func() bool {
select {
case <-ctxWithTimeout.Done():
t.Fatalf("timed out waiting for bundle")
case err := <-bundleSub.Err():
require.NoError(t, err)
case <-bundleReceived:
return true
}
return false
}, time.Until(endTime), time.Second, "timed out waiting for second bundle")
}

func generateRandomReservesResponse() string {
Expand Down Expand Up @@ -147,12 +169,16 @@ func (n ComputeFetcherFactory) NewFetcher(log commonlogger.Logger, emitter custm
}
}

func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, workflowURL string, cronSchedule string,
triggerFactory framework.TriggerFactory) *data_feeds_cache.DataFeedsCache {
func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, workflowURL string, cronSchedule string, cronSchedule2 string,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit this signature is a bit opaque. consider returning a named interface of hooks that happen at during execution points (or name the return values)

Copy link
Contributor Author

@justinkaseman justinkaseman May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I'm going to punt on a refactor like that for now, as this is a hot fix that needs to go out.

triggerFactory framework.TriggerFactory) (*data_feeds_cache.DataFeedsCache, func()) {
configURL := "workflow-config.json"
workflowConfig := fetchTrueUSDConfig{
CronSchedule: cronSchedule,
}
configURL2 := "workflow-config2.json"
workflowConfig2 := fetchTrueUSDConfig{
CronSchedule: cronSchedule2,
}

compressedBinary, base64EncodedCompressedBinary := framework.GetCompressedWorkflowWasm(t, workflowURL)

Expand All @@ -165,6 +191,10 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor
configBytes, err := json.Marshal(workflowConfig)
require.NoError(t, err)
return configBytes, nil
case configURL2:
configBytes2, err := json.Marshal(workflowConfig2)
require.NoError(t, err)
return configBytes2, nil
}

return nil, fmt.Errorf("unknown url: %s", url)
Expand Down Expand Up @@ -205,11 +235,13 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor
workflows.HashTruncateName(workflowName))

workflowConfig.ConsumerAddress = dataFeedsCacheAddr.String()
workflowConfig2.ConsumerAddress = dataFeedsCacheAddr.String()

// Setup Write capability DON
writeTargetCapabilityID, err := writeCapabilityDon.AddPublishedEthereumWriteTargetNonStandardCapability(forwarderAddr)
require.NoError(t, err)
workflowConfig.WriteTargetCapabilityID = writeTargetCapabilityID
workflowConfig2.WriteTargetCapabilityID = writeTargetCapabilityID

writeCapabilityDon.Initialise()
servicetest.Run(t, writeCapabilityDon)
Expand All @@ -222,7 +254,21 @@ func setupDons(ctx context.Context, t *testing.T, lggr logger.SugaredLogger, wor

registerWorkflow(t, donContext, workflowName, compressedBinary, "", workflowDon,
workflowURL, configURL, workflowConfigBytes)
return dataFeedsCache

workflowConfigBytes2, err := json.Marshal(workflowConfig2)
require.NoError(t, err)

then := func() {
newWfID := updateWorkflow(t, donContext, workflowName, compressedBinary, "", workflowDon,
workflowURL, configURL2, workflowConfigBytes2, workflowOwner)

// Wait for workflow to be added to the Workflow Registry
donContext.WaitForWorkflowRegistryMetadata(t, workflowName, workflowOwner, newWfID)

donContext.WaitForCapabilitiesToBeExposed(t, writeCapabilityDon, workflowDon)
}

return dataFeedsCache, then
}

func SetupDataFeedsCacheContract(t *testing.T, backend *framework.EthBlockchain,
Expand Down Expand Up @@ -274,3 +320,20 @@ func registerWorkflow(t *testing.T, donContext framework.DonContext, workflowNam
})
require.NoError(t, err)
}

func updateWorkflow(t *testing.T, donContext framework.DonContext, workflowName string, compressedBinary []byte,
secretsURL string, workflowDon *framework.DON, binaryURL string, configURL string, configBytes []byte, owner string) [32]byte {
workflowID, err := workflows.GenerateWorkflowID(donContext.EthBlockchain.TransactionOpts().From[:], workflowName, compressedBinary, configBytes, secretsURL)
require.NoError(t, err)

err = workflowDon.UpdateWorkflow(framework.UpdatedWorkflow{
WorkflowKey: workflowDon.ComputeHashKey(owner, workflowName),
ID: workflowID,
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
})
require.NoError(t, err)

return workflowID
}
21 changes: 17 additions & 4 deletions core/services/workflows/artifacts/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,26 @@ func (h *Store) GetSecrets(ctx context.Context, secretsURL string, workflowID [3
return nil, fmt.Errorf("failed to fetch secrets from %s : %w", secretsURL, fetchErr)
}

// sanity check by decoding the secrets
Copy link
Contributor Author

@justinkaseman justinkaseman May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved check to separate method below ValidateSecrets. This is called in handler.go func tryEngineCreate (line 715)

_, decryptErr := h.decryptSecrets(fetchedSecrets, hex.EncodeToString(workflowOwner))
return fetchedSecrets, nil
}

func (h *Store) ValidateSecrets(ctx context.Context, workflowID, workflowOwner string) error {
_, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
if err != nil {
// The workflow record was found, but secrets_id was empty.
if errors.Is(err, ErrEmptySecrets) {
return nil
}

return fmt.Errorf("failed to retrieve secrets by workflow ID: %w", err)
}

_, decryptErr := h.decryptSecrets([]byte(secretsPayload), workflowOwner)
if decryptErr != nil {
return nil, fmt.Errorf("failed to decrypt secrets %s: %w", secretsURL, decryptErr)
return fmt.Errorf("failed to decrypt secrets: %w", decryptErr)
}

return fetchedSecrets, nil
return nil
}

func (h *Store) ForceUpdateSecrets(
Expand Down
Loading
Loading