Skip to content

Commit 8146902

Browse files
committed
fix: propagate context through tx, DB, and signing call chains
- Thread ctx through Submitter.RunEpoch/GetPayload so channel reads respect cancellation (mirrors fix already applied to SignatureSubmitter) - Add ctx to chain.Client interface (SendRawTx, Nonce) and all tx_utils functions so blockchain operations derive timeouts from the parent context instead of context.Background() - Add ctx to finalizerDB and epochClientDB interfaces; pass through to database calls replacing context.TODO() - Add ctx to SetGas, RegisterVoter, PreregisterVoter, SignNewSigningPolicy, SignUptimeVote, SignRewards so the epoch signing chain is cancellable
1 parent 9e8336b commit 8146902

18 files changed

Lines changed: 160 additions & 146 deletions

client/epoch/db.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@ import (
1010
)
1111

1212
type epochClientDB interface {
13-
FetchLogsByAddressAndTopic0Timestamp(common.Address, common.Hash, int64, int64) ([]database.Log, error)
13+
FetchLogsByAddressAndTopic0Timestamp(ctx context.Context, address common.Address, topic0 common.Hash, from, to int64) ([]database.Log, error)
1414
}
1515

1616
type epochClientDBGorm struct {
1717
db *gorm.DB
1818
}
1919

