Skip to content

Commit f2b10c3

Browse files
committed
Merge branch 'master' into gloas-support
2 parents 92d7c44 + 4778616 commit f2b10c3

File tree

21 files changed

+596
-595
lines changed

21 files changed

+596
-595
lines changed

docs/05-task-config.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,4 @@ The categorization serves as a general guide to understanding the nature and pur
8888

8989
The following sections will detail individual tasks within these categories, providing insights into their specific functions, configurations, and use cases.
9090

91-
#!! cat pkg/coordinator/tasks/*/README.md
91+
#!! cat pkg/tasks/*/README.md

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0
88
github.com/ethereum/go-ethereum v1.17.0
99
github.com/ethpandaops/ethwallclock v0.4.0
10-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6
10+
github.com/ethpandaops/spamoor v1.1.18-0.20260226103249-8d1c1fdd2de0
1111
github.com/glebarez/go-sqlite v1.22.0
1212
github.com/golang-jwt/jwt/v5 v5.3.1
1313
github.com/google/uuid v1.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ github.com/ethereum/go-ethereum v1.17.0 h1:2D+1Fe23CwZ5tQoAS5DfwKFNI1HGcTwi65/kR
7575
github.com/ethereum/go-ethereum v1.17.0/go.mod h1:2W3msvdosS/MCWytpqTcqgFiRYbTH59FxDJzqah120o=
7676
github.com/ethpandaops/ethwallclock v0.4.0 h1:+sgnhf4pk6hLPukP076VxkiLloE4L0Yk1yat+ZyHh1g=
7777
github.com/ethpandaops/ethwallclock v0.4.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
78-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6 h1:udcP+wIKp8yfhrbJruuApazjEjZ8wRgRcvSInYzOO0M=
79-
github.com/ethpandaops/spamoor v1.1.17-0.20260210191103-c3c5caa2a6b6/go.mod h1:+Z810GiNvmc+MxNj675Wo59Md6Ip8jbJvih6/7oMNc0=
78+
github.com/ethpandaops/spamoor v1.1.18-0.20260226103249-8d1c1fdd2de0 h1:avoDj2pij57dxnA5dpei0ND8LKVpMttvPo3E0QF/6UE=
79+
github.com/ethpandaops/spamoor v1.1.18-0.20260226103249-8d1c1fdd2de0/go.mod h1:GW6Xjym8fliGk+Si2/dV+7M0RXIJTwW2w/ZHYXnri0A=
8080
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
8181
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
8282
github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY=

