Skip to content

Commit 7682316

Browse files
fix: address slow shutdowns in aggregation loop (#1297)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> I started trying to debug the test failures in #1058 and #1095 by first cleaning up the test output. I replaced the stdout logger with the file logger, and that highlighted some panics caused by writing to a closed file, which meant there were some threads still hanging around after the test had finished. The panic was easiest to catch with `TestTx`. The first issue was that the indexer didn't have a ctx, so it wasn't shutting down cleanly. The second was an edge case in which the aggregation loop execution takes some time, resulting in the node shutting down in the middle of it. I added the ctx check at the end, just before publishing to the channels, so that any updates are ideally saved to the store before we exit. I tested this locally by running the failing test in a loop. Prior to these changes it panicked every time; afterwards, the test can be run in a loop without failure. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit Refactor: - Enhanced the node shutdown process in `block/manager.go` to skip publishing if the node is already shut down, improving system efficiency. - Modified the `createAndStartIndexerService` function in `node/full.go` to require a context during initialization, allowing for better control over the service's lifecycle. Tests: - Updated `node/full_client_test.go` to use a cancellable context and a new file logger, improving test reliability and logging capabilities. - Modified `state/txindex/indexer_service_test.go` to use a cancellable context, ensuring proper cleanup after tests. 
Style: - Updated `state/txindex/indexer_service.go` to include a context in the `IndexerService` struct and handle its cancellation, aligning with the context-oriented design. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Manav Aggarwal <[email protected]>
1 parent 85d54f9 commit 7682316

File tree

5 files changed

+35
-17
lines changed

5 files changed

+35
-17
lines changed

block/manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,13 @@ func (m *Manager) publishBlock(ctx context.Context) error {
699699
return err
700700
}
701701

702+
// Check if the node has shut down prior to publishing to channels
703+
select {
704+
case <-ctx.Done():
705+
return nil
706+
default:
707+
}
708+
702709
// Publish header to channel so that header exchange service can broadcast
703710
m.HeaderCh <- &block.SignedHeader
704711

node/full.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ func createAndStartIndexerService(
435435
txIndexer = kv.NewTxIndex(ctx, kvStore)
436436
blockIndexer = blockidxkv.New(ctx, newPrefixKV(kvStore, "block_events"))
437437

438-
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
438+
indexerService := txindex.NewIndexerService(ctx, txIndexer, blockIndexer, eventBus)
439439
indexerService.SetLogger(logger.With("module", "txindex"))
440440

441441
if err := indexerService.Start(); err != nil {

node/full_client_test.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/cometbft/cometbft/crypto/ed25519"
2020
"github.com/cometbft/cometbft/crypto/encoding"
2121
"github.com/cometbft/cometbft/libs/bytes"
22-
"github.com/cometbft/cometbft/libs/log"
2322
"github.com/cometbft/cometbft/p2p"
2423
"github.com/cometbft/cometbft/proxy"
2524
cmtypes "github.com/cometbft/cometbft/types"
@@ -30,6 +29,7 @@ import (
3029
"github.com/rollkit/rollkit/config"
3130
mockda "github.com/rollkit/rollkit/da/mock"
3231
"github.com/rollkit/rollkit/store"
32+
test "github.com/rollkit/rollkit/test/log"
3333
"github.com/rollkit/rollkit/test/mocks"
3434
"github.com/rollkit/rollkit/types"
3535
abciconv "github.com/rollkit/rollkit/types/abci"
@@ -79,7 +79,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) {
7979
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
8080
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
8181
ctx := context.Background()
82-
node, err := newFullNode(ctx, config.NodeConfig{DALayer: "mock"}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
82+
node, err := newFullNode(ctx, config.NodeConfig{DALayer: "mock"}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, test.NewFileLogger(t))
8383
require.NoError(err)
8484
require.NotNil(node)
8585

@@ -177,7 +177,7 @@ func TestGenesisChunked(t *testing.T) {
177177
mockApp.On(InitChain, mock.Anything).Return(abci.ResponseInitChain{})
178178
privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
179179
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
180-
n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, log.TestingLogger())
180+
n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t))
181181

182182
rpc := NewFullClient(n)
183183

@@ -483,15 +483,17 @@ func TestTx(t *testing.T) {
483483
mockApp.On(InitChain, mock.Anything).Return(abci.ResponseInitChain{})
484484
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
485485
genesisValidators, signingKey := getGenesisValidatorSetWithSigner()
486-
node, err := newFullNode(context.Background(), config.NodeConfig{
486+
ctx, cancel := context.WithCancel(context.Background())
487+
defer cancel()
488+
node, err := newFullNode(ctx, config.NodeConfig{
487489
DALayer: "mock",
488490
Aggregator: true,
489491
BlockManagerConfig: config.BlockManagerConfig{
490492
BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
491493
}},
492494
key, signingKey, proxy.NewLocalClientCreator(mockApp),
493495
&cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
494-
log.TestingLogger())
496+
test.NewFileLogger(t))
495497
require.NoError(err)
496498
require.NotNil(node)
497499

@@ -509,21 +511,21 @@ func TestTx(t *testing.T) {
509511
require.NoError(rpc.node.Stop())
510512
}()
511513
tx1 := cmtypes.Tx("tx1")
512-
res, err := rpc.BroadcastTxSync(context.Background(), tx1)
514+
res, err := rpc.BroadcastTxSync(ctx, tx1)
513515
assert.NoError(err)
514516
assert.NotNil(res)
515517

516518
time.Sleep(2 * time.Second)
517519

518-
resTx, errTx := rpc.Tx(context.Background(), res.Hash, true)
520+
resTx, errTx := rpc.Tx(ctx, res.Hash, true)
519521
assert.NoError(errTx)
520522
assert.NotNil(resTx)
521523
assert.EqualValues(tx1, resTx.Tx)
522524
assert.EqualValues(res.Hash, resTx.Hash)
523525

524526
tx2 := cmtypes.Tx("tx2")
525527
assert.Panics(func() {
526-
resTx, errTx := rpc.Tx(context.Background(), tx2.Hash(), true)
528+
resTx, errTx := rpc.Tx(ctx, tx2.Hash(), true)
527529
assert.Nil(resTx)
528530
assert.Error(errTx)
529531
})
@@ -727,7 +729,7 @@ func createGenesisValidators(t *testing.T, numNodes int, appCreator func(require
727729
dalc := &mockda.DataAvailabilityLayerClient{}
728730
ds, err := store.NewDefaultInMemoryKVStore()
729731
require.Nil(err)
730-
err = dalc.Init([8]byte{}, nil, ds, log.TestingLogger())
732+
err = dalc.Init([8]byte{}, nil, ds, test.NewFileLogger(t))
731733
require.Nil(err)
732734
err = dalc.Start()
733735
require.Nil(err)
@@ -755,7 +757,7 @@ func createGenesisValidators(t *testing.T, numNodes int, appCreator func(require
755757
signingKey,
756758
proxy.NewLocalClientCreator(apps[i]),
757759
&cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
758-
log.TestingLogger(),
760+
test.NewFileLogger(t),
759761
)
760762
require.NoError(err)
761763
require.NotNil(nodes[i])
@@ -918,7 +920,7 @@ func TestMempool2Nodes(t *testing.T) {
918920
BlockManagerConfig: config.BlockManagerConfig{
919921
BlockTime: 1 * time.Second,
920922
},
921-
}, key1, signingKey1, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
923+
}, key1, signingKey1, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, test.NewFileLogger(t))
922924
require.NoError(err)
923925
require.NotNil(node1)
924926

@@ -928,7 +930,7 @@ func TestMempool2Nodes(t *testing.T) {
928930
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
929931
Seeds: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.Pretty(),
930932
},
931-
}, key2, signingKey2, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
933+
}, key2, signingKey2, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, test.NewFileLogger(t))
932934
require.NoError(err)
933935
require.NotNil(node1)
934936

@@ -1022,7 +1024,7 @@ func TestStatus(t *testing.T) {
10221024
ChainID: "test",
10231025
Validators: genesisValidators,
10241026
},
1025-
log.TestingLogger(),
1027+
test.NewFileLogger(t),
10261028
)
10271029
require.NoError(err)
10281030
require.NotNil(node)
@@ -1133,7 +1135,7 @@ func TestFutureGenesisTime(t *testing.T) {
11331135
GenesisTime: genesisTime,
11341136
Validators: genesisValidators,
11351137
},
1136-
log.TestingLogger())
1138+
test.NewFileLogger(t))
11371139
require.NoError(err)
11381140
require.NotNil(node)
11391141

