Skip to content

Commit 249e302

Browse files
committed
use error-groups and make timeouts configurable
1 parent 2adc25b commit 249e302

File tree

4 files changed

+104
-82
lines changed

4 files changed

+104
-82
lines changed

relayer/config/config.go

+23-21
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ const (
3131
)
3232

3333
const (
34-
defaultStorageLocation = "./.icm-relayer-storage"
35-
defaultProcessMissedBlocks = true
36-
defaultAPIPort = uint16(8080)
37-
defaultMetricsPort = uint16(9090)
38-
defaultIntervalSeconds = uint64(10)
39-
defaultSignatureCacheSize = uint64(1024 * 1024)
34+
defaultStorageLocation = "./.icm-relayer-storage"
35+
defaultProcessMissedBlocks = true
36+
defaultAPIPort = uint16(8080)
37+
defaultMetricsPort = uint16(9090)
38+
defaultIntervalSeconds = uint64(10)
39+
defaultSignatureCacheSize = uint64(1024 * 1024)
40+
defaultInitialConnectionTimeoutSeconds = uint64(300)
4041
)
4142

4243
var defaultLogLevel = logging.Info.String()
@@ -50,21 +51,22 @@ icm-relayer --help Display icm-relayer usag
5051

5152
// Top-level configuration
5253
type Config struct {
53-
LogLevel string `mapstructure:"log-level" json:"log-level"`
54-
StorageLocation string `mapstructure:"storage-location" json:"storage-location"`
55-
RedisURL string `mapstructure:"redis-url" json:"redis-url"`
56-
APIPort uint16 `mapstructure:"api-port" json:"api-port"`
57-
MetricsPort uint16 `mapstructure:"metrics-port" json:"metrics-port"`
58-
DBWriteIntervalSeconds uint64 `mapstructure:"db-write-interval-seconds" json:"db-write-interval-seconds"` //nolint:lll
59-
PChainAPI *basecfg.APIConfig `mapstructure:"p-chain-api" json:"p-chain-api"`
60-
InfoAPI *basecfg.APIConfig `mapstructure:"info-api" json:"info-api"`
61-
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"`
62-
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
63-
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
64-
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`
65-
SignatureCacheSize uint64 `mapstructure:"signature-cache-size" json:"signature-cache-size"`
66-
ManuallyTrackedPeers []*basecfg.PeerConfig `mapstructure:"manually-tracked-peers" json:"manually-tracked-peers"`
67-
AllowPrivateIPs bool `mapstructure:"allow-private-ips" json:"allow-private-ips"`
54+
LogLevel string `mapstructure:"log-level" json:"log-level"`
55+
StorageLocation string `mapstructure:"storage-location" json:"storage-location"`
56+
RedisURL string `mapstructure:"redis-url" json:"redis-url"`
57+
APIPort uint16 `mapstructure:"api-port" json:"api-port"`
58+
MetricsPort uint16 `mapstructure:"metrics-port" json:"metrics-port"`
59+
DBWriteIntervalSeconds uint64 `mapstructure:"db-write-interval-seconds" json:"db-write-interval-seconds"` //nolint:lll
60+
PChainAPI *basecfg.APIConfig `mapstructure:"p-chain-api" json:"p-chain-api"`
61+
InfoAPI *basecfg.APIConfig `mapstructure:"info-api" json:"info-api"`
62+
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"`
63+
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
64+
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
65+
DeciderURL string `mapstructure:"decider-url" json:"decider-url"`
66+
SignatureCacheSize uint64 `mapstructure:"signature-cache-size" json:"signature-cache-size"`
67+
ManuallyTrackedPeers []*basecfg.PeerConfig `mapstructure:"manually-tracked-peers" json:"manually-tracked-peers"`
68+
AllowPrivateIPs bool `mapstructure:"allow-private-ips" json:"allow-private-ips"`
69+
InitialConnectionTimeoutSeconds uint64 `mapstructure:"initial-connection-timeout-seconds" json:"initial-connection-timeout-seconds"` //nolint:lll
6870

6971
// convenience field to fetch a blockchain's subnet ID
7072
blockchainIDToSubnetID map[ids.ID]ids.ID

relayer/config/keys.go

+15-14
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,19 @@ const (
1010
HelpKey = "help"
1111

1212
// Top-level configuration keys
13-
LogLevelKey = "log-level"
14-
PChainAPIKey = "p-chain-api"
15-
InfoAPIKey = "info-api"
16-
APIPortKey = "api-port"
17-
MetricsPortKey = "metrics-port"
18-
SourceBlockchainsKey = "source-blockchains"
19-
DestinationBlockchainsKey = "destination-blockchains"
20-
AccountPrivateKeyKey = "account-private-key"
21-
StorageLocationKey = "storage-location"
22-
RedisURLKey = "redis-url"
23-
ProcessMissedBlocksKey = "process-missed-blocks"
24-
ManualWarpMessagesKey = "manual-warp-messages"
25-
DBWriteIntervalSecondsKey = "db-write-interval-seconds"
26-
SignatureCacheSizeKey = "signature-cache-size"
13+
LogLevelKey = "log-level"
14+
PChainAPIKey = "p-chain-api"
15+
InfoAPIKey = "info-api"
16+
APIPortKey = "api-port"
17+
MetricsPortKey = "metrics-port"
18+
SourceBlockchainsKey = "source-blockchains"
19+
DestinationBlockchainsKey = "destination-blockchains"
20+
AccountPrivateKeyKey = "account-private-key"
21+
StorageLocationKey = "storage-location"
22+
RedisURLKey = "redis-url"
23+
ProcessMissedBlocksKey = "process-missed-blocks"
24+
ManualWarpMessagesKey = "manual-warp-messages"
25+
DBWriteIntervalSecondsKey = "db-write-interval-seconds"
26+
SignatureCacheSizeKey = "signature-cache-size"
27+
InitialConnectionTimeoutSeconds = "initial-connection-timeout-seconds"
2728
)

relayer/config/viper.go

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func SetDefaultConfigValues(v *viper.Viper) {
6262
SignatureCacheSizeKey,
6363
defaultSignatureCacheSize,
6464
)
65+
v.SetDefault(InitialConnectionTimeoutSeconds, defaultInitialConnectionTimeoutSeconds)
6566
}
6667

