diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 236b09981ef..7d25a1b11fb 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool { defer tx.RUnlock() scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx) finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx) - return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound + completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound + s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact)) + return completed } -func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} { +func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} { ch := make(chan struct{}) j := schedule.NewJob("kvstore_compact", func(ctx context.Context) { if ctx.Err() != nil { s.compactBarrier(ctx, ch) return } + tx := s.b.ReadTx() + tx.RLock() + finishedCompact, _ := UnsafeReadFinishedCompact(tx) + tx.RUnlock() hash, err := s.scheduleCompaction(rev, prevCompactRev) if err != nil { s.lg.Warn("Failed compaction", zap.Error(err)) @@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC } // Only store the hash value if the previous hash is completed, i.e. this compaction // hashes every revision from last compaction. For more details, see #15919. - if prevCompactionCompleted { + if finishedCompact == prevCompactRev { s.hashes.Store(hash) } else { s.lg.Info("previous compaction was interrupted, skip storing compaction hash value") @@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC } func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { - prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) if err != nil { return ch, err } - return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil + return s.compact(traceutil.TODO(), rev, prevCompactRev), nil } func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() - prevCompactionCompleted := s.checkPrevCompactionCompleted() ch, prevCompactRev, err := s.updateCompactRev(rev) trace.Step("check and update compact revision") if err != nil { @@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err } s.mu.Unlock() - return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil + return s.compact(trace, rev, prevCompactRev), nil } func (s *store) Commit() {