Skip to content

Commit 88984a2

Browse files
authored
txnprovider/shutter: fix premature encrypted txn pool cleanup bug and peer drops (#18264)
for #18217 and #18263
1 parent d7fc84d commit 88984a2

File tree

3 files changed

+146
-6
lines changed

3 files changed

+146
-6
lines changed

txnprovider/shutter/decryption_keys_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func (dkp *DecryptionKeysProcessor) processBlockEventCleanup(blockEvent BlockEve
345345

346346
for _, mark := range cleanUpMarks {
347347
dkp.processed.Remove(mark)
348-
dkp.encryptedTxnsPool.DeleteUpTo(mark.Eon, mark.To+1)
348+
dkp.encryptedTxnsPool.DeleteUpTo(mark.Eon, mark.To)
349349
}
350350

351351
return nil

txnprovider/shutter/decryption_keys_source.go

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
pubsub "github.com/libp2p/go-libp2p-pubsub"
3030
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
3131
"github.com/libp2p/go-libp2p/core/host"
32+
"github.com/libp2p/go-libp2p/core/network"
3233
"github.com/libp2p/go-libp2p/core/peer"
3334
"github.com/multiformats/go-multiaddr"
3435
"golang.org/x/sync/errgroup"
@@ -136,10 +137,11 @@ func (dks *PubSubDecryptionKeysSource) Run(ctx context.Context) error {
136137
})
137138

138139
eg.Go(func() error {
139-
select {
140-
case <-ctx.Done(): // to keep the host and topic alive until Run's ctx is cancelled
141-
return ctx.Err()
140+
err := dks.reconnectBootstrapNodes(ctx, p2pHost)
141+
if err != nil {
142+
return fmt.Errorf("reconnect bootstrap nodes failure: %w", err)
142143
}
144+
return nil
143145
})
144146

145147
return eg.Wait()
@@ -194,6 +196,7 @@ func (dks *PubSubDecryptionKeysSource) initP2pHost() (host.Host, error) {
194196
return nil, err
195197
}
196198

199+
p2pHost.Network().Notify(&peerNotifiee{logger: dks.logger})
197200
dks.logger.Info("shutter p2p host initialised", "addr", listenAddr, "id", p2pHost.ID())
198201
return p2pHost, nil
199202
}
@@ -331,6 +334,34 @@ func (dks *PubSubDecryptionKeysSource) peerInfoLoop(ctx context.Context, pubSub
331334
}
332335
}
333336

337+
func (dks *PubSubDecryptionKeysSource) reconnectBootstrapNodes(ctx context.Context, host host.Host) error {
338+
bootstrapNodes, err := dks.config.BootstrapNodesAddrInfo()
339+
if err != nil {
340+
return err
341+
}
342+
ticker := time.NewTicker(30 * time.Second)
343+
defer ticker.Stop()
344+
for {
345+
select {
346+
case <-ctx.Done():
347+
return ctx.Err()
348+
case <-ticker.C:
349+
for _, node := range bootstrapNodes {
350+
connectedness := host.Network().Connectedness(node.ID)
351+
if connectedness != network.Connected {
352+
dks.logger.Warn("lost connection to shutter bootstrap node, reconnecting", "node", node, "connectedness", connectedness)
353+
err := host.Connect(ctx, node)
354+
if err != nil {
355+
dks.logger.Warn("failed to reconnect to bootstrap node", "node", node, "err", err)
356+
} else {
357+
dks.logger.Info("reconnected to shutter bootstrap node", "node", node)
358+
}
359+
}
360+
}
361+
}
362+
}
363+
}
364+
334365
func decryptionKeysTopicScoreParams() *pubsub.TopicScoreParams {
335366
// NOTE: this is taken from
336367
// https://github.com/shutter-network/rolling-shutter/blob/main/rolling-shutter/p2p/params.go#L100
@@ -358,3 +389,23 @@ func decryptionKeysTopicScoreParams() *pubsub.TopicScoreParams {
358389
InvalidMessageDeliveriesDecay: 0.9994,
359390
}
360391
}
392+
393+
type peerNotifiee struct {
394+
logger log.Logger
395+
}
396+
397+
func (n *peerNotifiee) Listen(network.Network, multiaddr.Multiaddr) {
398+
// no-op
399+
}
400+
401+
func (n *peerNotifiee) ListenClose(network.Network, multiaddr.Multiaddr) {
402+
// no-op
403+
}
404+
405+
func (n *peerNotifiee) Connected(_ network.Network, conn network.Conn) {
406+
n.logger.Debug("shutter peer connected", "peer", conn.RemotePeer(), "addr", conn.RemoteMultiaddr())
407+
}
408+
409+
func (n *peerNotifiee) Disconnected(_ network.Network, conn network.Conn) {
410+
n.logger.Debug("shutter peer disconnected", "peer", conn.RemotePeer(), "addr", conn.RemoteMultiaddr())
411+
}

