Skip to content

Commit 6df0e02

Browse files
authored
DataStreams: Label nodes we send jobs to (#17613)
* Do not filter the list of nodes we get from JD by don id - we'll filter them by name later. * Label all nodes we propose to with the don identifier. * Add a couple of extra stream types. * lint * Fix a bug which blanks the node's name. Assign names and IDs to nodes on creation. * lint * lint * Fix deps
1 parent f5ed9fe commit 6df0e02

File tree

8 files changed

+120
-70
lines changed

8 files changed

+120
-70
lines changed

deployment/data-streams/changeset/jd_distribute_llo_jobs.go

Lines changed: 95 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,39 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
7777
if err != nil {
7878
return cldf.ChangesetOutput{}, fmt.Errorf("failed to generate bootstrap proposals: %w", err)
7979
}
80-
oracleProposals, err := generateOracleProposals(ctx, e, cfg, chainID, cfg.Labels)
80+
// These will be empty when we send only oracle jobs. In that case we'll fetch the bootstrappers by the don
81+
// identifier label.
82+
boostrapNodeIDs := make([]string, 0, len(bootstrapProposals))
83+
for _, p := range bootstrapProposals {
84+
boostrapNodeIDs = append(boostrapNodeIDs, p.NodeId)
85+
}
86+
oracleProposals, err := generateOracleProposals(ctx, e, cfg, chainID, cfg.Labels, boostrapNodeIDs)
8187
if err != nil {
8288
return cldf.ChangesetOutput{}, fmt.Errorf("failed to generate oracle proposals: %w", err)
8389
}
84-
85-
proposedJobs, err := proposeAllOrNothing(ctx, e.Offchain, append(bootstrapProposals, oracleProposals...))
90+
allProposals := append(bootstrapProposals, oracleProposals...) //nolint: gocritic // ignore a silly rule
91+
proposedJobs, err := proposeAllOrNothing(ctx, e.Offchain, allProposals)
8692
if err != nil {
8793
return cldf.ChangesetOutput{}, fmt.Errorf("failed to propose all jobs: %w", err)
8894
}
8995

96+
err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName))
97+
if err != nil {
98+
return cldf.ChangesetOutput{}, fmt.Errorf("failed to label nodes for proposals: %w", err)
99+
}
100+
90101
return cldf.ChangesetOutput{
91102
Jobs: proposedJobs,
92103
}, nil
93104
}
94105