2020
func (g epochClientDBGorm) FetchLogsByAddressAndTopic0Timestamp(
21-
address common.Address, topic0 common.Hash, fromTimestamp int64, toTimestamp int64,
21+
ctx context.Context, address common.Address, topic0 common.Hash, fromTimestamp int64, toTimestamp int64,
2222
) ([]database.Log, error) {
23-
return database.FetchLogsByAddressAndTopic0Timestamp(context.TODO(), g.db, database.LogsParams{
23+
return database.FetchLogsByAddressAndTopic0Timestamp(ctx, g.db, database.LogsParams{
2424
Address: address,
2525
Topic0: topic0,
2626
From: fromTimestamp,

client/epoch/epoch_client.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,58 +162,58 @@ func (c *client) Run(ctx context.Context) error {
162162
select {
163163
case rewardEpochStarted := <-epochStartedListener:
164164
logger.Debugf("RewardEpochStarted event emitted for epoch %v", rewardEpochStarted.RewardEpochId)
165-
c.preregisterVoter(new(big.Int).Add(rewardEpochStarted.RewardEpochId, big.NewInt(1)))
165+
c.preregisterVoter(ctx, new(big.Int).Add(rewardEpochStarted.RewardEpochId, big.NewInt(1)))
166166
case powerBlockData := <-vpbsListener:
167167
logger.Debugf("VotePowerBlockSelected event emitted for epoch %v", powerBlockData.RewardEpochId)
168-
c.registerVoter(powerBlockData.RewardEpochId)
168+
c.registerVoter(ctx, powerBlockData.RewardEpochId)
169169
case signingPolicy := <-policyListener:
170170
logger.Debugf("SigningPolicyInitialized event emitted for epoch %v", signingPolicy.RewardEpochId)
171-
c.signPolicy(signingPolicy.RewardEpochId, signingPolicy.SigningPolicyBytes)
171+
c.signPolicy(ctx, signingPolicy.RewardEpochId, signingPolicy.SigningPolicyBytes)
172172
case uptimeVoteEnabled := <-uptimeEnabledListener:
173173
logger.Debugf("SignUptimeVoteEnabled event emitted for epoch %v", uptimeVoteEnabled.RewardEpochId)
174-
c.signUptimeVote(uptimeVoteEnabled.RewardEpochId)
174+
c.signUptimeVote(ctx, uptimeVoteEnabled.RewardEpochId)
175175
case uptimeVoteSigned := <-uptimeSignedListener:
176176
logger.Infof("Uptime vote threshold reached for epoch %v, signing rewards", uptimeVoteSigned.RewardEpochId)
177-
c.signRewards(uptimeVoteSigned.RewardEpochId)
177+
c.signRewards(ctx, uptimeVoteSigned.RewardEpochId)
178178
case <-ctx.Done():
179179
return ctx.Err()
180180
}
181181
}
182182
}
183183

184-
func (c *client) registerVoter(epochID *big.Int) {
184+
func (c *client) registerVoter(ctx context.Context, epochID *big.Int) {
185185
if !c.isFutureEpoch(epochID) {
186186
logger.Debugf("Skipping registration process for old epoch %v", epochID)
187187
return
188188
}
189189

190190
logger.Infof("VotePowerBlockSelected event emitted for next epoch %v, starting registration", epochID)
191-
registerResult := <-c.registryClient.RegisterVoter(epochID, c.identityAddress)
191+
registerResult := <-c.registryClient.RegisterVoter(ctx, epochID, c.identityAddress)
192192
if !registerResult.Success {
193193
logger.Errorf("RegisterVoter failed %s", registerResult.Message)
194194
}
195195
}
196196

197-
func (c *client) preregisterVoter(epochID *big.Int) {
197+
func (c *client) preregisterVoter(ctx context.Context, epochID *big.Int) {
198198
if !c.isFutureEpoch(epochID) {
199199
logger.Debugf("Skipping pre-registration process for old epoch %v", epochID)
200200
return
201201
}
202202

203-
registerResult := <-c.registryClient.PreregisterVoter(epochID, c.identityAddress)
203+
registerResult := <-c.registryClient.PreregisterVoter(ctx, epochID, c.identityAddress)
204204
if !registerResult.Success {
205205
logger.Errorf("PreregisterVoter failed %s", registerResult.Message)
206206
}
207207
}
208208

209-
func (c *client) signPolicy(epochID *big.Int, policy []byte) {
209+
func (c *client) signPolicy(ctx context.Context, epochID *big.Int, policy []byte) {
210210
if !c.isFutureEpoch(epochID) {
211211
logger.Debugf("Skipping policy signing for old epoch %v", epochID)
212212
return
213213
}
214214

215215
logger.Infof("SigningPolicyInitialized event emitted for next epoch %v, signing new policy", epochID)
216-
signingResult := <-c.systemsManagerClient.SignNewSigningPolicy(epochID, policy)
216+
signingResult := <-c.systemsManagerClient.SignNewSigningPolicy(ctx, epochID, policy)
217217
if signingResult.Success {
218218
logger.Info("SignNewSigningPolicy success")
219219
} else {
@@ -222,8 +222,8 @@ func (c *client) signPolicy(epochID *big.Int, policy []byte) {
222222
}
223223
}
224224

225-
func (c *client) signUptimeVote(epochId *big.Int) {
226-
signUptimeVoteResult := <-c.systemsManagerClient.SignUptimeVote(epochId)
225+
func (c *client) signUptimeVote(ctx context.Context, epochId *big.Int) {
226+
signUptimeVoteResult := <-c.systemsManagerClient.SignUptimeVote(ctx, epochId)
227227
if signUptimeVoteResult.Success {
228228
logger.Info("SignUptimeVote completed")
229229
} else {
@@ -261,7 +261,7 @@ func (c *client) isFutureEpoch(epochID *big.Int) bool {
261261
//
262262
// Since reward claim data is currently published manually, and it might take a day or so for the data to be available,
263263
// a retry mechanism is employed with a large retry interval (configurable).
264-
func (c *client) signRewards(epochId *big.Int) {
264+
func (c *client) signRewards(ctx context.Context, epochId *big.Int) {
265265
res := shared.ExecuteWithRetryAttempts(func(i int) (*struct{}, error) {
266266
if c.systemsManagerClient.IsRewardHashSigned(epochId) {
267267
return nil, nil
@@ -280,7 +280,7 @@ func (c *client) signRewards(epochId *big.Int) {
280280
if err != nil {
281281
return nil, errors.Wrapf(err, "reward data verification for epoch %d failed", epochId)
282282
}
283-
signingResult := <-c.systemsManagerClient.SignRewards(epochId, hash, weightClaims)
283+
signingResult := <-c.systemsManagerClient.SignRewards(ctx, epochId, hash, weightClaims)
284284
if !signingResult.Success {
285285
return nil, errors.Errorf("unable to send reward signature")
286286
}

client/epoch/epoch_client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func TestEpochClientRegisterErr(t *testing.T) {
258258
type testDB struct{}
259259

260260
func (db testDB) FetchLogsByAddressAndTopic0Timestamp(
261-
address common.Address, topic0 common.Hash, from, to int64,
261+
ctx context.Context, address common.Address, topic0 common.Hash, from, to int64,
262262
) ([]database.Log, error) {
263263
return nil, errors.New("not implemented")
264264
}
@@ -297,7 +297,7 @@ func (c testSystemsManagerClient) VotePowerBlockSelectedListener(
297297
}
298298

299299
func (c testSystemsManagerClient) SignNewSigningPolicy(
300-
epochID *big.Int, policy []byte,
300+
_ context.Context, epochID *big.Int, policy []byte,
301301
) <-chan shared.ExecuteStatus[any] {
302302
return shared.ExecuteWithRetryChan(func() (any, error) {
303303
if c.signingErr != nil {
@@ -358,7 +358,7 @@ func newTestRegistryClient() testRegistryClient {
358358
}
359359

360360
func (c testRegistryClient) RegisterVoter(
361-
epochID *big.Int, address common.Address,
361+
_ context.Context, epochID *big.Int, address common.Address,
362362
) <-chan shared.ExecuteStatus[any] {
363363
return shared.ExecuteWithRetryChan(func() (any, error) {
364364
if c.registerErr != nil {
@@ -385,7 +385,7 @@ func (c testSystemsManagerClient) RewardEpochStartedListener(db epochClientDB, c
385385
return make(chan *system.FlareSystemsManagerRewardEpochStarted)
386386
}
387387

388-
func (c testRegistryClient) PreregisterVoter(nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
388+
func (c testRegistryClient) PreregisterVoter(_ context.Context, nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
389389
return shared.ExecuteWithRetryChan(func() (any, error) {
390390
return nil, nil
391391
}, 1, 0)
@@ -395,7 +395,7 @@ func (c testSystemsManagerClient) SignUptimeVoteEnabledListener(db epochClientDB
395395
return make(chan *system.FlareSystemsManagerSignUptimeVoteEnabled)
396396
}
397397

398-
func (c testSystemsManagerClient) SignUptimeVote(b *big.Int) <-chan shared.ExecuteStatus[any] {
398+
func (c testSystemsManagerClient) SignUptimeVote(_ context.Context, b *big.Int) <-chan shared.ExecuteStatus[any] {
399399
return shared.ExecuteWithRetryChan(func() (any, error) {
400400
return nil, nil
401401
}, 1, 0)
@@ -405,7 +405,7 @@ func (c testSystemsManagerClient) UptimeVoteSignedListener(db epochClientDB, epo
405405
return make(chan *system.FlareSystemsManagerUptimeVoteSigned)
406406
}
407407

408-
func (c testSystemsManagerClient) SignRewards(b *big.Int, hash *common.Hash, claims int) <-chan shared.ExecuteStatus[any] {
408+
func (c testSystemsManagerClient) SignRewards(_ context.Context, b *big.Int, hash *common.Hash, claims int) <-chan shared.ExecuteStatus[any] {
409409
return shared.ExecuteWithRetryChan(func() (any, error) {
410410
return nil, nil
411411
}, 1, 0)

client/epoch/register_utils_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package epoch_test
22

33
import (
4+
"context"
45
"math/big"
56
"testing"
67

@@ -45,7 +46,7 @@ func TestSetGas(t *testing.T) {
4546
txOptions := new(bind.TransactOpts)
4647
txOptions.From = address
4748

48-
err = epoch.SetGas(txOptions, cl, &test.gasConfig)
49+
err = epoch.SetGas(context.Background(), txOptions, cl, &test.gasConfig)
4950

5051
require.Equal(t, address, txOptions.From)
5152

client/epoch/registry_utils.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ func init() {
103103
}
104104

105105
type registryContractClient interface {
106-
RegisterVoter(nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any]
107-
PreregisterVoter(nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any]
106+
RegisterVoter(ctx context.Context, nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any]
107+
PreregisterVoter(ctx context.Context, nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any]
108108
}
109109

110110
type registryContractClientImpl struct {
@@ -153,9 +153,9 @@ func NewRegistryContractClient(
153153
}
154154

155155
// RegisterVoter tries to register voter on VoterRegistry smart contract.
156-
func (r *registryContractClientImpl) RegisterVoter(nextRewardEpochID *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
156+
func (r *registryContractClientImpl) RegisterVoter(ctx context.Context, nextRewardEpochID *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
157157
return shared.ExecuteWithRetryChan(func() (any, error) {
158-
err := r.sendRegisterVoter(nextRewardEpochID, address)
158+
err := r.sendRegisterVoter(ctx, nextRewardEpochID, address)
159159
if err != nil {
160160
if shared.ExistsAsSubstring(nonFatalRegisterErrors, err.Error()) {
161161
logger.Debugf("Non fatal error sending register voter: %v", err)
@@ -167,7 +167,7 @@ func (r *registryContractClientImpl) RegisterVoter(nextRewardEpochID *big.Int, a
167167
}, shared.MaxTxSendRetriesLong, shared.TxRetryIntervalLong)
168168
}
169169

170-
func (r *registryContractClientImpl) sendRegisterVoter(nextRewardEpochID *big.Int, address common.Address) error {
170+
func (r *registryContractClientImpl) sendRegisterVoter(ctx context.Context, nextRewardEpochID *big.Int, address common.Address) error {
171171
epochID := uint32(nextRewardEpochID.Uint64())
172172

173173
var (
@@ -193,12 +193,13 @@ func (r *registryContractClientImpl) sendRegisterVoter(nextRewardEpochID *big.In
193193
V: signature[64] + 27,
194194
}
195195

196-
err = SetGas(r.senderTxOpts, r.ethClient, r.gasCfg)
196+
err = SetGas(ctx, r.senderTxOpts, r.ethClient, r.gasCfg)
197197
if err != nil {
198198
return err
199199
}
200200

201201
estimatedGasLimit, err := chain.DryRunTxAbi(
202+
ctx,
202203
r.ethClient,
203204
chain.DefaultTxTimeout,
204205
r.senderTxOpts.From,
@@ -224,7 +225,7 @@ func (r *registryContractClientImpl) sendRegisterVoter(nextRewardEpochID *big.In
224225
return fmt.Errorf("sending registry tx: %w", err)
225226
}
226227

227-
err = r.txVerifier.WaitUntilMined(r.senderTxOpts.From, tx, chain.DefaultTxTimeout)
228+
err = r.txVerifier.WaitUntilMined(ctx, r.senderTxOpts.From, tx, chain.DefaultTxTimeout)
228229
if err != nil {
229230
return err
230231
}
@@ -233,9 +234,9 @@ func (r *registryContractClientImpl) sendRegisterVoter(nextRewardEpochID *big.In
233234
}
234235

235236
// PreregisterVoter tries to pre-register voter on VoterPreRegistry smart contract.
236-
func (r *registryContractClientImpl) PreregisterVoter(nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
237+
func (r *registryContractClientImpl) PreregisterVoter(ctx context.Context, nextRewardEpochId *big.Int, address common.Address) <-chan shared.ExecuteStatus[any] {
237238
return shared.ExecuteWithRetryChan(func() (any, error) {
238-
err := r.sendPreRegisterVoter(nextRewardEpochId, address)
239+
err := r.sendPreRegisterVoter(ctx, nextRewardEpochId, address)
239240
if err != nil {
240241
if shared.ExistsAsSubstring(nonFatalPreregisterErrors, err.Error()) {
241242
logger.Debugf("Non fatal error sending pre-register voter: %v", err)
@@ -247,7 +248,7 @@ func (r *registryContractClientImpl) PreregisterVoter(nextRewardEpochId *big.Int
247248
}, shared.MaxTxSendRetries, shared.TxRetryInterval)
248249
}
249250

250-
func (r *registryContractClientImpl) sendPreRegisterVoter(nextRewardEpochID *big.Int, address common.Address) error {
251+
func (r *registryContractClientImpl) sendPreRegisterVoter(ctx context.Context, nextRewardEpochID *big.Int, address common.Address) error {
251252
epochID := uint32(nextRewardEpochID.Uint64())
252253

253254
var (
@@ -273,12 +274,13 @@ func (r *registryContractClientImpl) sendPreRegisterVoter(nextRewardEpochID *big
273274
V: signature[64] + 27,
274275
}
275276

276-
err = SetGas(r.senderTxOpts, r.ethClient, r.gasCfg)
277+
err = SetGas(ctx, r.senderTxOpts, r.ethClient, r.gasCfg)
277278
if err != nil {
278279
return fmt.Errorf("setting gas pre registry:%w", err)
279280
}
280281

281282
estimatedGasLimit, err := chain.DryRunTxAbi(
283+
ctx,
282284
r.ethClient,
283285
chain.DefaultTxTimeout,
284286
r.senderTxOpts.From,
@@ -304,7 +306,7 @@ func (r *registryContractClientImpl) sendPreRegisterVoter(nextRewardEpochID *big
304306
return fmt.Errorf("sending preregistry tx: %w", err)
305307
}
306308

307-
err = r.txVerifier.WaitUntilMined(r.senderTxOpts.From, tx, chain.DefaultTxTimeout)
309+
err = r.txVerifier.WaitUntilMined(ctx, r.senderTxOpts.From, tx, chain.DefaultTxTimeout)
308310
if err != nil {
309311
return err
310312
}
@@ -335,19 +337,19 @@ func (r *registryContractClientImpl) createSignatureNew(chainID int, nextRewardE
335337
}
336338

337339
// SetGas sets gas parameters in txOptions according to the gasConfig.
338-
func SetGas(txOptions *bind.TransactOpts, client *ethclient.Client, gasConfig *config.Gas) error {
340+
func SetGas(ctx context.Context, txOptions *bind.TransactOpts, client *ethclient.Client, gasConfig *config.Gas) error {
339341
switch gasConfig.TxType {
340342
case 0:
341-
gasPrice, err := chain.GetGasPrice(gasConfig, client, chain.DefaultTxTimeout)
343+
gasPrice, err := chain.GetGasPrice(ctx, gasConfig, client, chain.DefaultTxTimeout)
342344
if err != nil {
343345
logger.Warnf("Unable to obtain gas price: %v, using fallback %d", err, fallbackGasPrice)
344346
gasPrice = new(big.Int).Set(fallbackGasPrice)
345347
}
346348
txOptions.GasPrice = gasPrice
347349
return nil
348350
case 2:
349-
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
350-
baseFeePerGas, err := chain.BaseFee(ctx, client)
351+
feeCtx, cancelFunc := context.WithTimeout(ctx, chain.DefaultTxTimeout)
352+
baseFeePerGas, err := chain.BaseFee(feeCtx, client)
351353
cancelFunc()
352354

353355
if err != nil {

client/epoch/relay_utils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package epoch
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/flare-foundation/flare-system-client/client/shared"
@@ -56,7 +57,7 @@ func (r *relayContractClientImpl) SigningPolicyInitializedListener(db epochClien
5657
for {
5758
<-ticker.C
5859
now := time.Now().Unix()
59-
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(r.address, topic0, eventRangeStart, now)
60+
logs, err := db.FetchLogsByAddressAndTopic0Timestamp(context.Background(), r.address, topic0, eventRangeStart, now)
6061
if err != nil {
6162
logger.Errorf("Error fetching logs %v", err)
6263
continue

0 commit comments

Comments
 (0)