Skip to content
134 changes: 95 additions & 39 deletions deployment/data-streams/changeset/jd_distribute_llo_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,39 @@ func (CsDistributeLLOJobSpecs) Apply(e cldf.Environment, cfg CsDistributeLLOJobS
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to generate bootstrap proposals: %w", err)
}
oracleProposals, err := generateOracleProposals(ctx, e, cfg, chainID, cfg.Labels)
// These will be empty when we send only oracle jobs. In that case we'll fetch the bootstrappers by the don
// identifier label.
boostrapNodeIDs := make([]string, 0, len(bootstrapProposals))
for _, p := range bootstrapProposals {
boostrapNodeIDs = append(boostrapNodeIDs, p.NodeId)
}
oracleProposals, err := generateOracleProposals(ctx, e, cfg, chainID, cfg.Labels, boostrapNodeIDs)
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to generate oracle proposals: %w", err)
}

proposedJobs, err := proposeAllOrNothing(ctx, e.Offchain, append(bootstrapProposals, oracleProposals...))
allProposals := append(bootstrapProposals, oracleProposals...) //nolint: gocritic // ignore a silly rule
proposedJobs, err := proposeAllOrNothing(ctx, e.Offchain, allProposals)
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to propose all jobs: %w", err)
}

err = labelNodesForProposals(e.GetContext(), e.Offchain, allProposals, utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName))
if err != nil {
return cldf.ChangesetOutput{}, fmt.Errorf("failed to label nodes for proposals: %w", err)
}

return cldf.ChangesetOutput{
Jobs: proposedJobs,
}, nil
}

