Skip to content

Commit c87d197

Browse files
committed
Fix storing compaction hash with multiple ongoing compaction requests
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 2f216dc commit c87d197

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

server/storage/mvcc/kvstore.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -224,16 +224,22 @@ func (s *store) checkPrevCompactionCompleted() bool {
224224
defer tx.RUnlock()
225225
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
226226
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
227-
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
227+
completed := scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
228+
s.lg.Info("check prev compaction completed", zap.Bool("completed", completed), zap.Int64("scheduled-compact", scheduledCompact), zap.Int64("finished-compaction", finishedCompact))
229+
return completed
228230
}
229231

230-
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) <-chan struct{} {
232+
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) <-chan struct{} {
231233
ch := make(chan struct{})
232234
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
233235
if ctx.Err() != nil {
234236
s.compactBarrier(ctx, ch)
235237
return
236238
}
239+
tx := s.b.ReadTx()
240+
tx.RLock()
241+
finishedCompact, _ := UnsafeReadFinishedCompact(tx)
242+
tx.RUnlock()
237243
hash, err := s.scheduleCompaction(rev, prevCompactRev)
238244
if err != nil {
239245
s.lg.Warn("Failed compaction", zap.Error(err))
@@ -242,7 +248,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
242248
}
243249
// Only store the hash value if the previous hash is completed, i.e. this compaction
244250
// hashes every revision from last compaction. For more details, see #15919.
245-
if prevCompactionCompleted {
251+
if finishedCompact == prevCompactRev {
246252
s.hashes.Store(hash)
247253
} else {
248254
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
@@ -256,18 +262,16 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevC
256262
}
257263

258264
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
259-
prevCompactionCompleted := s.checkPrevCompactionCompleted()
260265
ch, prevCompactRev, err := s.updateCompactRev(rev)
261266
if err != nil {
262267
return ch, err
263268
}
264269

265-
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted), nil
270+
return s.compact(traceutil.TODO(), rev, prevCompactRev), nil
266271
}
267272

268273
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
269274
s.mu.Lock()
270-
prevCompactionCompleted := s.checkPrevCompactionCompleted()
271275
ch, prevCompactRev, err := s.updateCompactRev(rev)
272276
trace.Step("check and update compact revision")
273277
if err != nil {
@@ -276,7 +280,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
276280
}
277281
s.mu.Unlock()
278282

279-
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted), nil
283+
return s.compact(trace, rev, prevCompactRev), nil
280284
}
281285

282286
func (s *store) Commit() {

0 commit comments

Comments
 (0)