Skip to content

Commit 97579a5

Browse files
authored
fix(listener): fix memory leak in core.listener (#4733)
1 parent 42a61ce commit 97579a5

File tree

4 files changed

+17
-19
lines changed

4 files changed

+17
-19
lines changed

core/fetcher.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V
145145

146146
// SubscribeNewBlockEvent subscribes to new block events from Core, returning
147147
// a new block event channel.
148-
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan types.EventDataSignedBlock, error) {
149-
signedBlockCh := make(chan types.EventDataSignedBlock, 1)
148+
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan SignedBlock, error) {
149+
signedBlockCh := make(chan SignedBlock, 1)
150150

151151
go func() {
152152
defer close(signedBlockCh)
@@ -178,7 +178,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan types.E
178178

179179
func (f *BlockFetcher) receive(
180180
ctx context.Context,
181-
signedBlockCh chan types.EventDataSignedBlock,
181+
signedBlockCh chan SignedBlock,
182182
subscription coregrpc.BlockAPI_SubscribeNewHeightsClient,
183183
) error {
184184
log.Debug("fetcher: started listening for new blocks")
@@ -198,11 +198,11 @@ func (f *BlockFetcher) receive(
198198
}
199199

200200
select {
201-
case signedBlockCh <- types.EventDataSignedBlock{
202-
Header: *signedBlock.Header,
203-
Commit: *signedBlock.Commit,
204-
ValidatorSet: *signedBlock.ValidatorSet,
205-
Data: *signedBlock.Data,
201+
case signedBlockCh <- SignedBlock{
202+
Header: signedBlock.Header,
203+
Commit: signedBlock.Commit,
204+
ValidatorSet: signedBlock.ValidatorSet,
205+
Data: signedBlock.Data,
206206
}:
207207
case <-ctx.Done():
208208
return ctx.Err()

core/fetcher_no_race_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"testing"
88
"time"
99

10-
"github.com/cometbft/cometbft/types"
1110
"github.com/stretchr/testify/assert"
1211
"github.com/stretchr/testify/require"
1312
)
@@ -43,7 +42,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
4342
valSet, err := fetcher.ValidatorSet(ctx, h)
4443
require.NoError(t, err)
4544
// get next block
46-
var nextBlock types.EventDataSignedBlock
45+
var nextBlock SignedBlock
4746
select {
4847
case nextBlock = <-newBlockChan:
4948
case <-ctx.Done():

core/fetcher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
3232
h := newBlockFromChan.Header.Height
3333
block, err := fetcher.GetSignedBlock(ctx, h)
3434
require.NoError(t, err)
35-
assert.Equal(t, newBlockFromChan.Data, *block.Data)
36-
assert.Equal(t, newBlockFromChan.Header, *block.Header)
37-
assert.Equal(t, newBlockFromChan.Commit, *block.Commit)
38-
assert.Equal(t, newBlockFromChan.ValidatorSet, *block.ValidatorSet)
35+
assert.Equal(t, newBlockFromChan.Data, block.Data)
36+
assert.Equal(t, newBlockFromChan.Header, block.Header)
37+
assert.Equal(t, newBlockFromChan.Commit, block.Commit)
38+
assert.Equal(t, newBlockFromChan.ValidatorSet, block.ValidatorSet)
3939
require.GreaterOrEqual(t, newBlockFromChan.Header.Height, int64(i))
4040
case <-ctx.Done():
4141
require.NoError(t, ctx.Err())

core/listener.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"time"
88

9-
"github.com/cometbft/cometbft/types"
109
pubsub "github.com/libp2p/go-libp2p-pubsub"
1110
"go.opentelemetry.io/otel/attribute"
1211
"go.opentelemetry.io/otel/trace"
@@ -126,7 +125,7 @@ func (cl *Listener) Stop(ctx context.Context) error {
126125
// listen kicks off a loop, listening for new block events from Core,
127126
// generating ExtendedHeaders and broadcasting them to the header-sub
128127
// gossipsub network.
129-
func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSignedBlock) {
128+
func (cl *Listener) listen(ctx context.Context, sub <-chan SignedBlock) {
130129
defer close(cl.closed)
131130
defer log.Info("listener: listening stopped")
132131
timeout := time.NewTimer(cl.listenerTimeout)
@@ -166,7 +165,7 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned
166165
}
167166
}
168167

169-
func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataSignedBlock) error {
168+
func (cl *Listener) handleNewSignedBlock(ctx context.Context, b SignedBlock) error {
170169
var err error
171170

172171
ctx, span := tracer.Start(ctx, "listener/handleNewSignedBlock")
@@ -183,7 +182,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
183182
}
184183

185184
// generate extended header
186-
eh, err := cl.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
185+
eh, err := cl.construct(b.Header, b.Commit, b.ValidatorSet, eds)
187186
if err != nil {
188187
panic(fmt.Errorf("making extended header: %w", err))
189188
}
@@ -212,7 +211,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
212211
if err != nil && !errors.Is(err, context.Canceled) {
213212
log.Errorw("listener: broadcasting data hash",
214213
"height", b.Header.Height,
215-
"hash", b.Header.Hash(), "err", err) // TODO: hash or datahash?
214+
"datahash", eh.DAH.String(), "err", err)
216215
}
217216
}
218217

0 commit comments

Comments
 (0)