Skip to content

Commit a64bcc4

Browse files
Merge pull request #3454 from OffchainLabs/sequencetxs-using-hooks
Tx sequencing using hooks
2 parents 6564891 + 8a5bf7c commit a64bcc4

File tree

9 files changed

+263
-87
lines changed

9 files changed

+263
-87
lines changed

arbos/block_processor.go

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func createNewHeader(prevHeader *types.Header, l1info *L1Info, state *arbosState
115115
type ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions
116116

117117
type SequencingHooks struct {
118+
NextTxToSequence func() (*types.Transaction, error) // Must be set
119+
SequencedTx func(int) (*types.Transaction, error) // Must be set
118120
TxErrors []error // This can be unset
119121
DiscardInvalidTxsEarly bool // This can be unset
120122
PreTxFilter func(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error // This has to be set. Writes to *state.StateDB object should be avoided to prevent invalid state from permeating
@@ -123,8 +125,43 @@ type SequencingHooks struct {
123125
ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions // This can be unset
124126
}
125127

126-
func NoopSequencingHooks() *SequencingHooks {
128+
type noopTxScheduler struct {
129+
txs types.Transactions
130+
scheduledTxsCount int
131+
}
132+
133+
func (s *noopTxScheduler) GetNextTx() (*types.Transaction, error) {
134+
// This is not supposed to happen, if so we have a bug
135+
if s.scheduledTxsCount > len(s.txs) {
136+
return nil, errors.New("noopTxScheduler: requested too many transactions")
137+
}
138+
if s.scheduledTxsCount == len(s.txs) {
139+
return nil, nil
140+
}
141+
s.scheduledTxsCount += 1
142+
return s.txs[s.scheduledTxsCount-1], nil
143+
}
144+
145+
func (s *noopTxScheduler) GetScheduledTx(txId int) (*types.Transaction, error) {
146+
// This is not supposed to happen, if so we have a bug
147+
if txId > len(s.txs) {
148+
return nil, errors.New("transaction queried for does not exist in the noopTxScheduler")
149+
}
150+
// This is not supposed to happen, if so we have a bug
151+
if txId > s.scheduledTxsCount {
152+
return nil, errors.New("transaction queried for was not scheduled by the noopTxScheduler")
153+
}
154+
return s.txs[txId], nil
155+
}
156+
157+
func NoopSequencingHooks(txes types.Transactions) *SequencingHooks {
158+
scheduler := &noopTxScheduler{
159+
txes,
160+
0,
161+
}
127162
return &SequencingHooks{
163+
scheduler.GetNextTx,
164+
scheduler.GetScheduledTx,
128165
[]error{},
129166
false,
130167
func(*params.ChainConfig, *types.Header, *state.StateDB, *arbosState.ArbosState, *types.Transaction, *arbitrum_types.ConditionalOptions, common.Address, *L1Info) error {
@@ -153,17 +190,16 @@ func ProduceBlock(
153190
log.Warn("error parsing incoming message", "err", err)
154191
txes = types.Transactions{}
155192
}
193+
hooks := NoopSequencingHooks(txes)
156194

157-
hooks := NoopSequencingHooks()
158195
return ProduceBlockAdvanced(
159-
message.Header, txes, delayedMessagesRead, lastBlockHeader, statedb, chainContext, hooks, isMsgForPrefetch, runCtx,
196+
message.Header, delayedMessagesRead, lastBlockHeader, statedb, chainContext, hooks, isMsgForPrefetch, runCtx,
160197
)
161198
}
162199

163200
// A bit more flexible than ProduceBlock for use in the sequencer.
164201
func ProduceBlockAdvanced(
165202
l1Header *arbostypes.L1IncomingMessageHeader,
166-
txes types.Transactions,
167203
delayedMessagesRead uint64,
168204
lastBlockHeader *types.Header,
169205
statedb *state.StateDB,
@@ -200,7 +236,6 @@ func ProduceBlockAdvanced(
200236

201237
// Prepend a tx before all others to touch up the state (update the L1 block num, pricing pools, etc)
202238
startTx := InternalTxStartBlock(chainConfig.ChainID, l1Header.L1BaseFee, l1BlockNum, header, lastBlockHeader)
203-
txes = append(types.Transactions{types.NewTx(startTx)}, txes...)
204239

205240
complete := types.Transactions{}
206241
receipts := types.Receipts{}
@@ -213,14 +248,19 @@ func ProduceBlockAdvanced(
213248
// We'll check that the block can fit each message, so this pool is set to not run out
214249
gethGas := core.GasPool(l2pricing.GethBlockGasLimit)
215250

216-
for len(txes) > 0 || len(redeems) > 0 {
251+
firstTx := types.NewTx(startTx)
252+
253+
for {
217254
// repeatedly process the next tx, doing redeems created along the way in FIFO order
218255

219256
var tx *types.Transaction
220257
var options *arbitrum_types.ConditionalOptions
221-
hooks := NoopSequencingHooks()
258+
hooks := NoopSequencingHooks(nil) // TODO: NIT-3678
222259
isUserTx := false
223-
if len(redeems) > 0 {
260+
if firstTx != nil {
261+
tx = firstTx
262+
firstTx = nil
263+
} else if len(redeems) > 0 {
224264
tx = redeems[0]
225265
redeems = redeems[1:]
226266

@@ -234,8 +274,13 @@ func ProduceBlockAdvanced(
234274
continue
235275
}
236276
} else {
237-
tx = txes[0]
238-
txes = txes[1:]
277+
tx, err = sequencingHooks.NextTxToSequence()
278+
if err != nil {
279+
return nil, nil, fmt.Errorf("error fetching next transaction to sequence, userTxsProcessed: %d, hookTxErrors: %d, err: %w", userTxsProcessed, len(sequencingHooks.TxErrors), err)
280+
}
281+
if tx == nil {
282+
break
283+
}
239284
if tx.Type() != types.ArbitrumInternalTxType {
240285
hooks = sequencingHooks // the sequencer has the ability to drop this tx
241286
isUserTx = true

execution/gethexec/executionengine.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,14 @@ func (s *ExecutionEngine) NextDelayedMessageNumber() (uint64, error) {
372372
return currentHeader.Nonce.Uint64(), nil
373373
}
374374

375-
func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, txErrors []error) (*arbostypes.L1IncomingMessage, error) {
375+
func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, hooks *arbos.SequencingHooks) (*arbostypes.L1IncomingMessage, error) {
376376
var l2Message []byte
377-
if len(txes) == 1 && txErrors[0] == nil {
378-
txBytes, err := txes[0].MarshalBinary()
377+
if len(hooks.TxErrors) == 1 && hooks.TxErrors[0] == nil {
378+
tx, err := hooks.SequencedTx(0)
379+
if err != nil {
380+
return nil, err
381+
}
382+
txBytes, err := tx.MarshalBinary()
379383
if err != nil {
380384
return nil, err
381385
}
@@ -384,10 +388,14 @@ func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Tran
384388
} else {
385389
l2Message = append(l2Message, arbos.L2MessageKind_Batch)
386390
sizeBuf := make([]byte, 8)
387-
for i, tx := range txes {
388-
if txErrors[i] != nil {
391+
for i := 0; i < len(hooks.TxErrors); i++ {
392+
if hooks.TxErrors[i] != nil {
389393
continue
390394
}
395+
tx, err := hooks.SequencedTx(i)
396+
if err != nil {
397+
return nil, err
398+
}
391399
txBytes, err := tx.MarshalBinary()
392400
if err != nil {
393401
return nil, err
@@ -451,9 +459,9 @@ func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.Messa
451459
log.Warn("failed to parse sequencer message found from reorg", "err", err)
452460
continue
453461
}
454-
hooks := arbos.NoopSequencingHooks()
462+
hooks := arbos.NoopSequencingHooks(txes)
455463
hooks.DiscardInvalidTxsEarly = true
456-
_, err = s.sequenceTransactionsWithBlockMutex(msg.Message.Header, txes, hooks, nil)
464+
_, err = s.sequenceTransactionsWithBlockMutex(msg.Message.Header, hooks, nil)
457465
if err != nil {
458466
log.Error("failed to re-sequence old user message removed by reorg", "err", err)
459467
return
@@ -490,17 +498,17 @@ func (s *ExecutionEngine) sequencerWrapper(sequencerFunc func() (*types.Block, e
490498
}
491499
}
492500

493-
func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
501+
func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
494502
return s.sequencerWrapper(func() (*types.Block, error) {
495503
hooks.TxErrors = nil
496-
return s.sequenceTransactionsWithBlockMutex(header, txes, hooks, timeboostedTxs)
504+
return s.sequenceTransactionsWithBlockMutex(header, hooks, timeboostedTxs)
497505
})
498506
}
499507

500508
// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and
501509
// CPU profiling enabled. If the block creation takes longer than 2 seconds, it
502510
// keeps both and prints out filenames in an error log line.
503-
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
511+
func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
504512
pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
505513
if err := pprof.StartCPUProfile(pprofBuf); err != nil {
506514
log.Error("Starting CPU profiling", "error", err)
@@ -509,7 +517,7 @@ func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L
509517
log.Error("Starting tracing", "error", err)
510518
}
511519
start := time.Now()
512-
res, err := s.SequenceTransactions(header, txes, hooks, timeboostedTxs)
520+
res, err := s.SequenceTransactions(header, hooks, timeboostedTxs)
513521
elapsed := time.Since(start)
514522
pprof.StopCPUProfile()
515523
trace.Stop()
@@ -535,7 +543,7 @@ func writeAndLog(pprof, trace *bytes.Buffer) {
535543
log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile)
536544
}
537545

538-
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
546+
func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, hooks *arbos.SequencingHooks, timeboostedTxs map[common.Hash]struct{}) (*types.Block, error) {
539547
lastBlockHeader, err := s.getCurrentHeader()
540548
if err != nil {
541549
return nil, err
@@ -563,7 +571,6 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
563571
startTime := time.Now()
564572
block, receipts, err := arbos.ProduceBlockAdvanced(
565573
header,
566-
txes,
567574
delayedMessagesRead,
568575
lastBlockHeader,
569576
statedb,
@@ -577,9 +584,6 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
577584
}
578585
blockCalcTime := time.Since(startTime)
579586
blockExecutionTimer.Update(blockCalcTime.Nanoseconds())
580-
if len(hooks.TxErrors) != len(txes) {
581-
return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
582-
}
583587

584588
if len(receipts) == 0 {
585589
return nil, nil
@@ -596,7 +600,7 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
596600
return nil, nil
597601
}
598602

599-
msg, err := MessageFromTxes(header, txes, hooks.TxErrors)
603+
msg, err := MessageFromTxes(header, hooks)
600604
if err != nil {
601605
return nil, err
602606
}

0 commit comments

Comments
 (0)