Skip to content

Commit 3e6d8ec

Browse files
authored
feat: add solana worker type (#153)
* wip: splitting solana worker and evm worker * wip: refactoring solana worker * feat: add timelock worker and Listen() partial implementation (pending log polling on next PR) * feat: add unit tests * feat: rename operations to operations_evm * fix: address comments * fix: refactor unit tests
1 parent 0f91c8f commit 3e6d8ec

11 files changed

Lines changed: 454 additions & 115 deletions

File tree

cmd/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func startTimelock(cmd *cobra.Command) {
135135
}
136136

137137
if chainFamily == chain_selectors.FamilyEVM {
138-
tWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey,
138+
tWorker, err := timelock.NewTimelockWorkerEVM(nodeURL, timelockAddress, callProxyAddress, privateKey,
139139
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, eventListenerPollSize, dryRun, slog)
140140
if err != nil {
141141
slog.Fatalf("error creating the timelock-worker: %s", err.Error())

go.mod

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ require (
88
github.com/avast/retry-go/v4 v4.6.1
99
github.com/docker/go-connections v0.5.0
1010
github.com/ethereum/go-ethereum v1.15.7
11+
github.com/gagliardetto/solana-go v1.12.0
1112
github.com/google/go-cmp v0.7.0
1213
github.com/prometheus/client_golang v1.21.1
1314
github.com/samber/lo v1.47.0
1415
github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240917103524-56f1a8d2cd4b
1516
github.com/smartcontractkit/chain-selectors v1.0.55
16-
github.com/smartcontractkit/mcms v0.19.2
17+
github.com/smartcontractkit/mcms v0.20.1
1718
github.com/spf13/cobra v1.8.1
1819
github.com/spf13/viper v1.19.0
1920
github.com/stretchr/testify v1.10.0
@@ -29,7 +30,6 @@ require (
2930
github.com/DataDog/zstd v1.5.2 // indirect
3031
github.com/Microsoft/go-winio v0.6.2 // indirect
3132
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
32-
github.com/allegro/bigcache v1.2.1 // indirect
3333
github.com/benbjohnson/clock v1.3.5 // indirect
3434
github.com/beorn7/perks v1.0.1 // indirect
3535
github.com/bits-and-blooms/bitset v1.17.0 // indirect
@@ -54,7 +54,6 @@ require (
5454
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
5555
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
5656
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
57-
github.com/deepmap/oapi-codegen v1.8.2 // indirect
5857
github.com/distribution/reference v0.6.0 // indirect
5958
github.com/docker/docker v27.3.1+incompatible // indirect
6059
github.com/docker/go-units v0.5.0 // indirect
@@ -65,9 +64,7 @@ require (
6564
github.com/fsnotify/fsnotify v1.8.0 // indirect
6665
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
6766
github.com/gagliardetto/binary v0.8.0 // indirect
68-
github.com/gagliardetto/solana-go v1.12.0 // indirect
6967
github.com/gagliardetto/treeout v0.1.4 // indirect
70-
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
7168
github.com/getsentry/sentry-go v0.27.0 // indirect
7269
github.com/go-logr/logr v1.4.2 // indirect
7370
github.com/go-logr/stdr v1.2.2 // indirect
@@ -88,11 +85,9 @@ require (
8885
github.com/holiman/uint256 v1.3.2 // indirect
8986
github.com/huin/goupnp v1.3.0 // indirect
9087
github.com/inconshreveable/mousetrap v1.1.0 // indirect
91-
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
9288
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
9389
github.com/json-iterator/go v1.1.12 // indirect
9490
github.com/klauspost/compress v1.18.0 // indirect
95-
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
9691
github.com/kr/pretty v0.3.1 // indirect
9792
github.com/kr/text v0.2.0 // indirect
9893
github.com/leodido/go-urn v1.4.0 // indirect
@@ -121,7 +116,6 @@ require (
121116
github.com/olekukonko/tablewriter v0.0.5 // indirect
122117
github.com/opencontainers/go-digest v1.0.0 // indirect
123118
github.com/opencontainers/image-spec v1.1.0 // indirect
124-
github.com/opentracing/opentracing-go v1.2.0 // indirect
125119
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
126120
github.com/pion/dtls/v2 v2.2.7 // indirect
127121
github.com/pion/logging v0.2.2 // indirect
@@ -144,11 +138,11 @@ require (
144138
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
145139
github.com/shoenig/go-m1cpu v0.1.6 // indirect
146140
github.com/sirupsen/logrus v1.9.3 // indirect
147-
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250512193142-11507db18550 // indirect
141+
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250515132731-ad40fab9b75e // indirect
148142
github.com/sourcegraph/conc v0.3.0 // indirect
149143
github.com/spf13/afero v1.11.0 // indirect
150144
github.com/spf13/cast v1.7.1 // indirect
151-
github.com/spf13/pflag v1.0.6 // indirect
145+
github.com/spf13/pflag v1.0.5 // indirect
152146
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 // indirect
153147
github.com/stretchr/objx v0.5.2 // indirect
154148
github.com/subosito/gotenv v1.6.0 // indirect

go.sum

Lines changed: 16 additions & 58 deletions
Large diffs are not rendered by default.
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
// - The predecessor operation is finished
2020
// - The operation is ready to be executed
2121
// Otherwise the operation will throw an info log and wait for a future tick.
22-
func (tw *Worker) execute(ctx context.Context, op []*contracts.RBACTimelockCallScheduled) {
22+
func (tw *WorkerEVM) execute(ctx context.Context, op []*contracts.RBACTimelockCallScheduled) {
2323
isReady, err := isReady(ctx, tw.contract, op[0].Id)
2424
if err != nil {
2525
tw.logger.Errorw("unable to read operation %x \"ready\" status: %s", op[0].Id, err.Error())
@@ -48,7 +48,7 @@ func (tw *Worker) execute(ctx context.Context, op []*contracts.RBACTimelockCallS
4848
}
4949

5050
// executeCallScheduleOperation is the handler to execute a CallScheduled operation.
51-
func (tw *Worker) executeCallSchedule(ctx context.Context, c *contracts.RBACTimelockTransactor, cs []*contracts.RBACTimelockCallScheduled, privateKey *ecdsa.PrivateKey) (*types.Transaction, error) {
51+
func (tw *WorkerEVM) executeCallSchedule(ctx context.Context, c *contracts.RBACTimelockTransactor, cs []*contracts.RBACTimelockCallScheduled, privateKey *ecdsa.PrivateKey) (*types.Transaction, error) {
5252
fromAddress, err := privateKeyToAddress(privateKey)
5353
if err != nil {
5454
return nil, err
@@ -123,7 +123,7 @@ func isPending(ctx context.Context, c *contracts.RBACTimelock, id [32]byte) (boo
123123
}
124124

125125
// signTx is a function that implements the type SignerFn, so can be passed as a Signer method.
126-
func (tw *Worker) signTx(chainID *big.Int) bind.SignerFn {
126+
func (tw *WorkerEVM) signTx(chainID *big.Int) bind.SignerFn {
127127
return func(address common.Address, tx *types.Transaction) (*types.Transaction, error) {
128128
signedTx, err := types.SignTx(tx, types.LatestSignerForChainID(chainID), tw.privateKey)
129129
if err != nil {
@@ -134,9 +134,9 @@ func (tw *Worker) signTx(chainID *big.Int) bind.SignerFn {
134134
}
135135
}
136136

137-
// privateKeyToAddress is an util function to calculate the address of a given private key.
137+
// privateKeyToAddress is an util function to calculate the addresses of a given private key.
138138
// From a private key the public key can be deducted, and with the pubkey is
139-
// trivial to calculate the address.
139+
// trivial to calculate the addresses.
140140
func privateKeyToAddress(privateKey *ecdsa.PrivateKey) (common.Address, error) {
141141
publicKeyECDSA, ok := privateKey.Public().(*ecdsa.PublicKey)
142142
if !ok {

pkg/timelock/worker.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package timelock
2+
3+
import "context"
4+
5+
type Worker interface {
6+
Listen(ctx context.Context) error
7+
}
Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ import (
2424
"github.com/smartcontractkit/timelock-worker/pkg/isclosed"
2525
)
2626

27-
// Worker represents a worker instance.
28-
// address is an array of addresses as expected by ethereum.FilterQuery,
29-
// but it's enforced only to one address in the logic.
30-
type Worker struct {
27+
// WorkerEVM represents an EVM worker instance.
28+
// addresses is an array of addresses as expected by ethereum.FilterQuery,
29+
// but it's enforced only to one addresses in the logic.
30+
type WorkerEVM struct {
3131
ethClient *ethclient.Client
3232
contract *contracts.RBACTimelock
3333
executeContract *contracts.RBACTimelock
3434
abi *abi.ABI
35-
address []common.Address
35+
addresses []common.Address
3636
fromBlock *big.Int
3737
pollPeriod int64
3838
listenerPollPeriod int64
@@ -47,12 +47,12 @@ var httpSchemes = []string{"http", "https"}
4747

4848
var validNodeUrlSchemes = []string{"http", "https", "ws", "wss"}
4949

50-
// NewTimelockWorker initializes and returns a timelockWorker.
50+
// NewTimelockWorkerEVM initializes and returns a timelockWorker.
5151
// It's a singleton, so further executions will retrieve the same timelockWorker.
52-
func NewTimelockWorker(
52+
func NewTimelockWorkerEVM(
5353
nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
5454
pollPeriod int64, listenerPollPeriod int64, pollSize uint64, dryRun bool, logger *zap.SugaredLogger,
55-
) (*Worker, error) {
55+
) (*WorkerEVM, error) {
5656
// Sanity check on each provided variable before allocating more resources.
5757
u, err := url.ParseRequestURI(nodeURL)
5858
if err != nil {
@@ -64,11 +64,11 @@ func NewTimelockWorker(
6464
}
6565

6666
if !common.IsHexAddress(timelockAddress) {
67-
return nil, fmt.Errorf("timelock address provided is not valid: %s", timelockAddress)
67+
return nil, fmt.Errorf("timelock addresses provided is not valid: %s", timelockAddress)
6868
}
6969

7070
if !common.IsHexAddress(callProxyAddress) {
71-
return nil, fmt.Errorf("call proxy address provided is not valid: %s", callProxyAddress)
71+
return nil, fmt.Errorf("call proxy addresses provided is not valid: %s", callProxyAddress)
7272
}
7373

7474
if pollPeriod <= 0 {
@@ -105,7 +105,7 @@ func NewTimelockWorker(
105105
}
106106

107107
// The contract ABI give grants capabilities such as parsing events and accessing to fields.
108-
// As NewTimelock only accepts one contract, hardcode it to address[0].
108+
// As NewTimelock only accepts one contract, hardcode it to addresses[0].
109109
timelockContract, err := contracts.NewRBACTimelock(common.HexToAddress(timelockAddress), ethClient)
110110
if err != nil {
111111
return nil, err
@@ -123,12 +123,12 @@ func NewTimelockWorker(
123123
return nil, err
124124
}
125125

126-
tWorker := &Worker{
126+
tWorker := &WorkerEVM{
127127
ethClient: ethClient,
128128
contract: timelockContract,
129129
executeContract: executeContract,
130130
abi: timelockABI,
131-
address: []common.Address{common.HexToAddress(timelockAddress)},
131+
addresses: []common.Address{common.HexToAddress(timelockAddress)},
132132
fromBlock: fromBlock,
133133
pollPeriod: pollPeriod,
134134
listenerPollPeriod: listenerPollPeriod,
@@ -147,9 +147,9 @@ func NewTimelockWorker(
147147
return tWorker, nil
148148
}
149149

150-
// Listen is the main function of a Timelock Worker.
150+
// Listen is the main function of a Timelock WorkerEVM.
151151
// It handles the retrieval of old and new events, contexts and cancellations.
152-
func (tw *Worker) Listen(ctx context.Context) error {
152+
func (tw *WorkerEVM) Listen(ctx context.Context) error {
153153
ctxwc, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
154154

155155
// Log timelock-worker configuration.
@@ -199,9 +199,9 @@ func (tw *Worker) Listen(ctx context.Context) error {
199199
}
200200

201201
// setupFilterQuery returns an ethereum.FilterQuery initialized to watch the Timelock contract.
202-
func (tw *Worker) setupFilterQuery(fromBlock, toBlock *big.Int) ethereum.FilterQuery {
202+
func (tw *WorkerEVM) setupFilterQuery(fromBlock, toBlock *big.Int) ethereum.FilterQuery {
203203
return ethereum.FilterQuery{
204-
Addresses: tw.address,
204+
Addresses: tw.addresses,
205205
FromBlock: fromBlock,
206206
ToBlock: toBlock,
207207
Topics: [][]common.Hash{},
@@ -214,7 +214,7 @@ func (tw *Worker) setupFilterQuery(fromBlock, toBlock *big.Int) ethereum.FilterQ
214214
// The actual retrieval is performed by either `subscribeNewLogs`, if the node connection
215215
// supports subscriptions, or `pollNewLogs` otherwise. In practice, the ethclient library
216216
// simply checks if the given node URL is "http(s)" or not.
217-
func (tw *Worker) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
217+
func (tw *WorkerEVM) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
218218
if tw.ethClient.Client().SupportsSubscriptions() {
219219
return tw.subscribeNewLogs(ctx)
220220
}
@@ -223,7 +223,7 @@ func (tw *Worker) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan
223223
}
224224

225225
// subscribeNewLogs subscribes to a Timelock contract and emit logs through the channel it returns.
226-
func (tw *Worker) subscribeNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
226+
func (tw *WorkerEVM) subscribeNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
227227
query := tw.setupFilterQuery(tw.fromBlock, nil)
228228
logCh := make(chan types.Log)
229229
done := make(chan struct{})
@@ -285,7 +285,7 @@ func (tw *Worker) subscribeNewLogs(ctx context.Context) (<-chan struct{}, <-chan
285285
}
286286

287287
// pollNewLogs periodically retrieves logs from the Timelock and emit them through the channel it returns.
288-
func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
288+
func (tw *WorkerEVM) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
289289
lastBlock := tw.fromBlock
290290
logCh := make(chan types.Log)
291291
done := make(chan struct{})
@@ -318,7 +318,7 @@ func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan type
318318

319319
// retrieveHistoricalLogs returns a types.Log channel and retrieves all the historical events of a given contract.
320320
// Once all the logs have been sent into the channel the function returns and the channel is closed.
321-
func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
321+
func (tw *WorkerEVM) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
322322
query := tw.setupFilterQuery(tw.fromBlock, nil)
323323
logCh := make(chan types.Log)
324324
done := make(chan struct{})
@@ -360,7 +360,7 @@ func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{},
360360
return done, logCh, nil
361361
}
362362

363-
func (tw *Worker) fetchAndDispatchLogs(
363+
func (tw *WorkerEVM) fetchAndDispatchLogs(
364364
ctx context.Context, logCh chan types.Log, fromBlock, currentChainBlock *big.Int,
365365
) *big.Int {
366366
if currentChainBlock == nil {
@@ -411,7 +411,7 @@ func (tw *Worker) fetchAndDispatchLogs(
411411

412412
// processLogs is implemented as a fan-in for all the logs channels, merging all the data and handling logs sequentially.
413413
// This function is thread safe.
414-
func (tw *Worker) processLogs(ctx context.Context, oldLog, newLog <-chan types.Log) <-chan struct{} {
414+
func (tw *WorkerEVM) processLogs(ctx context.Context, oldLog, newLog <-chan types.Log) <-chan struct{} {
415415
var (
416416
done, newDone, oldDone = make(chan struct{}), make(chan struct{}), make(chan struct{})
417417
ctxwc, cancel = context.WithCancel(ctx)
@@ -468,7 +468,7 @@ func (tw *Worker) processLogs(ctx context.Context, oldLog, newLog <-chan types.L
468468
// handleLog handles the logic of parsing every event, its type and actions associated to each one.
469469
// CallScheduled events have to be added to the scheduler.
470470
// CallExecuted and CallCanceled signals an event that has to be removed from the scheduler.
471-
func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {
471+
func (tw *WorkerEVM) handleLog(ctx context.Context, log types.Log) error {
472472
// Ignore logs with no topics.
473473
if len(log.Topics) == 0 {
474474
return nil
@@ -499,7 +499,7 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {
499499

500500
// A CallScheduled event should be added to an scheduler only if it's not already done
501501
// and it's a valid Operation.
502-
func (tw *Worker) handleEventScheduled(ctx context.Context, log types.Log) error {
502+
func (tw *WorkerEVM) handleEventScheduled(ctx context.Context, log types.Log) error {
503503
cs, err := tw.contract.ParseCallScheduled(log)
504504
if err != nil {
505505
return fmt.Errorf("failed to parse CallScheduled log: %w", err)
@@ -532,7 +532,7 @@ func (tw *Worker) handleEventScheduled(ctx context.Context, log types.Log) error
532532
}
533533

534534
// A CallExecuted which is in Done status should delete the task in the scheduler store.
535-
func (tw *Worker) handleEventExecuted(ctx context.Context, log types.Log) error {
535+
func (tw *WorkerEVM) handleEventExecuted(ctx context.Context, log types.Log) error {
536536
cs, err := tw.contract.ParseCallExecuted(log)
537537
if err != nil {
538538
return fmt.Errorf("failed to parse CallExecuted log: %w", err)
@@ -558,7 +558,7 @@ func (tw *Worker) handleEventExecuted(ctx context.Context, log types.Log) error
558558
}
559559

560560
// A Cancelled which is in Done status should delete the task in the scheduler store.
561-
func (tw *Worker) handleEventCancelled(_ context.Context, log types.Log) error {
561+
func (tw *WorkerEVM) handleEventCancelled(_ context.Context, log types.Log) error {
562562
cs, err := tw.contract.ParseCancelled(log)
563563
if err != nil {
564564
return fmt.Errorf("failed to parse Cancelled log: %w", err)
@@ -574,16 +574,16 @@ func (tw *Worker) handleEventCancelled(_ context.Context, log types.Log) error {
574574
}
575575

576576
// startLog prints the timelock-worker configuration.
577-
func (tw *Worker) startLog() {
577+
func (tw *WorkerEVM) startLog() {
578578
tw.logger.Info("timelock-worker started")
579-
tw.logger.Infof("\tTimelock contract address: %v", tw.address[0])
579+
tw.logger.Infof("\tTimelock contract addresses: %v", tw.addresses[0])
580580

581581
wallet, err := privateKeyToAddress(tw.privateKey)
582582
if err != nil {
583-
tw.logger.Fatal("\tEOA address: unable to determine")
583+
tw.logger.Fatal("\tEOA addresses: unable to determine")
584584
}
585585

586-
tw.logger.Infof("\tEOA address: %v", wallet)
586+
tw.logger.Infof("\tEOA addresses: %v", wallet)
587587
tw.logger.Infof("\tStarting from block: %v", tw.fromBlock)
588588
tw.logger.Infof("\tPoll Period: %v", time.Duration(tw.pollPeriod*int64(time.Second)).String())
589589
tw.logger.Infof("\tEvent Listener Poll Period: %v", time.Duration(tw.listenerPollPeriod*int64(time.Second)).String())

0 commit comments

Comments
 (0)