Skip to content

Commit 304fbf4

Browse files
committed
various fixes related to transaction building & submission via spamoor
1 parent 11fe44b commit 304fbf4

File tree

18 files changed

+508
-590
lines changed

18 files changed

+508
-590
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0
88
github.com/ethereum/go-ethereum v1.17.0
99
github.com/ethpandaops/ethwallclock v0.4.0
10-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6
10+
github.com/ethpandaops/spamoor v1.1.18-0.20260220084150-d48b77369e81
1111
github.com/glebarez/go-sqlite v1.22.0
1212
github.com/golang-jwt/jwt/v5 v5.3.1
1313
github.com/google/uuid v1.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ github.com/ethereum/go-ethereum v1.17.0 h1:2D+1Fe23CwZ5tQoAS5DfwKFNI1HGcTwi65/kR
7575
github.com/ethereum/go-ethereum v1.17.0/go.mod h1:2W3msvdosS/MCWytpqTcqgFiRYbTH59FxDJzqah120o=
7676
github.com/ethpandaops/ethwallclock v0.4.0 h1:+sgnhf4pk6hLPukP076VxkiLloE4L0Yk1yat+ZyHh1g=
7777
github.com/ethpandaops/ethwallclock v0.4.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
78-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6 h1:udcP+wIKp8yfhrbJruuApazjEjZ8wRgRcvSInYzOO0M=
79-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6/go.mod h1:+Z810GiNvmc+MxNj675Wo59Md6Ip8jbJvih6/7oMNc0=
78+
github.com/ethpandaops/spamoor v1.1.18-0.20260220084150-d48b77369e81 h1:0RtYv8dF1tAAank8R+rs/ii4QyqNdEc+dMF7knzrmZQ=
79+
github.com/ethpandaops/spamoor v1.1.18-0.20260220084150-d48b77369e81/go.mod h1:GW6Xjym8fliGk+Si2/dV+7M0RXIJTwW2w/ZHYXnri0A=
8080
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
8181
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
8282
github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY=

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (ts *TaskScheduler) RunTasks(testRunCtx context.Context, timeout time.Durat
9292
defer ts.cancelTaskCtx()
9393

9494
for _, task := range ts.rootTasks {
95-
err := ts.ExecuteTask(tasksCtx, task, ts.WatchTaskPass)
95+
err := ts.ExecuteTask(tasksCtx, task, nil)
9696
if err != nil {
9797
return err
9898
}
@@ -111,7 +111,7 @@ func (ts *TaskScheduler) runCleanupTasks(ctx context.Context) {
111111
return
112112
}
113113

114-
err := ts.ExecuteTask(ctx, taskIndex, ts.WatchTaskPass)
114+
err := ts.ExecuteTask(ctx, taskIndex, nil)
115115
if err != nil {
116116
taskState := ts.getTaskState(taskIndex)
117117
taskState.logger.GetLogger().Errorf("cleanup task failed: %v", err)

pkg/scheduler/task_execution.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -273,25 +273,3 @@ func (ts *TaskScheduler) emitTaskProgress(taskState *taskState, percent float64,
273273
message,
274274
)
275275
}
276-
277-
func (ts *TaskScheduler) WatchTaskPass(ctx context.Context, cancelFn context.CancelFunc, taskIndex types.TaskIndex) {
278-
taskState := ts.GetTaskState(taskIndex)
279-
280-
// poll task result and cancel context when task result is passed or failed
281-
for {
282-
updateChan := taskState.GetTaskResultUpdateChan(types.TaskResultNone)
283-
if updateChan != nil {
284-
select {
285-
case <-ctx.Done():
286-
return
287-
case <-updateChan:
288-
}
289-
}
290-
291-
taskStatus := taskState.GetTaskStatus()
292-
if taskStatus.Result != types.TaskResultNone {
293-
cancelFn()
294-
return
295-
}
296-
}
297-
}

pkg/tasks/generate_blob_transactions/task.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,48 @@ func (t *Task) Execute(ctx context.Context) error {
219219
return nil
220220
}
221221

222+
// isFuluActive checks whether the Fulu fork is active by comparing the
223+
// current wallclock epoch against FULU_FORK_EPOCH from the consensus specs.
224+
func (t *Task) isFuluActive() bool {
225+
blockCache := t.ctx.Scheduler.GetServices().ClientPool().
226+
GetConsensusPool().GetBlockCache()
227+
228+
specValues := blockCache.GetSpecValues()
229+
if specValues == nil {
230+
return false
231+
}
232+
233+
fuluEpochVal, ok := specValues["FULU_FORK_EPOCH"]
234+
if !ok {
235+
return false
236+
}
237+
238+
var fuluEpoch uint64
239+
240+
switch v := fuluEpochVal.(type) {
241+
case uint64:
242+
fuluEpoch = v
243+
case string:
244+
if _, err := fmt.Sscanf(v, "%d", &fuluEpoch); err != nil {
245+
return false
246+
}
247+
default:
248+
return false
249+
}
250+
251+
wallclock := blockCache.GetWallclock()
252+
if wallclock == nil {
253+
return false
254+
}
255+
256+
_, currentEpoch, err := wallclock.Now()
257+
if err != nil {
258+
return false
259+
}
260+
261+
return currentEpoch.Number() >= fuluEpoch
262+
}
263+
222264
func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, completeFn spamoor.TxCompleteFn) error {
223265
txWallet := t.wallet
224266
if t.wallet == nil {
@@ -307,6 +349,13 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
307349
}
308350
}
309351

352+
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
353+
spamoorClients := make([]*spamoor.Client, len(clients))
354+
355+
for i, c := range clients {
356+
spamoorClients[i] = walletMgr.GetClient(c)
357+
}
358+
310359
blobTx, err := txbuilder.BuildBlobTx(&txbuilder.TxMetadata{
311360
GasFeeCap: uint256.MustFromBig(t.config.FeeCap),
312361
GasTipCap: uint256.MustFromBig(t.config.TipCap),
@@ -325,11 +374,13 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
325374
return fmt.Errorf("cannot build blob transaction: %w", err)
326375
}
327376

328-
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
329-
client := walletMgr.GetClient(clients[transactionIdx%uint64(len(clients))])
377+
// Check if Fulu is active to determine blob sidecar version.
378+
// After Fulu, blob sidecars must use v1 format (cell proofs).
379+
useBlobV1 := t.isFuluActive()
330380

331-
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
332-
Client: client,
381+
sendOpts := &spamoor.SendTransactionOptions{
382+
Client: spamoorClients[transactionIdx%uint64(len(spamoorClients))],
383+
ClientList: spamoorClients,
333384
Rebroadcast: true,
334385
OnComplete: completeFn,
335386
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {
@@ -351,7 +402,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
351402

352403
logEntry.Infof("submitted blob transaction %v (nonce: %v, attempt: %v)", transactionIdx, tx.Nonce(), retry)
353404
},
354-
})
405+
}
406+
407+
if useBlobV1 {
408+
convertedToV1 := false
409+
410+
sendOpts.OnEncode = func(tx *ethtypes.Transaction) ([]byte, error) {
411+
if !convertedToV1 {
412+
if convErr := tx.BlobTxSidecar().ToV1(); convErr != nil {
413+
return nil, fmt.Errorf("cannot convert blob sidecar to v1: %w", convErr)
414+
}
415+
416+
convertedToV1 = true
417+
}
418+
419+
return nil, nil
420+
}
421+
}
422+
423+
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, sendOpts)
355424
if err != nil {
356425
txWallet.MarkSkippedNonce(tx.Nonce())
357426
return err

pkg/tasks/generate_consolidations/task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,6 @@ func (t *Task) Execute(ctx context.Context) error {
193193
// Note: onComplete callback is still called by spamoor even on error,
194194
// so we don't drain pendingChan or call pendingWg.Done() here
195195
} else {
196-
t.ctx.SetResult(types.TaskResultSuccess)
197-
198196
perSlotCount++
199197
totalCount++
200198

@@ -377,6 +375,11 @@ func (t *Task) generateConsolidation(ctx context.Context, accountIdx uint64, onC
377375

378376
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
379377

378+
spamoorClients := make([]*spamoor.Client, len(clients))
379+
for i, c := range clients {
380+
spamoorClients[i] = walletMgr.GetClient(c)
381+
}
382+
380383
txWallet, err := walletMgr.GetWalletByPrivkey(ctx, t.walletPrivKey)
381384
if err != nil {
382385
return nil, fmt.Errorf("cannot initialize wallet: %w", err)
@@ -405,10 +408,9 @@ func (t *Task) generateConsolidation(ctx context.Context, accountIdx uint64, onC
405408
return nil, fmt.Errorf("cannot build consolidation transaction: %w", err)
406409
}
407410

408-
client := walletMgr.GetClient(clients[0])
409-
410411
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
411-
Client: client,
412+
Client: spamoorClients[0],
413+
ClientList: spamoorClients,
412414
Rebroadcast: true,
413415
OnComplete: onComplete,
414416
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

pkg/tasks/generate_deposits/task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,6 @@ func (t *Task) Execute(ctx context.Context) error {
220220
// Note: onComplete callback is still called by spamoor even on error,
221221
// so we don't call pendingWg.Done() here
222222
} else {
223-
t.ctx.SetResult(types.TaskResultSuccess)
224-
225223
perSlotCount++
226224
totalCount++
227225

@@ -527,6 +525,11 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onComplet
527525

528526
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
529527

528+
spamoorClients := make([]*spamoor.Client, len(clients))
529+
for i, c := range clients {
530+
spamoorClients[i] = walletMgr.GetClient(c)
531+
}
532+
530533
txWallet, err := walletMgr.GetWalletByPrivkey(t.ctx.Scheduler.GetTestRunCtx(), t.walletPrivKey)
531534
if err != nil {
532535
return nil, nil, fmt.Errorf("cannot initialize wallet: %w", err)
@@ -551,10 +554,9 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onComplet
551554
return nil, nil, fmt.Errorf("cannot build deposit transaction: %w", err)
552555
}
553556

554-
client := walletMgr.GetClient(clients[0])
555-
556557
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
557-
Client: client,
558+
Client: spamoorClients[0],
559+
ClientList: spamoorClients,
558560
Rebroadcast: true,
559561
OnComplete: onComplete,
560562
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

pkg/tasks/generate_eoa_transactions/task.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,15 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
352352
}
353353

354354
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
355-
client := walletMgr.GetClient(clients[transactionIdx%uint64(len(clients))])
355+
356+
spamoorClients := make([]*spamoor.Client, len(clients))
357+
for i, c := range clients {
358+
spamoorClients[i] = walletMgr.GetClient(c)
359+
}
356360

357361
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
358-
Client: client,
362+
Client: spamoorClients[transactionIdx%uint64(len(spamoorClients))],
363+
ClientList: spamoorClients,
359364
Rebroadcast: true,
360365
OnComplete: completeFn,
361366
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

pkg/tasks/generate_transaction/task.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,16 +207,21 @@ func (t *Task) Execute(ctx context.Context) error {
207207
}
208208

209209
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
210+
spamoorClients := make([]*spamoor.Client, len(clients))
211+
for i, c := range clients {
212+
spamoorClients[i] = walletMgr.GetClient(c)
213+
}
210214

211-
for i := 0; i < len(clients); i++ {
212-
client := clients[i%len(clients)]
215+
for i := 0; i < len(spamoorClients); i++ {
216+
clientIdx := i % len(spamoorClients)
213217

214218
t.logger.WithFields(logrus.Fields{
215-
"client": client.GetName(),
219+
"client": spamoorClients[clientIdx].GetName(),
216220
}).Infof("sending tx: %v", tx.Hash().Hex())
217221

218222
err = walletMgr.GetTxPool().SendTransaction(ctx, t.wallet, tx, &spamoor.SendTransactionOptions{
219-
Client: walletMgr.GetClient(client),
223+
Client: spamoorClients[clientIdx],
224+
ClientList: spamoorClients,
220225
ClientsStartOffset: i,
221226
})
222227
if err == nil {

pkg/tasks/generate_withdrawal_requests/task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,6 @@ func (t *Task) Execute(ctx context.Context) error {
194194
// Note: onComplete callback is still called by spamoor even on error,
195195
// so we don't drain pendingChan or call pendingWg.Done() here
196196
} else {
197-
t.ctx.SetResult(types.TaskResultSuccess)
198-
199197
perSlotCount++
200198
totalCount++
201199

@@ -366,6 +364,11 @@ func (t *Task) generateWithdrawal(ctx context.Context, accountIdx uint64, onComp
366364

367365
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
368366

367+
spamoorClients := make([]*spamoor.Client, len(clients))
368+
for i, c := range clients {
369+
spamoorClients[i] = walletMgr.GetClient(c)
370+
}
371+
369372
txWallet, err := walletMgr.GetWalletByPrivkey(ctx, t.walletPrivKey)
370373
if err != nil {
371374
return nil, fmt.Errorf("cannot initialize wallet: %w", err)
@@ -397,10 +400,9 @@ func (t *Task) generateWithdrawal(ctx context.Context, accountIdx uint64, onComp
397400
return nil, fmt.Errorf("cannot build withdrawal transaction: %w", err)
398401
}
399402

400-
client := walletMgr.GetClient(clients[0])
401-
402403
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
403-
Client: client,
404+
Client: spamoorClients[0],
405+
ClientList: spamoorClients,
404406
Rebroadcast: true,
405407
OnComplete: onComplete,
406408
LogFn: func(client *spamoor.Client, _ int, rebroadcast int, err error) {

0 commit comments

Comments
 (0)