pkg/clients/consensus/rpc/beaconapi.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,26 @@ func (bc *BeaconClient) SubmitVoluntaryExits(ctx context.Context, exit *phase0.S
513513

514514
func (bc *BeaconClient) SubmitAttesterSlashing(ctx context.Context, slashing *phase0.AttesterSlashing) error {
515515
err := bc.postJSON(ctx, fmt.Sprintf("%s/eth/v1/beacon/pool/attester_slashings", bc.endpoint), slashing, nil)
516-
if err != nil {
517-
return err
516+
if err == nil {
517+
return nil
518518
}
519519

520-
return nil
520+
// Prysm removed the v1 endpoint post-Electra, fall back to v2 with version header.
521+
if err.Error() == "not found" {
522+
v2Err := bc.postJSONWithHeaders(
523+
ctx,
524+
fmt.Sprintf("%s/eth/v2/beacon/pool/attester_slashings", bc.endpoint),
525+
slashing, nil,
526+
map[string]string{"Eth-Consensus-Version": "electra"},
527+
)
528+
if v2Err != nil {
529+
return v2Err
530+
}
531+
532+
return nil
533+
}
534+
535+
return err
521536
}
522537

523538
func (bc *BeaconClient) SubmitProposerSlashing(ctx context.Context, slashing *phase0.ProposerSlashing) error {

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (ts *TaskScheduler) RunTasks(testRunCtx context.Context, timeout time.Durat
9292
defer ts.cancelTaskCtx()
9393

9494
for _, task := range ts.rootTasks {
95-
err := ts.ExecuteTask(tasksCtx, task, ts.WatchTaskPass)
95+
err := ts.ExecuteTask(tasksCtx, task, nil)
9696
if err != nil {
9797
return err
9898
}
@@ -111,7 +111,7 @@ func (ts *TaskScheduler) runCleanupTasks(ctx context.Context) {
111111
return
112112
}
113113

114-
err := ts.ExecuteTask(ctx, taskIndex, ts.WatchTaskPass)
114+
err := ts.ExecuteTask(ctx, taskIndex, nil)
115115
if err != nil {
116116
taskState := ts.getTaskState(taskIndex)
117117
taskState.logger.GetLogger().Errorf("cleanup task failed: %v", err)

pkg/scheduler/task_execution.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -273,25 +273,3 @@ func (ts *TaskScheduler) emitTaskProgress(taskState *taskState, percent float64,
273273
message,
274274
)
275275
}
276-
277-
func (ts *TaskScheduler) WatchTaskPass(ctx context.Context, cancelFn context.CancelFunc, taskIndex types.TaskIndex) {
278-
taskState := ts.GetTaskState(taskIndex)
279-
280-
// poll task result and cancel context when task result is passed or failed
281-
for {
282-
updateChan := taskState.GetTaskResultUpdateChan(types.TaskResultNone)
283-
if updateChan != nil {
284-
select {
285-
case <-ctx.Done():
286-
return
287-
case <-updateChan:
288-
}
289-
}
290-
291-
taskStatus := taskState.GetTaskStatus()
292-
if taskStatus.Result != types.TaskResultNone {
293-
cancelFn()
294-
return
295-
}
296-
}
297-
}

pkg/tasks/generate_blob_transactions/task.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,48 @@ func (t *Task) Execute(ctx context.Context) error {
219219
return nil
220220
}
221221

222+
// isFuluActive checks whether the Fulu fork is active by comparing the
223+
// current wallclock epoch against FULU_FORK_EPOCH from the consensus specs.
224+
func (t *Task) isFuluActive() bool {
225+
blockCache := t.ctx.Scheduler.GetServices().ClientPool().
226+
GetConsensusPool().GetBlockCache()
227+
228+
specValues := blockCache.GetSpecValues()
229+
if specValues == nil {
230+
return false
231+
}
232+
233+
fuluEpochVal, ok := specValues["FULU_FORK_EPOCH"]
234+
if !ok {
235+
return false
236+
}
237+
238+
var fuluEpoch uint64
239+
240+
switch v := fuluEpochVal.(type) {
241+
case uint64:
242+
fuluEpoch = v
243+
case string:
244+
if _, err := fmt.Sscanf(v, "%d", &fuluEpoch); err != nil {
245+
return false
246+
}
247+
default:
248+
return false
249+
}
250+
251+
wallclock := blockCache.GetWallclock()
252+
if wallclock == nil {
253+
return false
254+
}
255+
256+
_, currentEpoch, err := wallclock.Now()
257+
if err != nil {
258+
return false
259+
}
260+
261+
return currentEpoch.Number() >= fuluEpoch
262+
}
263+
222264
func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, completeFn spamoor.TxCompleteFn) error {
223265
txWallet := t.wallet
224266
if t.wallet == nil {
@@ -307,6 +349,13 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
307349
}
308350
}
309351

352+
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
353+
spamoorClients := make([]*spamoor.Client, len(clients))
354+
355+
for i, c := range clients {
356+
spamoorClients[i] = walletMgr.GetClient(c)
357+
}
358+
310359
blobTx, err := txbuilder.BuildBlobTx(&txbuilder.TxMetadata{
311360
GasFeeCap: uint256.MustFromBig(t.config.FeeCap),
312361
GasTipCap: uint256.MustFromBig(t.config.TipCap),
@@ -325,11 +374,13 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
325374
return fmt.Errorf("cannot build blob transaction: %w", err)
326375
}
327376

328-
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
329-
client := walletMgr.GetClient(clients[transactionIdx%uint64(len(clients))])
377+
// Check if Fulu is active to determine blob sidecar version.
378+
// After Fulu, blob sidecars must use v1 format (cell proofs).
379+
useBlobV1 := t.isFuluActive()
330380

331-
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
332-
Client: client,
381+
sendOpts := &spamoor.SendTransactionOptions{
382+
Client: spamoorClients[transactionIdx%uint64(len(spamoorClients))],
383+
ClientList: spamoorClients,
333384
Rebroadcast: true,
334385
OnComplete: completeFn,
335386
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {
@@ -351,7 +402,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
351402

352403
logEntry.Infof("submitted blob transaction %v (nonce: %v, attempt: %v)", transactionIdx, tx.Nonce(), retry)
353404
},
354-
})
405+
}
406+
407+
if useBlobV1 {
408+
convertedToV1 := false
409+
410+
sendOpts.OnEncode = func(tx *ethtypes.Transaction) ([]byte, error) {
411+
if !convertedToV1 {
412+
if convErr := tx.BlobTxSidecar().ToV1(); convErr != nil {
413+
return nil, fmt.Errorf("cannot convert blob sidecar to v1: %w", convErr)
414+
}
415+
416+
convertedToV1 = true
417+
}
418+
419+
return nil, nil
420+
}
421+
}
422+
423+
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, sendOpts)
355424
if err != nil {
356425
txWallet.MarkSkippedNonce(tx.Nonce())
357426
return err

pkg/tasks/generate_consolidations/task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,6 @@ func (t *Task) Execute(ctx context.Context) error {
193193
// Note: onComplete callback is still called by spamoor even on error,
194194
// so we don't drain pendingChan or call pendingWg.Done() here
195195
} else {
196-
t.ctx.SetResult(types.TaskResultSuccess)
197-
198196
perSlotCount++
199197
totalCount++
200198

@@ -377,6 +375,11 @@ func (t *Task) generateConsolidation(ctx context.Context, accountIdx uint64, onC
377375

378376
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
379377

378+
spamoorClients := make([]*spamoor.Client, len(clients))
379+
for i, c := range clients {
380+
spamoorClients[i] = walletMgr.GetClient(c)
381+
}
382+
380383
txWallet, err := walletMgr.GetWalletByPrivkey(ctx, t.walletPrivKey)
381384
if err != nil {
382385
return nil, fmt.Errorf("cannot initialize wallet: %w", err)
@@ -405,10 +408,9 @@ func (t *Task) generateConsolidation(ctx context.Context, accountIdx uint64, onC
405408
return nil, fmt.Errorf("cannot build consolidation transaction: %w", err)
406409
}
407410

408-
client := walletMgr.GetClient(clients[0])
409-
410411
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
411-
Client: client,
412+
Client: spamoorClients[0],
413+
ClientList: spamoorClients,
412414
Rebroadcast: true,
413415
OnComplete: onComplete,
414416
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

pkg/tasks/generate_deposits/task.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,6 @@ func (t *Task) Execute(ctx context.Context) error {
220220
// Note: onComplete callback is still called by spamoor even on error,
221221
// so we don't call pendingWg.Done() here
222222
} else {
223-
t.ctx.SetResult(types.TaskResultSuccess)
224-
225223
perSlotCount++
226224
totalCount++
227225

@@ -527,6 +525,11 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onComplet
527525

528526
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
529527

528+
spamoorClients := make([]*spamoor.Client, len(clients))
529+
for i, c := range clients {
530+
spamoorClients[i] = walletMgr.GetClient(c)
531+
}
532+
530533
txWallet, err := walletMgr.GetWalletByPrivkey(t.ctx.Scheduler.GetTestRunCtx(), t.walletPrivKey)
531534
if err != nil {
532535
return nil, nil, fmt.Errorf("cannot initialize wallet: %w", err)
@@ -551,10 +554,9 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onComplet
551554
return nil, nil, fmt.Errorf("cannot build deposit transaction: %w", err)
552555
}
553556

554-
client := walletMgr.GetClient(clients[0])
555-
556557
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
557-
Client: client,
558+
Client: spamoorClients[0],
559+
ClientList: spamoorClients,
558560
Rebroadcast: true,
559561
OnComplete: onComplete,
560562
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

pkg/tasks/generate_eoa_transactions/task.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,15 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
352352
}
353353

354354
walletMgr := t.ctx.Scheduler.GetServices().WalletManager()
355-
client := walletMgr.GetClient(clients[transactionIdx%uint64(len(clients))])
355+
356+
spamoorClients := make([]*spamoor.Client, len(clients))
357+
for i, c := range clients {
358+
spamoorClients[i] = walletMgr.GetClient(c)
359+
}
356360

357361
err = walletMgr.GetTxPool().SendTransaction(ctx, txWallet, tx, &spamoor.SendTransactionOptions{
358-
Client: client,
362+
Client: spamoorClients[transactionIdx%uint64(len(spamoorClients))],
363+
ClientList: spamoorClients,
359364
Rebroadcast: true,
360365
OnComplete: completeFn,
361366
LogFn: func(client *spamoor.Client, retry int, rebroadcast int, err error) {

0 commit comments

Comments
 (0)