Skip to content

Commit 83b8ddd

Browse files
committed
chore(embedded/store): allow skipping precommitted transactions during open
Signed-off-by: Stefano Scafiti <[email protected]>
1 parent cdd00cb commit 83b8ddd

File tree

2 files changed

+47
-38
lines changed

2 files changed

+47
-38
lines changed

embedded/store/immustore.go

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -533,43 +533,45 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
533533
precommittedAlh := committedAlh
534534
precommittedTxLogSize := committedTxLogSize
535535

536-
// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
537-
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
538-
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)
536+
if !opts.SkipPrecommittedTransactions {
537+
// read pre-committed txs from txLog and insert into cLogBuf to continue with the commit process
538+
// txLog may be partially written, precommitted transactions loading is terminated if an inconsistency is found
539+
txReader := appendable.NewReaderFrom(txLog, precommittedTxLogSize, multiapp.DefaultReadBufferSize)
539540

540-
tx, _ := txPool.Alloc()
541+
tx, _ := txPool.Alloc()
541542

542-
for {
543-
err = tx.readFrom(txReader, false)
544-
if errors.Is(err, io.EOF) {
545-
break
546-
}
547-
if err != nil {
548-
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
549-
break
550-
}
543+
for {
544+
err = tx.readFrom(txReader, false)
545+
if errors.Is(err, io.EOF) {
546+
break
547+
}
548+
if err != nil {
549+
opts.logger.Infof("%v: discarding pre-committed transaction: %d", err, precommittedTxID+1)
550+
break
551+
}
551552

552-
if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
553-
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
554-
break
555-
}
553+
if tx.header.ID != precommittedTxID+1 || tx.header.PrevAlh != precommittedAlh {
554+
opts.logger.Infof("%v: discarding pre-committed transaction: %d", ErrCorruptedData, precommittedTxID+1)
555+
break
556+
}
556557

557-
precommittedTxID++
558-
precommittedAlh = tx.header.Alh()
558+
precommittedTxID++
559+
precommittedAlh = tx.header.Alh()
559560

560-
txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))
561+
txSize := int(txReader.ReadCount() - (precommittedTxLogSize - committedTxLogSize))
561562

562-
err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
563-
if err != nil {
564-
txPool.Release(tx)
565-
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
563+
err = cLogBuf.put(precommittedTxID, precommittedAlh, precommittedTxLogSize, txSize)
564+
if err != nil {
565+
txPool.Release(tx)
566+
return nil, fmt.Errorf("%v: while loading pre-committed transaction: %v", err, precommittedTxID+1)
567+
}
568+
569+
precommittedTxLogSize += int64(txSize)
566570
}
567571

568-
precommittedTxLogSize += int64(txSize)
572+
txPool.Release(tx)
569573
}
570574

571-
txPool.Release(tx)
572-
573575
vLogsMap := make(map[byte]*refVLog, len(vLogs))
574576
vLogUnlockedList := list.New()
575577

embedded/store/options.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ type Options struct {
147147
CompressionLevel int
148148
EmbeddedValues bool
149149
PreallocFiles bool
150+
// Skip processing of transactions that were precommitted before opening
151+
SkipPrecommittedTransactions bool
150152

151153
// options below affect indexing
152154
IndexOpts *IndexOptions
@@ -247,17 +249,17 @@ func DefaultOptions() *Options {
247249
WriteTxHeaderVersion: DefaultWriteTxHeaderVersion,
248250

249251
// options below are only set during initialization and stored as metadata
250-
MaxTxEntries: DefaultMaxTxEntries,
251-
MaxKeyLen: DefaultMaxKeyLen,
252-
MaxValueLen: DefaultMaxValueLen,
253-
FileSize: DefaultFileSize,
254-
CompressionFormat: DefaultCompressionFormat,
255-
CompressionLevel: DefaultCompressionLevel,
256-
EmbeddedValues: DefaultEmbeddedValues,
257-
PreallocFiles: DefaultPreallocFiles,
258-
259-
IndexOpts: DefaultIndexOptions(),
260-
AHTOpts: DefaultAHTOptions(),
252+
MaxTxEntries: DefaultMaxTxEntries,
253+
MaxKeyLen: DefaultMaxKeyLen,
254+
MaxValueLen: DefaultMaxValueLen,
255+
FileSize: DefaultFileSize,
256+
CompressionFormat: DefaultCompressionFormat,
257+
CompressionLevel: DefaultCompressionLevel,
258+
EmbeddedValues: DefaultEmbeddedValues,
259+
PreallocFiles: DefaultPreallocFiles,
260+
SkipPrecommittedTransactions: false,
261+
IndexOpts: DefaultIndexOptions(),
262+
AHTOpts: DefaultAHTOptions(),
261263
}
262264
}
263265

@@ -620,6 +622,11 @@ func (opts *Options) WithAHTOptions(ahtOptions *AHTOptions) *Options {
620622
return opts
621623
}
622624

625+
func (opts *Options) WithSkipPrecommittedTransactions(skip bool) *Options {
626+
opts.SkipPrecommittedTransactions = true
627+
return opts
628+
}
629+
623630
// IndexOptions
624631

625632
func (opts *IndexOptions) WithCacheSize(cacheSize int) *IndexOptions {

0 commit comments

Comments
 (0)