func generateBootstrapProposals(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, chainID string, labels []*ptypes.Label) ([]*jobv1.ProposeJobRequest, error) {
func generateBootstrapProposals(
ctx context.Context,
e cldf.Environment,
cfg CsDistributeLLOJobSpecsConfig,
chainID string,
labels []*ptypes.Label,
) ([]*jobv1.ProposeJobRequest, error) {
bootstrapNodes, err := jd.FetchDONBootstrappersFromJD(ctx, e.Offchain, cfg.Filter, cfg.NodeNames)
if err != nil {
return nil, fmt.Errorf("failed to get bootstrap nodes: %w", err)
Expand Down Expand Up @@ -153,7 +170,14 @@ func generateBootstrapProposals(ctx context.Context, e cldf.Environment, cfg CsD
return proposals, nil
}

func generateOracleProposals(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, chainID string, labels []*ptypes.Label) ([]*jobv1.ProposeJobRequest, error) {
func generateOracleProposals(
ctx context.Context,
e cldf.Environment,
cfg CsDistributeLLOJobSpecsConfig,
chainID string,
labels []*ptypes.Label,
boostrapNodeIDs []string,
) ([]*jobv1.ProposeJobRequest, error) {
// nils will be filled out later with n-specific values:
lloSpec := &jobs.LLOJobSpec{
Base: jobs.Base{
Expand Down Expand Up @@ -193,7 +217,7 @@ func generateOracleProposals(ctx context.Context, e cldf.Environment, cfg CsDist
return nil, fmt.Errorf("failed to get node chain configs: %w", err)
}

bootstrapMultiaddr, err := getBootstrapMultiAddr(ctx, e, cfg)
bootstrapMultiaddr, err := getBootstrapMultiAddr(ctx, e, cfg, boostrapNodeIDs)
if err != nil {
return nil, fmt.Errorf("failed to get bootstrap bootstrapMultiaddr: %w", err)
}
Expand Down Expand Up @@ -276,45 +300,53 @@ func chainConfigs(ctx context.Context, e cldf.Environment, chainID string, nodes
}

// getBootstrapMultiAddr fetches the bootstrap node from Job Distributor and returns its multiaddr.
func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig) (string, error) {
// Get all bootstrap nodes for this DON.
// We fetch these with a custom filter because the filter in the config defines which nodes need to be sent jobs
// and this might not cover any bootstrap nodes.
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
Filter: &node.ListNodesRequest_Filter{
Selectors: []*ptypes.Selector{
{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Op: ptypes.SelectorOp_EXIST,
},
{
Key: devenv.LabelNodeTypeKey,
Op: ptypes.SelectorOp_EQ,
Value: pointer.To(devenv.LabelNodeTypeValueBootstrap),
},
{
Key: devenv.LabelEnvironmentKey,
Op: ptypes.SelectorOp_EQ,
Value: &cfg.Filter.EnvLabel,
},
{
Key: devenv.LabelProductKey,
Op: ptypes.SelectorOp_EQ,
Value: pointer.To(utils.ProductLabel),
// If boostrapNodeIDs is empty, it will return the first bootstrap node found for this DON.
func getBootstrapMultiAddr(ctx context.Context, e cldf.Environment, cfg CsDistributeLLOJobSpecsConfig, boostrapNodeIDs []string) (string, error) {
if len(boostrapNodeIDs) == 0 {
// Get all bootstrap nodes for this DON.
// We fetch these with a custom filter because the filter in the config defines which nodes need to be sent jobs
// and this might not cover any bootstrap nodes.
respBoots, err := e.Offchain.ListNodes(ctx, &node.ListNodesRequest{
Filter: &node.ListNodesRequest_Filter{
Selectors: []*ptypes.Selector{
// We can afford to filter by DonIdentifier here because if the caller didn't provide any bootstrap node IDs,
// then they are updating an existing job spec and the bootstrap nodes are already labeled with the DON ID.
{
Key: utils.DonIdentifier(cfg.Filter.DONID, cfg.Filter.DONName),
Op: ptypes.SelectorOp_EXIST,
},
{
Key: devenv.LabelNodeTypeKey,
Op: ptypes.SelectorOp_EQ,
Value: pointer.To(devenv.LabelNodeTypeValueBootstrap),
},
{
Key: devenv.LabelEnvironmentKey,
Op: ptypes.SelectorOp_EQ,
Value: &cfg.Filter.EnvLabel,
},
{
Key: devenv.LabelProductKey,
Op: ptypes.SelectorOp_EQ,
Value: pointer.To(utils.ProductLabel),
},
},
},
},
})
if err != nil {
return "", fmt.Errorf("failed to list bootstrap nodes for DON %d - %s: %w", cfg.Filter.DONID, cfg.Filter.DONName, err)
})
if err != nil {
return "", fmt.Errorf("failed to list bootstrap nodes for DON %d - %s: %w", cfg.Filter.DONID, cfg.Filter.DONName, err)
}
if len(respBoots.Nodes) == 0 {
return "", errors.New("no bootstrap nodes found")
}
for _, n := range respBoots.Nodes {
boostrapNodeIDs = append(boostrapNodeIDs, n.Id)
}
}

if len(respBoots.Nodes) == 0 {
return "", errors.New("no bootstrap nodes found")
}
resp, err := e.Offchain.ListNodeChainConfigs(ctx, &node.ListNodeChainConfigsRequest{
Filter: &node.ListNodeChainConfigsRequest_Filter{
NodeIds: []string{respBoots.Nodes[0].Id},
NodeIds: boostrapNodeIDs,
},
})
if err != nil {
Expand Down Expand Up @@ -348,3 +380,27 @@ func (f CsDistributeLLOJobSpecs) VerifyPreconditions(_ cldf.Environment, config

return nil
}

// labelNodesForProposals adds a DON Identifier label to the nodes for the given proposals.
func labelNodesForProposals(ctx context.Context, jd cldf.OffchainClient, props []*jobv1.ProposeJobRequest, donIdentifier string) error {
for _, p := range props {
nodeResp, err := jd.GetNode(ctx, &node.GetNodeRequest{Id: p.NodeId})
if err != nil {
return fmt.Errorf("failed to get node %s: %w", p.NodeId, err)
}
newLabels := append(nodeResp.Node.Labels, &ptypes.Label{ //nolint: gocritic // local copy
Key: donIdentifier,
})

_, err = jd.UpdateNode(ctx, &node.UpdateNodeRequest{
Id: p.NodeId,
Name: nodeResp.Node.Name,
PublicKey: nodeResp.Node.PublicKey,
Labels: newLabels,
})
if err != nil {
return fmt.Errorf("failed to label node %s: %w", p.NodeId, err)
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"

jdJob "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"

cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"

"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"

"github.com/smartcontractkit/chainlink/deployment"
"github.com/smartcontractkit/chainlink/deployment/common/changeset"
"github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/testutil"
"github.com/smartcontractkit/chainlink/deployment/data-streams/jd"
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils"
"github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer"
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
)

func TestDistributeLLOJobSpecs(t *testing.T) {
Expand All @@ -31,19 +29,7 @@ func TestDistributeLLOJobSpecs(t *testing.T) {
ShouldDeployLinkToken: false,
NumNodes: 2,
NumBootstrapNodes: 1,
NodeLabels: []*ptypes.Label{
{
Key: devenv.LabelProductKey,
Value: pointer.To(utils.ProductLabel),
},
{
Key: devenv.LabelEnvironmentKey,
Value: pointer.To(testutil.TestDON.Env),
},
{
Key: utils.DonIdentifier(testutil.TestDON.ID, testutil.TestDON.Name),
},
},
NodeLabels: testutil.GetNodeLabels(testutil.TestDON.ID, testutil.TestDON.Name, testutil.TestDON.Env),
}).Environment

bootstrapNodeNames, oracleNodeNames := collectNodeNames(t, env, 2, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
jdJob "github.com/smartcontractkit/chainlink-protos/job-distributor/v1/job"
"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/shared/ptypes"

"github.com/smartcontractkit/chainlink-protos/job-distributor/v1/node"

cldf "github.com/smartcontractkit/chainlink-deployments-framework/deployment"

"github.com/smartcontractkit/chainlink/deployment"
Expand Down Expand Up @@ -40,12 +42,19 @@ func TestDistributeStreamJobSpecs(t *testing.T) {
},
}).Environment

resp, err := env.Offchain.ListNodes(t.Context(), &node.ListNodesRequest{})
require.NoError(t, err)
nodeNames := make([]string, 0, len(resp.Nodes))
for _, n := range resp.Nodes {
nodeNames = append(nodeNames, n.Name)
}

// pick the first EVM chain selector
chainSelector := env.AllChainSelectors()[0]

// insert a Configurator address for the given DON
configuratorAddr := "0x4170ed0880ac9a755fd29b2688956bd959f923f4"
err := env.ExistingAddresses.Save(chainSelector, configuratorAddr,
err = env.ExistingAddresses.Save(chainSelector, configuratorAddr,
cldf.TypeAndVersion{
Type: "Configurator",
Version: deployment.Version1_0_0,
Expand Down Expand Up @@ -140,7 +149,7 @@ ask_price [type=median allowedFaults=3 index=2];
Value: pointer.To("customTestValue"),
},
},
NodeNames: []string{"node-0", "node-1", "node-2"},
NodeNames: nodeNames,
}

tests := []struct {
Expand All @@ -165,7 +174,7 @@ ask_price [type=median allowedFaults=3 index=2];
// This happens to also be a job update when run after "success" because the two use the same ExternalJobID.
name: "success sending jobs to a subset of nodes",
prepConfFn: func(c CsDistributeStreamJobSpecsConfig) CsDistributeStreamJobSpecsConfig {
c.NodeNames = []string{"node-0"}
c.NodeNames = []string{nodeNames[0]}
c.Filter = &jd.ListFilter{
DONID: testutil.TestDON.ID,
DONName: testutil.TestDON.Name,
Expand All @@ -181,7 +190,7 @@ ask_price [type=median allowedFaults=3 index=2];
// This happens to also be a job update when run after "success" because the two use the same ExternalJobID.
name: "success sending jobs to a different subset of nodes",
prepConfFn: func(c CsDistributeStreamJobSpecsConfig) CsDistributeStreamJobSpecsConfig {
c.NodeNames = []string{"node-1", "node-2"}
c.NodeNames = nodeNames[1:]
c.Filter = &jd.ListFilter{
DONID: testutil.TestDON.ID,
DONName: testutil.TestDON.Name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"github.com/smartcontractkit/chainlink/deployment/common/proposalutils"
"github.com/smartcontractkit/chainlink/deployment/common/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"

commonChangesets "github.com/smartcontractkit/chainlink/deployment/common/changeset"
dsTypes "github.com/smartcontractkit/chainlink/deployment/data-streams/changeset/types"
"github.com/smartcontractkit/chainlink/deployment/environment/devenv"
"github.com/smartcontractkit/chainlink/deployment/environment/memory"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// TestChain is the chain used by the in-memory environment.
Expand Down
8 changes: 0 additions & 8 deletions deployment/data-streams/jd/jd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ type ListFilter struct {
func (f *ListFilter) bootstrappersFilter() *nodeapiv1.ListNodesRequest_Filter {
return &nodeapiv1.ListNodesRequest_Filter{
Selectors: []*jdtypesv1.Selector{
{
Key: utils.DonIdentifier(f.DONID, f.DONName),
Op: jdtypesv1.SelectorOp_EXIST,
},
{
Key: devenv.LabelNodeTypeKey,
Op: jdtypesv1.SelectorOp_EQ,
Expand All @@ -56,10 +52,6 @@ func (f *ListFilter) bootstrappersFilter() *nodeapiv1.ListNodesRequest_Filter {
func (f *ListFilter) oraclesFilter() *nodeapiv1.ListNodesRequest_Filter {
return &nodeapiv1.ListNodesRequest_Filter{
Selectors: []*jdtypesv1.Selector{
{
Key: utils.DonIdentifier(f.DONID, f.DONName),
Op: jdtypesv1.SelectorOp_EXIST,
},
{
Key: devenv.LabelNodeTypeKey,
Op: jdtypesv1.SelectorOp_EQ,
Expand Down
3 changes: 3 additions & 0 deletions deployment/data-streams/jobs/streamtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const (
StreamTypeQuote = StreamType("quote")
StreamTypeMedian = StreamType("median")
StreamTypeMarketStatus = StreamType("market-status")
StreamTypeFundingRate = StreamType("funding-rate")
StreamTypeRWA = StreamType("rwa")
StreamTypeDEX = StreamType("dex")
StreamTypeDataLink = StreamType("data-link")
// StreamTypeConsolidated is used for the consolidated stream type
StreamTypeConsolidated = StreamType("consolidated")
Expand Down
4 changes: 3 additions & 1 deletion deployment/environment/memory/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ func NewNode(

setupJD(t, app)
return &Node{
App: app,
Name: "node-" + keys.PeerID.String(),
ID: app.ID().String(),
App: app,
Chains: slices.Concat(
maps.Keys(nodecfg.Chains),
maps.Keys(nodecfg.Solchains),
Expand Down
1 change: 1 addition & 0 deletions deployment/environment/memory/node_service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (j *JobClient) RegisterNode(ctx context.Context, in *nodev1.RegisterNodeReq
}

// UpdateNode only updates the labels of the node.
// WARNING: The provided input will *overwrite* the existing fields, it won't extend them.
// TODO: Updating the PublicKey is not supported in this implementation.
func (j JobClient) UpdateNode(ctx context.Context, in *nodev1.UpdateNodeRequest, opts ...grpc.CallOption) (*nodev1.UpdateNodeResponse, error) {
node, err := j.nodeStore.get(in.Id)
Expand Down
Loading