95-
func generateBootstrapProposals(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, chainID string, labels []*ptypes.Label) ([]*jobv1.ProposeJobRequest, error) {
106+
func generateBootstrapProposals(
107+
ctx context.Context,
108+
e cldf.Environment,
109+
cfg CsDistributeLLOJobSpecsConfig,
110+
chainID string,
111+
labels []*ptypes.Label,
112+
) ([]*jobv1.ProposeJobRequest, error) {
96113
bootstrapNodes, err := jd.FetchDONBootstrappersFromJD(ctx, e.Offchain, cfg.Filter, cfg.NodeNames)
97114
if err != nil {
98115
return nil, fmt.Errorf("failed to get bootstrap nodes: %w", err)
@@ -153,7 +170,14 @@ func generateBootstrapProposals(ctx context.Context, e cldf.Environment, cfg CsD
153170
return proposals, nil
154171
}
155172

156-
func generateOracleProposals(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, chainID string, labels []*ptypes.Label) ([]*jobv1.ProposeJobRequest, error) {
173+
func generateOracleProposals(
174+
ctx context.Context,
175+
e cldf.Environment,
176+
cfg CsDistributeLLOJobSpecsConfig,
177+
chainID string,
178+
labels []*ptypes.Label,
179+
boostrapNodeIDs []string,
180+
) ([]*jobv1.ProposeJobRequest, error) {
157181
// nils will be filled out later with n-specific values:
158182
lloSpec := &jobs.LLOJobSpec{
159183
Base: jobs.Base{
@@ -193,7 +217,7 @@ func generateOracleProposals(ctx context.Context, e cldf.Environment, cfg CsDist
193217
return nil, fmt.Errorf("failed to get node chain configs: %w", err)
194218
}
195219

196-
bootstrapMultiaddr, err := getBootstrapMultiAddr(ctx, e, cfg)
220+
bootstrapMultiaddr, err := getBootstrapMultiAddr(ctx, e, cfg, boostrapNodeIDs)
197221
if err != nil {
198222
return nil, fmt.Errorf("failed to get bootstrap bootstrapMultiaddr: %w", err)
199223
}
@@ -276,45 +300,53 @@ func chainConfigs(ctx context.Context, e cldf.Environment, chainID string, nodes
276300
}
277301

278302
// getBootstrapMultiAddr fetches the bootstrap node from Job Distributor and returns its multiaddr.
279-
func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig) (string, error) {
280-
// Get all bootstrap nodes for this DON.
281-
// We fetch these with a custom filter because the filter in the config defines which nodes need to be sent jobs
282-
// and this might not cover any bootstrap nodes.
283-
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
284-
Filter: &node.ListNodesRequest_Filter{
285-
Selectors: []*ptypes.Selector{
286-
{
287-
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
288-
Op: ptypes.SelectorOp_EXIST,
289-
},
290-
{
291-
Key: devenv.LabelNodeTypeKey,
292-
Op: ptypes.SelectorOp_EQ,
293-
Value: pointer.To(devenv.LabelNodeTypeValueBootstrap),
294-
},
295-
{
296-
Key: devenv.LabelEnvironmentKey,
297-
Op: ptypes.SelectorOp_EQ,
298-
Value: &cfg.Filter.EnvLabel,
299-
},
300-
{
301-
Key: devenv.LabelProductKey,
302-
Op: ptypes.SelectorOp_EQ,
303-
Value: pointer.To(utils.ProductLabel),
303+
// If boostrapNodeIDs is empty, it will return the first bootstrap node found for this DON.
304+
func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, boostrapNodeIDs []string) (string, error) {
305+
if len(boostrapNodeIDs) == 0 {
306+
// Get all bootstrap nodes for this DON.
307+
// We fetch these with a custom filter because the filter in the config defines which nodes need to be sent jobs
308+
// and this might not cover any bootstrap nodes.
309+
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
310+
Filter: &node.ListNodesRequest_Filter{
311+
Selectors: []*ptypes.Selector{
312+
// We can afford to filter by DonIdentifier here because if the caller didn't provide any bootstrap node IDs,
313+
// then they are updating an existing job spec and the bootstrap nodes are already labeled with the DON ID.
314+
{
315+
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
316+
Op: ptypes.SelectorOp_EXIST,
317+
},
318+
{
319+
Key: devenv.LabelNodeTypeKey,
320+
Op: ptypes.SelectorOp_EQ,
321+
Value: pointer.To(devenv.LabelNodeTypeValueBootstrap),
322+
},
323+
{
324+
Key: devenv.LabelEnvironmentKey,
325+
Op: ptypes.SelectorOp_EQ,
326+
Value: &cfg.Filter.EnvLabel,
327+
},
328+
{
329+
Key: devenv.LabelProductKey,
330+
Op: ptypes.SelectorOp_EQ,
331+
Value: pointer.To(utils.ProductLabel),
332+
},
304333
},
305334
},
306-
},
307-
})
308-
if err != nil {
309-
return "", fmt.Errorf("failed to list bootstrap nodes for DON %d - %s: %w", cfg.Filter.DONID, cfg.Filter.DONName, err)
335+
})
336+
if err != nil {
337+
return "", fmt.Errorf("failed to list bootstrap nodes for DON %d - %s: %w", cfg.Filter.DONID, cfg.Filter.DONName, err)
338+
}
339+
if len(respBoots.Nodes) == 0 {
340+
return "", errors.New("no bootstrap nodes found")
341+
}
342+
for _, n := range respBoots.Nodes {
343+
boostrapNodeIDs = append(boostrapNodeIDs, n.Id)
344+
}
310345
}
311346

312-
if len(respBoots.Nodes) == 0 {
313-
return "", errors.New("no bootstrap nodes found")
314-
}
315347
resp, err := e.Offchain.ListNodeChainConfigs(ctx, &node.ListNodeChainConfigsRequest{
316348
Filter: &node.ListNodeChainConfigsRequest_Filter{
317-
NodeIds: []string{respBoots.Nodes[0].Id},
349+
NodeIds: boostrapNodeIDs,
318350
},
319351
})
320352
if err != nil {
@@ -348,3 +380,27 @@ func (f CsDistributeLLOJobSpecs) VerifyPreconditions(_ cldf.Environment, config
348380

349381
return nil
350382
}
383+
384+
// labelNodesForProposals adds a DON Identifier label to the nodes for the given proposals.
385+
func labelNodesForProposals(ctx context.Context, jd cldf.OffchainClient, props []*jobv1.ProposeJobRequest, donIdentifier string) error {
386+
for _, p := range props {
387+
nodeResp, err := jd.GetNode(ctx, &node.GetNodeRequest{Id: p.NodeId})
388+
if err != nil {
389+
return fmt.Errorf("failed to get node %s: %w", p.NodeId, err)
390+
}
391+
newLabels := append(nodeResp.Node.Labels, &ptypes.Label{ //nolint: gocritic // local copy
392+
Key: donIdentifier,
393+
})
394+
395+
_, err = jd.UpdateNode(ctx, &node.UpdateNodeRequest{
396+
Id: p.NodeId,
397+
Name: nodeResp.Node.Name,
398+
PublicKey: nodeResp.Node.PublicKey,
399+
Labels: newLabels,
400+
})
401+
if err != nil {
402+
return fmt.Errorf("failed to label node %s: %w", p.NodeId, err)
403+
}
404+
}
405+
return nil
406+
}

deployment/data-streams/changeset/jd_distribute_llo_jobs_test.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@ import (
77
"github.com/ethereum/go-ethereum/common"
88
"github.com/stretchr/testify/require"
99

10+
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
11+
1012
jdJob "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
1113

1214
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
1315

14-
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
15-
1616
"github.com/smartcontractkit/chainlink/deployment"
1717
"github.com/smartcontractkit/chainlink/deployment/common/changeset"
1818
"github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/testutil"
1919
"github.com/smartcontractkit/chainlink/deployment/data-streams/jd"
20-
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils"
2120
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer"
22-
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
2321
)
2422

2523
func TestDistributeLLOJobSpecs(t *testing.T) {
@@ -31,19 +29,7 @@ func TestDistributeLLOJobSpecs(t *testing.T) {
3129
ShouldDeployLinkToken: false,
3230
NumNodes: 2,
3331
NumBootstrapNodes: 1,
34-
NodeLabels: []*ptypes.Label{
35-
{
36-
Key: devenv.LabelProductKey,
37-
Value: pointer.To(utils.ProductLabel),
38-
},
39-
{
40-
Key: devenv.LabelEnvironmentKey,
41-
Value: pointer.To(testutil.TestDON.Env),
42-
},
43-
{
44-
Key: utils.DonIdentifier(testutil.TestDON.ID, testutil.TestDON.Name),
45-
},
46-
},
32+
NodeLabels: testutil.GetNodeLabels(testutil.TestDON.ID, testutil.TestDON.Name, testutil.TestDON.Env),
4733
}).Environment
4834

4935
bootstrapNodeNames, oracleNodeNames := collectNodeNames(t, env, 2, 1)

deployment/data-streams/changeset/jd_distribute_stream_jobs_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
jdJob "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
99
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"
1010

11+
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"
12+
1113
cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"
1214

1315
"github.com/smartcontractkit/chainlink/deployment"
@@ -40,12 +42,19 @@ func TestDistributeStreamJobSpecs(t *testing.T) {
4042
},
4143
}).Environment
4244

45+
resp, err := env.Offchain.ListNodes(t.Context(), &node.ListNodesRequest{})
46+
require.NoError(t, err)
47+
nodeNames := make([]string, 0, len(resp.Nodes))
48+
for _, n := range resp.Nodes {
49+
nodeNames = append(nodeNames, n.Name)
50+
}
51+
4352
// pick the first EVM chain selector
4453
chainSelector := env.AllChainSelectors()[0]
4554

4655
// insert a Configurator address for the given DON
4756
configuratorAddr := "0x4170ed0880ac9a755fd29b2688956bd959f923f4"
48-
err := env.ExistingAddresses.Save(chainSelector, configuratorAddr,
57+
err = env.ExistingAddresses.Save(chainSelector, configuratorAddr,
4958
cldf.TypeAndVersion{
5059
Type: "Configurator",
5160
Version: deployment.Version1_0_0,
@@ -140,7 +149,7 @@ ask_price [type=median allowedFaults=3 index=2];
140149
Value: pointer.To("customTestValue"),
141150
},
142151
},
143-
NodeNames: []string{"node-0", "node-1", "node-2"},
152+
NodeNames: nodeNames,
144153
}
145154

146155
tests := []struct {
@@ -165,7 +174,7 @@ ask_price [type=median allowedFaults=3 index=2];
165174
// This happens to also be a job update when run after "success" because the two use the same ExternalJobID.
166175
name: "success sending jobs to a subset of nodes",
167176
prepConfFn: func(c CsDistributeStreamJobSpecsConfig) CsDistributeStreamJobSpecsConfig {
168-
c.NodeNames = []string{"node-0"}
177+
c.NodeNames = []string{nodeNames[0]}
169178
c.Filter = &jd.ListFilter{
170179
DONID: testutil.TestDON.ID,
171180
DONName: testutil.TestDON.Name,
@@ -181,7 +190,7 @@ ask_price [type=median allowedFaults=3 index=2];
181190
// This happens to also be a job update when run after "success" because the two use the same ExternalJobID.
182191
name: "success sending jobs to a different subset of nodes",
183192
prepConfFn: func(c CsDistributeStreamJobSpecsConfig) CsDistributeStreamJobSpecsConfig {
184-
c.NodeNames = []string{"node-1", "node-2"}
193+
c.NodeNames = nodeNames[1:]
185194
c.Filter = &jd.ListFilter{
186195
DONID: testutil.TestDON.ID,
187196
DONName: testutil.TestDON.Name,

deployment/data-streams/changeset/testutil/test_helpers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ import (
2626
"github.com/smartcontractkit/chainlink/deployment/common/proposalutils"
2727
"github.com/smartcontractkit/chainlink/deployment/common/types"
2828

29+
"github.com/smartcontractkit/chainlink/v2/core/logger"
30+
2931
commonChangesets "github.com/smartcontractkit/chainlink/deployment/common/changeset"
3032
dsTypes "github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/types"
3133
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
3234
"github.com/smartcontractkit/chainlink/deployment/environment/memory"
33-
"github.com/smartcontractkit/chainlink/v2/core/logger"
3435
)
3536

3637
// TestChain is the chain used by the in-memory environment.

deployment/data-streams/jd/jd.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ type ListFilter struct {
2929
func (f *ListFilter) bootstrappersFilter() *nodeapiv1.ListNodesRequest_Filter {
3030
return &nodeapiv1.ListNodesRequest_Filter{
3131
Selectors: []*jdtypesv1.Selector{
32-
{
33-
Key: utils.DonIdentifier(f.DONID, f.DONName),
34-
Op: jdtypesv1.SelectorOp_EXIST,
35-
},
3632
{
3733
Key: devenv.LabelNodeTypeKey,
3834
Op: jdtypesv1.SelectorOp_EQ,
@@ -56,10 +52,6 @@ func (f *ListFilter) bootstrappersFilter() *nodeapiv1.ListNodesRequest_Filter {
5652
func (f *ListFilter) oraclesFilter() *nodeapiv1.ListNodesRequest_Filter {
5753
return &nodeapiv1.ListNodesRequest_Filter{
5854
Selectors: []*jdtypesv1.Selector{
59-
{
60-
Key: utils.DonIdentifier(f.DONID, f.DONName),
61-
Op: jdtypesv1.SelectorOp_EXIST,
62-
},
6355
{
6456
Key: devenv.LabelNodeTypeKey,
6557
Op: jdtypesv1.SelectorOp_EQ,

deployment/data-streams/jobs/streamtype.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ const (
66
StreamTypeQuote = StreamType("quote")
77
StreamTypeMedian = StreamType("median")
88
StreamTypeMarketStatus = StreamType("market-status")
9+
StreamTypeFundingRate = StreamType("funding-rate")
10+
StreamTypeRWA = StreamType("rwa")
11+
StreamTypeDEX = StreamType("dex")
912
StreamTypeDataLink = StreamType("data-link")
1013
// StreamTypeConsolidated is used for the consolidated stream type
1114
StreamTypeConsolidated = StreamType("consolidated")

deployment/environment/memory/node.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,9 @@ func NewNode(
403403

404404
setupJD(t, app)
405405
return &Node{
406-
App: app,
406+
Name: "node-" + keys.PeerID.String(),
407+
ID: app.ID().String(),
408+
App: app,
407409
Chains: slices.Concat(
408410
maps.Keys(nodecfg.Chains),
409411
maps.Keys(nodecfg.Solchains),

deployment/environment/memory/node_service_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeReq
5757
}
5858

5959
// UpdateNode only updates the labels of the node.
60+
// WARNING: The provided input will *overwrite* the existing fields, it won't extend them.
6061
// TODO: Updating the PublicKey is not supported in this implementation.
6162
func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) {
6263
node, err := j.nodeStore.get(in.Id)

0 commit comments

Comments
 (0)