Skip to content

Commit 2c75d21

Browse files
authored
DPA-1556 Implement distribute LLO jobs. (#17082)
* Implement distribute LLO jobs. * Fix lint. * Fix lint. * Configurator address is now defined by the migration, not the changeset. * Proposing jobs emits the proposed jobs in ChangesetOutput.
1 parent f45395e commit 2c75d21

File tree

13 files changed

+500
-110
lines changed

13 files changed

+500
-110
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package changeset
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
chainsel "github.com/smartcontractkit/chain-selectors"
10+
11+
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
12+
13+
"github.com/smartcontractkit/chainlink/deployment"
14+
)
15+
16+
const (
17+
defaultJobSpecsTimeout = 120 * time.Second
18+
)
19+
20+
func chainAndAddresses(e deployment.Environment, chainSel uint64) (chainID string, addresses map[string]deployment.TypeAndVersion, err error) {
21+
chainID, err = chainsel.GetChainIDFromSelector(chainSel)
22+
if err != nil {
23+
return "", nil, fmt.Errorf("failed to get chain ID from selector: %w", err)
24+
}
25+
26+
addresses, err = e.ExistingAddresses.AddressesForChain(chainSel)
27+
if err != nil {
28+
return "", nil, fmt.Errorf("failed to get existing addresses: %w", err)
29+
}
30+
return chainID, addresses, nil
31+
}
32+
33+
// proposeAllOrNothing proposes all jobs in the list and if any of them fail, it will revoke all already made proposals.
34+
// TODO We need a test here.
35+
func proposeAllOrNothing(ctx context.Context, oc deployment.OffchainClient, prs []*job.ProposeJobRequest) (proposedJobs []deployment.ProposedJob, err error) {
36+
var proposals []*job.ProposeJobResponse
37+
var p *job.ProposeJobResponse
38+
for _, pr := range prs {
39+
p, err = oc.ProposeJob(ctx, pr)
40+
if err != nil {
41+
break
42+
}
43+
proposedJobs = append(proposedJobs, deployment.ProposedJob{
44+
JobID: p.Proposal.JobId,
45+
Node: pr.NodeId,
46+
Spec: pr.Spec,
47+
})
48+
proposals = append(proposals, p)
49+
}
50+
51+
if err != nil {
52+
// There's an error, so we need to revoke all proposals we just made.
53+
var errs []error
54+
for _, pr := range proposals {
55+
if _, errRevoke := oc.RevokeJob(ctx, &job.RevokeJobRequest{
56+
IdOneof: &job.RevokeJobRequest_Id{Id: pr.Proposal.JobId},
57+
}); errRevoke != nil {
58+
errs = append(errs, fmt.Errorf("failed to revoke job %s: %w", pr.Proposal.JobId, errRevoke))
59+
}
60+
}
61+
// If we got any errors while trying to cancel, we need to return them, so we know we sent some jobs twice.
62+
if len(errs) > 0 {
63+
err = errors.Join(err, errors.Join(errs...))
64+
}
65+
}
66+
67+
return proposedJobs, err
68+
}

deployment/data-streams/changeset/jd_distribute_bootstrap_jobs.go

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,67 +3,43 @@ package changeset
33
import (
44
"context"
55
"fmt"
6-
"time"
7-
8-
chainsel "github.com/smartcontractkit/chain-selectors"
96

107
jobv1 "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
118
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
129

1310
"github.com/smartcontractkit/chainlink/deployment"
1411
"github.com/smartcontractkit/chainlink/deployment/data-streams/jd"
1512
"github.com/smartcontractkit/chainlink/deployment/data-streams/jobs"
16-
)
17-
18-
const (
19-
defaultBootstrapJobSpecsTimeout = 120 * time.Second
13+
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils"
2014
)
2115

2216
var _ deployment.ChangeSetV2[CsDistributeBootstrapJobSpecsConfig] = CsDistributeBootstrapJobSpecs{}
2317

2418
type CsDistributeBootstrapJobSpecsConfig struct {
25-
ChainSelectorEVM uint64
26-
Filter *jd.ListFilter
27-
}
28-
29-
func findConfiguratorAddressByDON(addresses map[string]deployment.TypeAndVersion, donID uint64) (string, error) {
30-
for address, contract := range addresses {
31-
if contract.Type == "Configurator" && contract.Labels.Contains(fmt.Sprintf("don-%d", donID)) {
32-
return address, nil
33-
}
34-
}
35-
return "", fmt.Errorf("Configurator contract not found for DON %d", donID)
19+
ChainSelectorEVM uint64
20+
Filter *jd.ListFilter
21+
ConfiguratorAddress string
3622
}
3723

3824
type CsDistributeBootstrapJobSpecs struct{}
3925

