Conversation
…hods Signed-off-by: Pau Escrich <p4u@dabax.net>
There was a problem hiding this comment.
Pull request overview
This PR refactors Vocone’s single-node block production to persist blocks and mempool contents across restarts, adds missing block/tx retrieval plumbing, and expands test coverage around persistence and shutdown behavior.
Changes:
- Replace in-memory mempool with a persistent KV-backed mempool and add pruning support.
- Add a persistent blockstore with block metadata + hash→height reverse index and implement GetBlockByHeight/GetBlockByHash/GetTx plumbing.
- Update Vocone
Startto be context-cancellable and add extensive persistence/graceful shutdown tests.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
vocone/vocone.go |
Refactors startup and block production loop, adds context-driven lifecycle, persistent DB handles, and wiring for new block/mempool methods. |
vocone/mempool.go |
Implements persistent mempool storage, load-on-start, and block preparation draining logic. |
vocone/blockstore.go |
Implements block metadata persistence, tx keying, block reconstruction, and hash→height lookup. |
vocone/vocone_test.go |
Adds new unit/e2e tests covering persistence, retrieval, key encoding, shutdown, and SetBulkTxCosts behavior. |
cmd/voconed/voconed.go |
Updates daemon to run Start(ctx) and cancel on SIGTERM, then close resources. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
Pull Request Test Coverage Report for Build 23496891038Details
💛 - Coveralls |
02a2e29 to
71d7a0f
Compare
|
@copilot review |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Update the in-memory index. | ||
| vc.mempoolMtx.Lock() | ||
| vc.mempoolKeys = vc.mempoolKeys[len(consumedKeys):] | ||
| vc.mempoolMtx.Unlock() |
There was a problem hiding this comment.
commitMempoolCleanup updates vc.mempoolKeys by slicing off len(consumedKeys) entries. If mempoolPrune (or any other mutation) removes items from the front of vc.mempoolKeys between prepareBlock() unlocking and this cleanup, len(consumedKeys) can exceed the current queue length or remove the wrong pending txs (panic or dropped txs). Update the in-memory index by removing the specific consumedKeys (e.g., build a set of consumed keys and filter vc.mempoolKeys), and guard against length mismatches.
| // Update the in-memory index. | |
| vc.mempoolMtx.Lock() | |
| vc.mempoolKeys = vc.mempoolKeys[len(consumedKeys):] | |
| vc.mempoolMtx.Unlock() | |
| // Update the in-memory index by removing the specific consumed keys. | |
| vc.mempoolMtx.Lock() | |
| defer vc.mempoolMtx.Unlock() | |
| if len(consumedKeys) > len(vc.mempoolKeys) { | |
| // This can happen if other routines (e.g., pruning) removed entries | |
| // from the front of the queue between prepareBlock and this cleanup. | |
| log.Warnw("mempool cleanup mismatch: more consumed keys than pending keys", | |
| "consumed", len(consumedKeys), "pending", len(vc.mempoolKeys)) | |
| } | |
| consumedSet := make(map[string]struct{}, len(consumedKeys)) | |
| for _, key := range consumedKeys { | |
| consumedSet[string(key)] = struct{}{} | |
| } | |
| if len(consumedSet) == 0 { | |
| return | |
| } | |
| // Filter vc.mempoolKeys in place, keeping only keys that were not consumed. | |
| dst := vc.mempoolKeys[:0] | |
| for _, key := range vc.mempoolKeys { | |
| if _, ok := consumedSet[string(key)]; !ok { | |
| dst = append(dst, key) | |
| } | |
| } | |
| vc.mempoolKeys = dst |
|
|
||
| // Iterate over all pending transactions and rebuild the key list. | ||
| vc.mempoolKeys = nil | ||
| if err := vc.mempoolDB.Iterate([]byte(prefixMempool), func(key, _ []byte) bool { | ||
| keyCopy := make([]byte, len(key)) | ||
| copy(keyCopy, key) | ||
| vc.mempoolKeys = append(vc.mempoolKeys, keyCopy) | ||
| return true | ||
| }); err != nil { | ||
| return fmt.Errorf("could not iterate mempool: %w", err) | ||
| } | ||
|
|
There was a problem hiding this comment.
loadMempool only restores vc.mempoolSeq from the persisted mpseq key. If mpseq is missing/corrupt (or behind), addTx will start reusing sequence numbers and can overwrite existing mempool entries. Consider deriving mempoolSeq from the highest recovered mp/ key when mpseq is absent or lower than the max observed key.
| // Iterate over all pending transactions and rebuild the key list. | |
| vc.mempoolKeys = nil | |
| if err := vc.mempoolDB.Iterate([]byte(prefixMempool), func(key, _ []byte) bool { | |
| keyCopy := make([]byte, len(key)) | |
| copy(keyCopy, key) | |
| vc.mempoolKeys = append(vc.mempoolKeys, keyCopy) | |
| return true | |
| }); err != nil { | |
| return fmt.Errorf("could not iterate mempool: %w", err) | |
| } | |
| // Track the maximum sequence number observed in existing mempool keys. | |
| maxSeq := vc.mempoolSeq | |
| // Iterate over all pending transactions and rebuild the key list. | |
| vc.mempoolKeys = nil | |
| if err := vc.mempoolDB.Iterate([]byte(prefixMempool), func(key, _ []byte) bool { | |
| keyCopy := make([]byte, len(key)) | |
| copy(keyCopy, key) | |
| vc.mempoolKeys = append(vc.mempoolKeys, keyCopy) | |
| // Derive sequence from the key if it contains a numeric suffix. | |
| // We assume the sequence is stored in the last 8 bytes of the key. | |
| if len(key) >= len(prefixMempool)+8 { | |
| seq := binary.BigEndian.Uint64(key[len(key)-8:]) | |
| if seq > maxSeq { | |
| maxSeq = seq | |
| } | |
| } | |
| return true | |
| }); err != nil { | |
| return fmt.Errorf("could not iterate mempool: %w", err) | |
| } | |
| // Ensure mempoolSeq is at least as large as the highest recovered key sequence. | |
| if maxSeq > vc.mempoolSeq { | |
| vc.mempoolSeq = maxSeq | |
| } |
| wTx := vc.blockStore.WriteTx() | ||
| defer wTx.Discard() | ||
| if err := wTx.Set(metaKey(height), data); err != nil { | ||
| return err | ||
| } | ||
| // Store hash→height reverse index for GetBlockByHash lookups. | ||
| if len(blockHash) > 0 { | ||
| heightBytes := make([]byte, 8) | ||
| binary.BigEndian.PutUint64(heightBytes, uint64(height)) | ||
| if err := wTx.Set(blockHashKey(blockHash), heightBytes); err != nil { | ||
| return err | ||
| } | ||
| } |
There was a problem hiding this comment.
storeBlockMeta writes/overwrites metaKey(height) and also writes a hash→height reverse index. If storeBlockMeta is called again for the same height (e.g., crash after writing meta but before CommitState, then re-execution with a different timestamp/hash), the old blockhash/ entry is never deleted, leaving stale hash→height mappings and causing GetBlockByHash to return an unrelated block. When overwriting existing metadata, delete the prior reverse-index key for the previous hash if it differs.
| if err := vc.mempoolDB.Close(); err != nil { | ||
| errs = append(errs, fmt.Errorf("close mempool db: %w", err)) | ||
| } | ||
| } |
There was a problem hiding this comment.
Vocone.Close currently only closes blockStore and mempoolDB. The node also owns long-lived resources like App.State (state DB), Indexer, and Stats (which starts a goroutine and registers metrics). Not closing/stopping them can leak goroutines and keep DB files open. Consider closing vc.Indexer (if non-nil), calling vc.Stats.Close(), and closing vc.App.State (and any other owned resources) here.
| } | |
| } | |
| // Close indexer if present. | |
| if vc.Indexer != nil { | |
| if err := vc.Indexer.Close(); err != nil { | |
| errs = append(errs, fmt.Errorf("close indexer: %w", err)) | |
| } | |
| } | |
| // Close stats/metrics component if present. | |
| if vc.Stats != nil { | |
| if err := vc.Stats.Close(); err != nil { | |
| errs = append(errs, fmt.Errorf("close stats: %w", err)) | |
| } | |
| } | |
| // Close application state DB if present. | |
| if vc.App != nil && vc.App.State != nil { | |
| if err := vc.App.State.Close(); err != nil { | |
| errs = append(errs, fmt.Errorf("close app state: %w", err)) | |
| } | |
| } |
| // Wait for a few blocks to be produced. | ||
| time.Sleep(time.Second * 3) | ||
| qt.Assert(t, vc.height.Load() >= 5, qt.IsTrue, | ||
| qt.Commentf("expected at least 5 blocks, got %d", vc.height.Load())) | ||
|
|
There was a problem hiding this comment.
Tests rely on fixed sleeps to wait for chain readiness/block production (e.g., time.Sleep(3s) then assert height>=5). This can be flaky under load/slow CI. Prefer polling with a timeout (e.g., loop until height reaches target or context deadline) so the test waits only as long as needed and fails deterministically if blocks are not produced.
| } | ||
| }() | ||
|
|
||
| time.Sleep(time.Second * 2) |
There was a problem hiding this comment.
newTestVoconeLite uses a fixed sleep to wait for startup (time.Sleep(2s)). This can make the suite slower and flaky when startup is slower than expected. Consider replacing it with a readiness wait (e.g., poll vc.height until >0 with a timeout) so tests don't depend on timing.
| time.Sleep(time.Second * 2) | |
| // Wait for vocone to become ready instead of using a fixed sleep. | |
| deadline := time.Now().Add(5 * time.Second) | |
| for { | |
| // Consider vocone ready once it has produced at least one block. | |
| if vc.height > 0 { | |
| break | |
| } | |
| if time.Now().After(deadline) { | |
| t.Fatalf("vocone did not become ready within timeout") | |
| } | |
| time.Sleep(10 * time.Millisecond) | |
| } |
| // TestTransactionPersistence verifies that transactions are stored in the | ||
| // blockstore and can be retrieved after being included in a block. | ||
| func TestTransactionPersistence(t *testing.T) { | ||
| vc, cancel := newTestVoconeLite(t) | ||
| defer cancel() | ||
| defer vc.Close() | ||
|
|
||
| // Wait for a few blocks to be produced. | ||
| time.Sleep(time.Second * 3) | ||
| height := vc.height.Load() | ||
| qt.Assert(t, height >= 3, qt.IsTrue, | ||
| qt.Commentf("expected at least 3 blocks, got %d", height)) | ||
|
|
||
| // Submit a raw transaction directly through addTx | ||
| // (we use a raw tx bytes that will fail validation, but we can still test | ||
| // the mempool → blockstore flow by checking blocks that are already produced). | ||
|
|
||
| // Instead, verify that blocks without txs still produce valid metadata. | ||
| for h := int64(1); h < height; h++ { | ||
| meta, err := vc.loadBlockMeta(h) | ||
| qt.Assert(t, err, qt.IsNil, qt.Commentf("block %d meta should exist", h)) | ||
| qt.Assert(t, meta.Timestamp > 0, qt.IsTrue) | ||
|
|
||
| // Verify the block can be reconstructed. | ||
| blk := vc.getBlock(h) | ||
| qt.Assert(t, blk != nil, qt.IsTrue) | ||
| qt.Assert(t, blk.Height == h, qt.IsTrue) | ||
| qt.Assert(t, len(blk.Hash()) > 0, qt.IsTrue) | ||
| } | ||
| } |
There was a problem hiding this comment.
TestTransactionPersistence (and its doc comment) claims to verify that transactions are stored and retrievable after inclusion in a block, but the test never submits a tx nor asserts tx retrieval; it only checks that block metadata exists for empty blocks. Either extend the test to actually add a valid tx and verify it can be fetched via getTx/getTxWithHash, or rename/reword the test to reflect what it currently validates.
Signed-off-by: Pau Escrich <p4u@dabax.net>
No description provided.