state/txindex/indexer_service.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,22 @@ const (
2020
type IndexerService struct {
2121
service.BaseService
2222

23+
ctx context.Context
24+
2325
txIdxr TxIndexer
2426
blockIdxr indexer.BlockIndexer
2527
eventBus *types.EventBus
2628
}
2729

2830
// NewIndexerService returns a new service instance.
2931
func NewIndexerService(
32+
ctx context.Context,
3033
txIdxr TxIndexer,
3134
blockIdxr indexer.BlockIndexer,
3235
eventBus *types.EventBus,
3336
) *IndexerService {
3437

35-
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
38+
is := &IndexerService{ctx: ctx, txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
3639
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
3740
return is
3841
}
@@ -59,6 +62,8 @@ func (is *IndexerService) OnStart() error {
5962
go func() {
6063
for {
6164
select {
65+
case <-is.ctx.Done():
66+
return
6267
case <-blockHeadersSub.Cancelled():
6368
return
6469
case msg := <-blockHeadersSub.Out():
@@ -68,6 +73,8 @@ func (is *IndexerService) OnStart() error {
6873

6974
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
7075
select {
76+
case <-is.ctx.Done():
77+
return
7178
case <-txsSub.Cancelled():
7279
return
7380
case msg2 := <-txsSub.Out():

state/txindex/indexer_service_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
3636
prefixStore := (ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey("block_events")}).Children()[0]).(ds.TxnDatastore)
3737
blockIndexer := blockidxkv.New(context.Background(), prefixStore)
3838

39-
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
39+
ctx, cancel := context.WithCancel(context.Background())
40+
defer cancel()
41+
service := txindex.NewIndexerService(ctx, txIndexer, blockIndexer, eventBus)
4042
service.SetLogger(log.TestingLogger())
4143
err = service.Start()
4244
require.NoError(t, err)

0 commit comments

Comments
 (0)