txnprovider/shutter/pool_test.go

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,12 @@ func TestPoolCleanup(t *testing.T) {
6969
// simulate the first block
7070
err = handle.SimulateNewBlockChange(ctx)
7171
require.NoError(t, err)
72+
synctest.Wait()
73+
require.Len(t, pool.AllEncryptedTxns(), 0)
74+
require.Len(t, pool.AllDecryptedTxns(), 0)
7275
// simulate some encrypted txn submissions and simulate a new block
7376
encTxn1 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
7477
encTxn2 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
75-
require.Len(t, pool.AllEncryptedTxns(), 0)
7678
err = handle.SimulateLogEvents(ctx, []types.Log{
7779
MockTxnSubmittedEventLog(t, handle.config, ekg.Eon(), 1, encTxn1),
7880
MockTxnSubmittedEventLog(t, handle.config, ekg.Eon(), 2, encTxn2),
@@ -83,29 +85,114 @@ func TestPoolCleanup(t *testing.T) {
8385
require.NoError(t, err)
8486
synctest.Wait()
8587
require.Len(t, pool.AllEncryptedTxns(), 2)
88+
require.Len(t, pool.AllDecryptedTxns(), 0)
8689
// simulate decryption keys
8790
handle.SimulateCurrentSlot()
8891
handle.SimulateDecryptionKeys(ctx, t, ekg, 1, encTxn1.IdentityPreimage, encTxn2.IdentityPreimage)
8992
synctest.Wait()
93+
require.Len(t, pool.AllEncryptedTxns(), 2)
9094
require.Len(t, pool.AllDecryptedTxns(), 2)
9195
// simulate one block passing by - decrypted txns pool should get cleaned up after 1 slot
9296
handle.SimulateCachedEonRead(t, ekg)
9397
err = handle.SimulateNewBlockChange(ctx)
9498
require.NoError(t, err)
9599
synctest.Wait()
100+
require.Len(t, pool.AllEncryptedTxns(), 2)
96101
require.Len(t, pool.AllDecryptedTxns(), 0)
97102
// simulate more blocks passing by - encrypted txns pool should get cleaned up after config.ReorgDepthAwareness
98103
handle.SimulateCachedEonRead(t, ekg)
99104
err = handle.SimulateNewBlockChange(ctx)
100105
require.NoError(t, err)
106+
synctest.Wait()
107+
require.Len(t, pool.AllEncryptedTxns(), 2)
108+
require.Len(t, pool.AllDecryptedTxns(), 0)
109+
handle.SimulateCachedEonRead(t, ekg)
110+
err = handle.SimulateNewBlockChange(ctx)
111+
require.NoError(t, err)
112+
synctest.Wait()
113+
require.Len(t, pool.AllEncryptedTxns(), 2)
114+
require.Len(t, pool.AllDecryptedTxns(), 0)
115+
handle.SimulateCachedEonRead(t, ekg)
116+
err = handle.SimulateNewBlockChange(ctx)
117+
require.NoError(t, err)
118+
synctest.Wait()
119+
require.Len(t, pool.AllEncryptedTxns(), 0)
120+
require.Len(t, pool.AllDecryptedTxns(), 0)
121+
})
122+
}
123+
124+
func TestPoolCleanupShouldNotDeleteNewEncTxnsDueToConsecutiveEmptyDecrMsgs(t *testing.T) {
125+
/*
126+
Reproduces the following bug:
127+
1. we were getting decryption key messages for our slots with 0 keys for transactions (valid behaviour when no transactions to decrypt)
128+
2. we got an encrypted txn submission at block 19,182,545
129+
3. got block event for 19,182,546 (next block)
130+
4. encrypted txn got deleted from the pool (although it was just added 1 block ago)
131+
5. the decryption keys arrived for the slot of block 19,182,547 and the txn was missed
132+
133+
[DBUG] [12-10|13:05:25.131] received encrypted txn submission event component=shutter eonIndex=14 txnIndex=47 blockNum=19182545 unwind=false
134+
...
135+
[DBUG] [12-10|13:05:30.155] block tracker got block event component=shutter blockNum=19182546
136+
[DBUG] [12-10|13:05:30.155] deleted encrypted txns component=shutter count=1 fromEon=14 fromTxn=47 toEon=14 toTxn=47
137+
...
138+
[DBUG] [12-10|13:05:30.342] processing decryption keys message component=shutter instanceId=102000 eonIndex=14 slot=19995127 txnIndex=47 keys=2
139+
[DBUG] [12-10|13:05:30.342] looking up encrypted txns for component=shutter eon=14 from=47 to=48 gasLimit=10000000
140+
*/
141+
t.Parallel()
142+
pt := PoolTest{t}
143+
pt.Run(func(ctx context.Context, t *testing.T, pool *shutter.Pool, handle PoolTestHandle) {
144+
// simulate expected contract calls for reading the first ekg after the first block event
145+
ekg, err := testhelpers.MockEonKeyGeneration(shutter.EonIndex(0), 1, 2, 1)
146+
require.NoError(t, err)
147+
handle.SimulateInitialEonRead(t, ekg)
148+
// simulate loadSubmissions after the first block
149+
handle.SimulateFilterLogs(common.HexToAddress(handle.config.SequencerContractAddress), []types.Log{})
150+
// simulate the first block
151+
err = handle.SimulateNewBlockChange(ctx)
152+
require.NoError(t, err)
153+
synctest.Wait()
154+
// simulate decryption keys msg with 0 decrypted txns for 2 slots in a row to get close to the currently
155+
// configured reorg awareness window of 3 (make sure that is the case otherwise the test won't be accurately
156+
// capturing the situation)
157+
require.Equal(t, uint64(3), handle.config.ReorgDepthAwareness)
158+
// 1
159+
handle.SimulateCurrentSlot()
160+
handle.SimulateDecryptionKeys(ctx, t, ekg, 1)
101161
handle.SimulateCachedEonRead(t, ekg)
102162
err = handle.SimulateNewBlockChange(ctx)
103163
require.NoError(t, err)
164+
synctest.Wait()
165+
require.Len(t, pool.AllEncryptedTxns(), 0)
166+
// 2
167+
handle.SimulateCurrentSlot()
168+
handle.SimulateDecryptionKeys(ctx, t, ekg, 1)
104169
handle.SimulateCachedEonRead(t, ekg)
105170
err = handle.SimulateNewBlockChange(ctx)
106171
require.NoError(t, err)
107172
synctest.Wait()
108173
require.Len(t, pool.AllEncryptedTxns(), 0)
174+
// simulate a new encrypted txn submission
175+
// 3
176+
encTxn1 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
177+
err = handle.SimulateLogEvents(ctx, []types.Log{
178+
MockTxnSubmittedEventLog(t, handle.config, ekg.Eon(), 1, encTxn1),
179+
})
180+
require.NoError(t, err)
181+
handle.SimulateCachedEonRead(t, ekg)
182+
err = handle.SimulateNewBlockChange(ctx)
183+
require.NoError(t, err)
184+
synctest.Wait()
185+
require.Len(t, pool.AllEncryptedTxns(), 1)
186+
// simulate decryption keys msg with 0 decrypted txns for the next slot
187+
// the encrypted txn should not get deleted!
188+
// 4
189+
handle.SimulateCurrentSlot()
190+
handle.SimulateDecryptionKeys(ctx, t, ekg, 1)
191+
handle.SimulateCachedEonRead(t, ekg)
192+
err = handle.SimulateNewBlockChange(ctx)
193+
require.NoError(t, err)
194+
synctest.Wait()
195+
require.Len(t, pool.AllEncryptedTxns(), 1)
109196
})
110197
}
111198

@@ -122,6 +209,7 @@ func TestPoolSkipsBlobTxns(t *testing.T) {
122209
// simulate the first block
123210
err = handle.SimulateNewBlockChange(ctx)
124211
require.NoError(t, err)
212+
synctest.Wait()
125213
// simulate some encrypted txn submissions and simulate a new block
126214
encBlobTxn1 := MockEncryptedBlobTxn(t, handle.config.ChainId, ekg.Eon())
127215
encTxn1 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
@@ -167,6 +255,7 @@ func TestPoolProvideTxnsUsesGasTargetAndTxnsIdFilter(t *testing.T) {
167255
// simulate the first block
168256
err = handle.SimulateNewBlockChange(ctx)
169257
require.NoError(t, err)
258+
synctest.Wait()
170259
// simulate some encrypted txn submissions and simulate a new block
171260
encTxn1 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
172261
encTxn2 := MockEncryptedTxn(t, handle.config.ChainId, ekg.Eon())
@@ -220,7 +309,7 @@ func (t PoolTest) Run(testCase func(ctx context.Context, t *testing.T, pool *shu
220309
synctest.Test(t.T, func(t *testing.T) {
221310
ctx, cancel := context.WithCancel(t.Context())
222311
defer cancel()
223-
logger := testlog.Logger(t, log.LvlTrace)
312+
logger := testlog.Logger(t, log.LvlCrit)
224313
logHandler := testhelpers.NewCollectingLogHandler(logger.GetHandler())
225314
logger.SetHandler(logHandler)
226315
config := shuttercfg.ConfigByChainName(networkname.Chiado)

0 commit comments

Comments (0)