6768
// BuildConfig constructs the relayer config using Viper.

relayer/network_utils.go

+65-47
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616
"github.com/ava-labs/icm-services/relayer/config"
1717
"github.com/ava-labs/icm-services/utils"
1818
"go.uber.org/zap"
19+
"golang.org/x/sync/errgroup"
1920
)
2021

21-
const initialConnectionTimeoutSeconds = 1800
22+
const retryPeriodSeconds = 5
2223

2324
// Convenience function to initialize connections and check stake for all source blockchains.
2425
// Only returns an error if it fails to get a list of canonical validators or a valid warp config.
@@ -36,26 +37,38 @@ func InitializeConnectionsAndCheckStake(
3637
for _, sourceBlockchainConfig := range cfg.SourceBlockchains {
3738
network.TrackSubnet(sourceBlockchainConfig.GetSubnetID())
3839
}
39-
ctx, cancel := context.WithTimeout(context.Background(), initialConnectionTimeoutSeconds*time.Second)
40+
ctx, cancel := context.WithTimeout(
41+
context.Background(),
42+
time.Duration(cfg.InitialConnectionTimeoutSeconds)*time.Second,
43+
)
4044
defer cancel()
45+
46+
var eg errgroup.Group
4147
for _, sourceBlockchain := range cfg.SourceBlockchains {
4248
if sourceBlockchain.GetSubnetID() == constants.PrimaryNetworkID {
43-
if err := connectToPrimaryNetworkPeers(ctx, logger, network, cfg, sourceBlockchain); err != nil {
44-
return fmt.Errorf(
45-
"failed to connect to primary network peers: %w",
46-
err,
47-
)
48-
}
49+
eg.Go(func() error {
50+
err := connectToPrimaryNetworkPeers(ctx, logger, network, cfg, sourceBlockchain)
51+
if err != nil {
52+
return fmt.Errorf(
53+
"failed to connect to primary network peers: %w",
54+
err,
55+
)
56+
}
57+
return nil
58+
})
4959
} else {
50-
if err := connectToNonPrimaryNetworkPeers(ctx, logger, network, cfg, sourceBlockchain); err != nil {
51-
return fmt.Errorf(
52-
"failed to connect to non-primary network peers: %w",
53-
err,
54-
)
55-
}
60+
eg.Go(func() error {
61+
if err := connectToNonPrimaryNetworkPeers(ctx, logger, network, cfg, sourceBlockchain); err != nil {
62+
return fmt.Errorf(
63+
"failed to connect to non-primary network peers: %w",
64+
err,
65+
)
66+
}
67+
return nil
68+
})
5669
}
5770
}
58-
return nil
71+
return eg.Wait()
5972
}
6073

