Skip to content

Compaction hash #18103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool {
defer tx.RUnlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact))
return completed
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} {
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
}
tx := s.b.ReadTx()
tx.RLock()
finishedCompact, _ := UnsafeReadFinishedCompact(tx)
tx.RUnlock()
hash, err := s.scheduleCompaction(rev, prevCompactRev)
if err != nil {
s.lg.Warn("Failed compaction", zap.Error(err))
Expand All @@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
}
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
if finishedCompact == prevCompactRev {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
Expand All @@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil
return s.compact(traceutil.TODO(), rev, prevCompactRev), nil
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil
return s.compact(trace, rev, prevCompactRev), nil
}

func (s *store) Commit() {
Expand Down
Loading