Skip to content

Commit a51eb25

Browse files
maouehstevenlanders
authored andcommitted
Added TxTracer to DeliverTxEntry which is the first step to support tracing when using OCC (#478)
## Describe your changes and provide context This brings in an interface that can be set on `DeliverTxEntry` and hooks into the `scheduler` so it call's the necessary tracer callbacks when required. Refer to `types/tx_tracer.go` for extra details about the patch. ## Testing performed to validate your change --------- Co-authored-by: Steven Landers <[email protected]>
1 parent a0636c1 commit a51eb25

File tree

4 files changed

+139
-1
lines changed

4 files changed

+139
-1
lines changed

tasks/scheduler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type deliverTxTask struct {
5555
AbsoluteIndex int
5656
Response *types.ResponseDeliverTx
5757
VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
58+
TxTracer sdk.TxTracer
5859
}
5960

6061
// AppendDependencies appends the given indexes to the task's dependencies
@@ -84,6 +85,10 @@ func (dt *deliverTxTask) Reset() {
8485
dt.Abort = nil
8586
dt.AbortCh = nil
8687
dt.VersionStores = nil
88+
89+
if dt.TxTracer != nil {
90+
dt.TxTracer.Reset()
91+
}
8792
}
8893

8994
func (dt *deliverTxTask) Increment() {
@@ -188,7 +193,9 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa
188193
AbsoluteIndex: r.AbsoluteIndex,
189194
Status: statusPending,
190195
Dependencies: map[int]struct{}{},
196+
TxTracer: r.TxTracer,
191197
}
198+
192199
tasksMap[r.AbsoluteIndex] = task
193200
allTasks = append(allTasks, task)
194201
}
@@ -199,6 +206,10 @@ func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDel
199206
res := make([]types.ResponseDeliverTx, 0, len(tasks))
200207
for _, t := range tasks {
201208
res = append(res, *t.Response)
209+
210+
if t.TxTracer != nil {
211+
t.TxTracer.Commit()
212+
}
202213
}
203214
return res
204215
}
@@ -510,6 +521,10 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
510521
ctx = ctx.WithMultiStore(ms)
511522
}
512523

524+
if task.TxTracer != nil {
525+
ctx = task.TxTracer.InjectInContext(ctx)
526+
}
527+
513528
task.AbortCh = abortCh
514529
task.Ctx = ctx
515530
}

tasks/scheduler_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,45 @@ func TestProcessAll(t *testing.T) {
395395
},
396396
expectedErr: nil,
397397
},
398+
{
399+
name: "Test tx Reset properly before re-execution via tracer",
400+
workers: 10,
401+
runs: 1,
402+
addStores: true,
403+
requests: addTxTracerToTxEntries(requestList(250)),
404+
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
405+
defer abortRecoveryFunc(&res)
406+
wait := rand.Intn(10)
407+
time.Sleep(time.Duration(wait) * time.Millisecond)
408+
// all txs read and write to the same key to maximize conflicts
409+
kv := ctx.MultiStore().GetKVStore(testStoreKey)
410+
val := string(kv.Get(itemKey))
411+
time.Sleep(time.Duration(wait) * time.Millisecond)
412+
// write to the store with this tx's index
413+
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
414+
kv.Set(itemKey, []byte(newVal))
415+
416+
if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok {
417+
v.OnTxExecute()
418+
}
419+
420+
// return what was read from the store (final attempt should be index-1)
421+
return types.ResponseDeliverTx{
422+
Info: newVal,
423+
}
424+
},
425+
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
426+
expected := ""
427+
for idx, response := range res {
428+
expected = expected + fmt.Sprintf("%d", idx)
429+
require.Equal(t, expected, response.Info)
430+
}
431+
// confirm last write made it to the parent store
432+
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
433+
require.Equal(t, expected, string(latest))
434+
},
435+
expectedErr: nil,
436+
},
398437
}
399438

400439
for _, tt := range tests {
@@ -428,3 +467,42 @@ func TestProcessAll(t *testing.T) {
428467
})
429468
}
430469
}
470+
471+
func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry {
472+
for _, txEntry := range txEntries {
473+
txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex)
474+
}
475+
476+
return txEntries
477+
}
478+
479+
var _ sdk.TxTracer = &testTxTracer{}
480+
481+
func newTestTxTracer(txIndex int) *testTxTracer {
482+
return &testTxTracer{txIndex: txIndex, canExecute: true}
483+
}
484+
485+
type testTxTracer struct {
486+
txIndex int
487+
canExecute bool
488+
}
489+
490+
func (t *testTxTracer) Commit() {
491+
t.canExecute = false
492+
}
493+
494+
func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
495+
return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t))
496+
}
497+
498+
func (t *testTxTracer) Reset() {
499+
t.canExecute = true
500+
}
501+
502+
func (t *testTxTracer) OnTxExecute() {
503+
if !t.canExecute {
504+
panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex))
505+
}
506+
507+
t.canExecute = false
508+
}

types/tx_batch.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import (
66
)
77

88
// DeliverTxEntry represents an individual transaction's request within a batch.
9-
// This can be extended to include tx-level tracing or metadata
9+
// This can be extended to include tx-level metadata
1010
type DeliverTxEntry struct {
1111
Request abci.RequestDeliverTx
1212
SdkTx Tx
1313
Checksum [32]byte
1414
AbsoluteIndex int
1515
EstimatedWritesets MappedWritesets
16+
TxTracer TxTracer
1617
}
1718

1819
// EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate.

types/tx_tracer.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package types
2+
3+
// TxTracer is an interface for tracing transactions generic
4+
// enough to be used by any transaction processing engine be it
5+
// CoWasm or EVM.
6+
//
7+
// The TxTracer responsibility is to inject itself in the context
8+
// that will be used to process the transaction. How the context
9+
// will be used afterward is up to the transaction processing engine.
10+
//
11+
// Today, only EVM transaction processing engine do something with the
12+
// TxTracer (it inject itself into the EVM execution context for
13+
// go-ethereum level tracing).
14+
//
15+
// The TxTracer receives signals from the scheduler when the tracer
16+
// should be reset because the transaction is being re-executed and
17+
// when the transaction is committed.
18+
type TxTracer interface {
19+
// InjectInContext injects the transaction specific tracer in the context
20+
// that will be used to process the transaction.
21+
//
22+
// For now only the EVM transaction processing engine uses the tracer
23+
// so it only make sense to inject an EVM tracer. Future updates might
24+
// add the possibility to inject a tracer for other transaction kind.
25+
//
26+
// Which tracer implementation to provied and how will be retrieved later on
27+
// from the context is dependent on the transaction processing engine.
28+
InjectInContext(ctx Context) Context
29+
30+
// Reset is called when the transaction is being re-executed and the tracer
31+
// should be reset. A transaction executed by the OCC parallel engine might
32+
// be re-executed multiple times before being committed, each time `Reset`
33+
// will be called.
34+
//
35+
// When Reset is received, it means everything that was traced before should
36+
// be discarded.
37+
Reset()
38+
39+
// Commit is called when the transaction is committed. This is the last signal
40+
// the tracer will receive for a given transaction. After this call, the tracer
41+
// should do whatever it needs to forward the tracing information to the
42+
// appropriate place/collector.
43+
Commit()
44+
}

0 commit comments

Comments
 (0)