-
Notifications
You must be signed in to change notification settings - Fork 272
Expand file tree
/
Copy pathclient.go
More file actions
251 lines (224 loc) · 7.15 KB
/
client.go
File metadata and controls
251 lines (224 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.
package client
import (
"context"
"fmt"
"math/big"
"sync"
"time"
"github.com/berachain/beacon-kit/errors"
ethclient "github.com/berachain/beacon-kit/execution/client/ethclient"
ethclientrpc "github.com/berachain/beacon-kit/execution/client/ethclient/rpc"
"github.com/berachain/beacon-kit/log"
"github.com/berachain/beacon-kit/primitives/math"
"github.com/berachain/beacon-kit/primitives/net/http"
"github.com/berachain/beacon-kit/primitives/net/jwt"
)
// EngineClient wraps an embedded *ethclient.Client together with the
// configuration, logger, metrics and connection state used when talking to
// the execution client.
type EngineClient struct {
// Client is embedded, so its methods are promoted onto EngineClient.
*ethclient.Client
// cfg is the supplied configuration for the engine client.
// New may raise cfg.RPCTimeout in place, so the pointed-to value is shared
// with (and mutated for) whoever else holds this pointer.
cfg *Config
// logger is the logger for the engine client.
logger log.Logger
// eth1ChainID is the chain ID the execution client is required to report;
// verifyChainIDAndConnection compares the client's answer against it.
eth1ChainID *big.Int
// metrics is the metrics handle for the engine client.
metrics *clientMetrics
// capabilitiesMu guards capabilities.
capabilitiesMu sync.RWMutex
// capabilities is the set of capability names looked up by HasCapability.
// NOTE(review): the writer is not in this file — presumably
// ExchangeCapabilities fills it; confirm.
capabilities map[string]struct{}
// connectedMu guards connected.
connectedMu sync.RWMutex
// connected is set to true by Start once the execution client has been
// verified, and is read through IsConnected.
connected bool
}
// New builds an EngineClient around a freshly created RPC client that dials
// cfg.RPCDialURL and authenticates with jwtSecret.
//
// Side effects on the arguments: cfg is held by pointer and cfg.RPCTimeout is
// raised to MinRPCTimeout when it is configured lower, so the change is
// visible to every other holder of cfg. A warning is logged when that happens,
// and another when the deprecated rpc-retries setting is non-zero.
//
// The returned client starts out with an empty capability set and in the
// "not connected" state.
func New(
	cfg *Config,
	logger log.Logger,
	jwtSecret *jwt.Secret,
	telemetrySink TelemetrySink,
	eth1ChainID *big.Int,
) *EngineClient {
	rpcClient := ethclientrpc.NewClient(
		cfg.RPCDialURL.String(),
		jwtSecret,
		cfg.RPCJWTRefreshInterval,
		logger,
	)

	// Enforce a minimum RPC timeout: a 900 ms default was suggested in the
	// past, which is unnecessarily strict.
	// TODO: Enforcing this here is not great since, in principle, other
	// services may use this config (not currently the case) and cfg is
	// passed by pointer, so this is a global change. The alternative of
	// validating every config in ProvideConfig should be considered (but
	// logging there is trickier).
	if configured := cfg.RPCTimeout; configured < MinRPCTimeout {
		logger.Warn("Automatically raising RPCTimeout",
			"configured", configured,
			"minimum", MinRPCTimeout,
		)
		cfg.RPCTimeout = MinRPCTimeout
	}

	if cfg.DeprecatedRPCRetries != 0 {
		logger.Warn("ignoring deprecated setting rpc-retries")
	}

	// connected is left at its zero value (false) until Start succeeds.
	engine := &EngineClient{
		Client:       ethclient.New(rpcClient),
		cfg:          cfg,
		logger:       logger,
		eth1ChainID:  eth1ChainID,
		capabilities: make(map[string]struct{}),
		metrics:      newClientMetrics(telemetrySink, logger),
	}
	return engine
}
// Name reports the fixed identifier of this service.
func (s *EngineClient) Name() string {
	const serviceName = "engine-client"
	return serviceName
}
// Start initializes the RPC client, launches its background loop, and then
// blocks until the execution client answers with the expected chain ID or ctx
// is done.
//
// The first verification attempt is made immediately; if it fails, the attempt
// is repeated every cfg.RPCStartupCheckInterval. The client is marked as
// connected (see IsConnected) as soon as any attempt succeeds, including the
// very first one.
//
// It returns a wrapped error if Client.Initialize fails, ctx.Err() if ctx is
// done while waiting, and nil once verification has succeeded.
func (s *EngineClient) Start(ctx context.Context) error {
	// Initialize the JWT token before making any RPC calls.
	if err := s.Client.Initialize(); err != nil {
		return fmt.Errorf("failed to initialize RPC client: %w", err)
	}

	// Start the Client background refresh loop.
	go s.Client.Start(ctx)

	s.logger.Info(
		"Initializing connection to the execution client...",
		"dial_url", s.cfg.RPCDialURL.String(),
	)

	// markConnected records a successful verification. Both success paths
	// below must call it, otherwise IsConnected keeps reporting false.
	markConnected := func() {
		s.connectedMu.Lock()
		defer s.connectedMu.Unlock()
		s.connected = true
	}

	// If the first connection attempt succeeds, we can skip the
	// connection initialization loop.
	if err := s.verifyChainIDAndConnection(ctx); err == nil {
		markConnected()
		return nil
	}

	// Keep retrying until the execution client comes up or ctx is done.
	ticker := time.NewTicker(s.cfg.RPCStartupCheckInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			s.logger.Info(
				"Waiting for execution client to start... 🍺🕔",
				"dial_url", s.cfg.RPCDialURL.String(),
			)
			if err := s.verifyChainIDAndConnection(ctx); err != nil {
				// Only a chain ID mismatch is logged at this level; any
				// other failure just waits for the next tick.
				if errors.Is(err, ErrMismatchedEth1ChainID) {
					s.logger.Error(err.Error())
				}
				continue
			}
			markConnected()
			return nil
		}
	}
}
// Stop is a no-op: it releases nothing and always returns nil.
// NOTE(review): the goroutine launched by Start is handed Start's ctx, so it
// presumably ends on ctx cancellation and not through Stop — confirm.
func (s *EngineClient) Stop() error {
return nil
}
// IsConnected reports the current value of the connected flag, read under
// connectedMu.
func (s *EngineClient) IsConnected() bool {
	s.connectedMu.RLock()
	isConnected := s.connected
	s.connectedMu.RUnlock()
	return isConnected
}
// HasCapability reports whether capability is present in the client's
// capability set. The lookup is done under capabilitiesMu.
func (s *EngineClient) HasCapability(capability string) bool {
	s.capabilitiesMu.RLock()
	_, found := s.capabilities[capability]
	s.capabilitiesMu.RUnlock()
	return found
}
/* -------------------------------------------------------------------------- */
/* Helpers */
/* -------------------------------------------------------------------------- */
// verifyChainIDAndConnection asks the execution client for its chain ID,
// checks it against the configured eth1ChainID, and then exchanges
// capabilities with the client.
//
// It returns nil only when all of those steps succeed. On any failure the
// underlying client is closed before the error is returned; a failure of that
// Close call is logged but never replaces the error that caused it.
//
// Errors returned: whatever Client.ChainID or ExchangeCapabilities return, an
// error when eth1ChainID does not fit in a uint64, or an error wrapping
// ErrMismatchedEth1ChainID when the reported chain ID differs.
func (s *EngineClient) verifyChainIDAndConnection(
	ctx context.Context,
) error {
	var (
		err     error
		chainID math.U64
	)

	// err is a plain local, not a named result, so this hook can only observe
	// the outcome; it cannot (and must not) change what the caller receives.
	defer func() {
		if err == nil {
			return
		}
		if closeErr := s.Client.Close(); closeErr != nil {
			s.logger.Warn(
				"failed to close execution client connection",
				"err", closeErr,
			)
		}
	}()

	// After the initial dial, check to make sure the chain ID is correct.
	chainID, err = s.Client.ChainID(ctx)
	if err != nil {
		if errors.Is(err, http.ErrUnauthorized) {
			// We always log this error as it is a critical error.
			s.logger.Error(UnauthenticatedConnectionErrorStr)
		}
		return err
	}

	// TODO: consider validating once when config is set or
	// client is initialized
	if !s.eth1ChainID.IsUint64() {
		// The chain ID is passed as an argument, never as the format string.
		err = errors.Wrapf(
			errors.New("provided chain ID is not uint64"),
			"%s",
			s.eth1ChainID.String(),
		)
		return err
	}

	if chainID.Unwrap() != s.eth1ChainID.Uint64() {
		err = errors.Wrapf(
			ErrMismatchedEth1ChainID,
			"wanted chain ID %d, got %d",
			s.eth1ChainID,
			chainID,
		)
		return err
	}

	// Log the chain ID.
	s.logger.Info(
		"Connected to execution client 🔌",
		"dial_url", s.cfg.RPCDialURL.String(),
		"chain_id", chainID.Unwrap(),
		"required_chain_id", s.eth1ChainID,
	)

	// Exchange capabilities with the execution client.
	if _, err = s.ExchangeCapabilities(ctx); err != nil {
		s.logger.Error("failed to exchange capabilities", "err", err)
		return err
	}
	return nil
}
/* -------------------------------------------------------------------------- */
/* Getters */
/* -------------------------------------------------------------------------- */
// GetRPCRetryInterval returns the RPCRetryInterval value from the client's
// configuration.
func (s *EngineClient) GetRPCRetryInterval() time.Duration {
	cfg := s.cfg
	return cfg.RPCRetryInterval
}
// GetRPCMaxRetryInterval returns the RPCMaxRetryInterval value from the
// client's configuration.
func (s *EngineClient) GetRPCMaxRetryInterval() time.Duration {
	cfg := s.cfg
	return cfg.RPCMaxRetryInterval
}