6174
// Connect to the validators of the source blockchain. For each destination blockchain,
@@ -81,28 +94,28 @@ func connectToNonPrimaryNetworkPeers(
8194
)
8295
return err
8396
}
84-
ok, warpConfig, err := checkForSufficientConnectedStake(
97+
ok, err := checkForSufficientConnectedStake(
8598
logger,
8699
cfg,
87100
connectedValidators,
88-
blockchainID);
101+
blockchainID)
89102
if err != nil {
90103
return err
91-
}
104+
}
92105
if ok {
93106
break
94107
}
95-
logger.Warn(
96-
"Failed to connect to a threshold of stake, retrying...",
97-
zap.String("destinationBlockchainID", blockchainID.String()),
98-
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
99-
zap.Uint64("totalValidatorWeight", connectedValidators.ValidatorSet.TotalWeight),
100-
)
101-
select {
102-
case <-ctx.Done():
103-
return ctx.Err()
104-
default:
105-
time.Sleep(5 * time.Second) // Retry after a short delay
108+
logger.Warn(
109+
"Failed to connect to a threshold of stake, retrying...",
110+
zap.String("destinationBlockchainID", blockchainID.String()),
111+
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
112+
zap.Uint64("totalValidatorWeight", connectedValidators.ValidatorSet.TotalWeight),
113+
)
114+
select {
115+
case <-ctx.Done():
116+
return ctx.Err()
117+
default:
118+
time.Sleep(5 * time.Second) // Retry after a short delay
106119
}
107120
}
108121
}
@@ -133,22 +146,27 @@ func connectToPrimaryNetworkPeers(
133146
)
134147
return err
135148
}
136-
if ok, warpConfig, err := checkForSufficientConnectedStake(logger, cfg, connectedValidators, blockchainID); ok {
149+
ok, err := checkForSufficientConnectedStake(
150+
logger,
151+
cfg,
152+
connectedValidators,
153+
blockchainID)
154+
if err != nil {
137155
return err
138-
} else {
139-
logger.Warn(
140-
"Failed to connect to a threshold of stake, retrying...",
141-
zap.String("destinationBlockchainID", blockchainID.String()),
142-
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
143-
zap.Uint64("totalValidatorWeight", connectedValidators.ValidatorSet.TotalWeight),
144-
zap.Any("WarpConfig", warpConfig),
145-
)
146-
select {
147-
case <-ctx.Done():
148-
return ctx.Err()
149-
default:
150-
time.Sleep(1 * time.Second) // Retry after a short delay
151-
}
156+
} else if ok {
157+
break
158+
}
159+
logger.Warn(
160+
"Failed to connect to a threshold of stake, retrying...",
161+
zap.String("destinationBlockchainID", blockchainID.String()),
162+
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
163+
zap.Uint64("totalValidatorWeight", connectedValidators.ValidatorSet.TotalWeight),
164+
)
165+
select {
166+
case <-ctx.Done():
167+
return ctx.Err()
168+
default:
169+
time.Sleep(retryPeriodSeconds)
152170
}
153171
}
154172
}
@@ -161,19 +179,19 @@ func checkForSufficientConnectedStake(
161179
cfg *config.Config,
162180
connectedValidators *peers.ConnectedCanonicalValidators,
163181
destinationBlockchainID ids.ID,
164-
) (bool, *config.WarpConfig, error) {
182+
) (bool, error) {
165183
warpConfig, err := cfg.GetWarpConfig(destinationBlockchainID)
166184
if err != nil {
167185
logger.Error(
168186
"Failed to get warp config from chain config",
169187
zap.String("destinationBlockchainID", destinationBlockchainID.String()),
170188
zap.Error(err),
171189
)
172-
return false, nil, err
190+
return false, err
173191
}
174192
return utils.CheckStakeWeightExceedsThreshold(
175193
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
176194
connectedValidators.ValidatorSet.TotalWeight,
177195
warpConfig.QuorumNumerator,
178-
), &warpConfig, nil
196+
), nil
179197
}

0 commit comments

Comments
 (0)