4026
func (CsDistributeBootstrapJobSpecs) Apply(e deployment.Environment, cfg CsDistributeBootstrapJobSpecsConfig) (deployment.ChangesetOutput, error) {
41-
ctx, cancel := context.WithTimeout(e.GetContext(), defaultBootstrapJobSpecsTimeout)
27+
ctx, cancel := context.WithTimeout(e.GetContext(), defaultJobSpecsTimeout)
4228
defer cancel()
4329

44-
bootstrapNodes, err := jd.FetchDONBootstrappersFromJD(ctx, e.Offchain, cfg.Filter)
45-
if err != nil {
46-
return deployment.ChangesetOutput{}, fmt.Errorf("failed to get workflow don nodes: %w", err)
47-
}
48-
49-
chainID, err := chainsel.GetChainIDFromSelector(cfg.ChainSelectorEVM)
50-
if err != nil {
51-
return deployment.ChangesetOutput{}, fmt.Errorf("failed to get chain ID from selector: %w", err)
52-
}
53-
54-
addresses, err := e.ExistingAddresses.AddressesForChain(cfg.ChainSelectorEVM)
55-
if err != nil {
56-
return deployment.ChangesetOutput{}, fmt.Errorf("failed to get existing addresses: %w", err)
57-
}
58-
59-
// Search for the Configurator address in the address book by DON ID
60-
configuratorAddress, err := findConfiguratorAddressByDON(addresses, cfg.Filter.DONID)
30+
chainID, _, err := chainAndAddresses(e, cfg.ChainSelectorEVM)
6131
if err != nil {
6232
return deployment.ChangesetOutput{}, err
6333
}
6434

35+
// Add a label to the job spec to identify the related DON
36+
labels := append([]*ptypes.Label(nil),
37+
&ptypes.Label{
38+
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
39+
})
40+
6541
bootstrapSpec := jobs.NewBootstrapSpec(
66-
configuratorAddress,
42+
cfg.ConfiguratorAddress,
6743
cfg.Filter.DONID,
6844
jobs.RelayTypeEVM,
6945
jobs.RelayConfig{
@@ -76,28 +52,27 @@ func (CsDistributeBootstrapJobSpecs) Apply(e deployment.Environment, cfg CsDistr
7652
return deployment.ChangesetOutput{}, fmt.Errorf("failed to marshal bootstrap spec: %w", err)
7753
}
7854

79-
// Add a label to the job spec to identify the related DON
80-
labels := append([]*ptypes.Label(nil),
81-
&ptypes.Label{
82-
Key: fmt.Sprintf("don-%d-%s", cfg.Filter.DONID, cfg.Filter.DONName),
83-
})
55+
bootstrapNodes, err := jd.FetchDONBootstrappersFromJD(ctx, e.Offchain, cfg.Filter)
56+
if err != nil {
57+
return deployment.ChangesetOutput{}, fmt.Errorf("failed to get workflow don nodes: %w", err)
58+
}
8459

85-
// TODO: For now the implementation uses a very simple approach, in case of partial failures there is a risk
86-
// of sending the same job spec multiple times to the same node. We need to understand the implications of this
87-
// and decide if we need to implement a more complex approach.
60+
var proposals []*jobv1.ProposeJobRequest
8861
for _, node := range bootstrapNodes {
89-
_, err = e.Offchain.ProposeJob(ctx, &jobv1.ProposeJobRequest{
62+
proposals = append(proposals, &jobv1.ProposeJobRequest{
9063
NodeId: node.Id,
9164
Spec: string(renderedSpec),
9265
Labels: labels,
9366
})
94-
95-
if err != nil {
96-
return deployment.ChangesetOutput{}, fmt.Errorf("failed to propose job: %w", err)
97-
}
67+
}
68+
proposedJobs, err := proposeAllOrNothing(ctx, e.Offchain, proposals)
69+
if err != nil {
70+
return deployment.ChangesetOutput{}, fmt.Errorf("failed to propose all bootstrap jobs: %w", err)
9871
}
9972

100-
return deployment.ChangesetOutput{}, nil
73+
return deployment.ChangesetOutput{
74+
Jobs: proposedJobs,
75+
}, nil
10176
}
10277

10378
func (f CsDistributeBootstrapJobSpecs) VerifyPreconditions(e deployment.Environment, config CsDistributeBootstrapJobSpecsConfig) error {

deployment/data-streams/changeset/jd_distribute_bootstrap_jobs_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
)
1313

1414
func TestDistributeBootstrapJobSpecs(t *testing.T) {
15-
t.Skip("Flaky Test: https://smartcontract-it.atlassian.net/browse/DX-196")
1615
t.Parallel()
1716

1817
e := testutil.NewMemoryEnv(t, false, 1)
@@ -21,7 +20,8 @@ func TestDistributeBootstrapJobSpecs(t *testing.T) {
2120
chainSelector := e.AllChainSelectors()[0]
2221

2322
// insert a Configurator address for the given DON
24-
err := e.ExistingAddresses.Save(chainSelector, "0x4170ed0880ac9a755fd29b2688956bd959f923f4",
23+
configuratorAddr := "0x4170ed0880ac9a755fd29b2688956bd959f923f4"
24+
err := e.ExistingAddresses.Save(chainSelector, configuratorAddr,
2525
deployment.TypeAndVersion{
2626
Type: "Configurator",
2727
Version: deployment.Version1_0_0,
@@ -37,6 +37,7 @@ func TestDistributeBootstrapJobSpecs(t *testing.T) {
3737
EnvLabel: "env",
3838
Size: 0,
3939
},
40+
ConfiguratorAddress: configuratorAddr,
4041
}
4142

4243
tests := []struct {
@@ -66,7 +67,7 @@ func TestDistributeBootstrapJobSpecs(t *testing.T) {
6667
require.NoError(t, err)
6768
} else {
6869
require.Error(t, err)
69-
require.Contains(t, err.Error(), tt.wantErr)
70+
require.Contains(t, err.Error(), *tt.wantErr)
7071
}
7172
})
7273
}

0 commit comments

Comments
 (0)