Commit 610281d

refactor

1 parent 8b522c6 commit 610281d

11 files changed: +214 −383 lines changed

lib/logstorage/block_search.go

Lines changed: 1 addition & 2 deletions

@@ -66,8 +66,7 @@ var blockSearchWorkBatchPool sync.Pool
 
 func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
 	bsws := bswb.bsws
-	var dm *deleteMarker
-	dm = p.marker.delete.Load()
+	dm := p.deleteMarker.Load()
 
 	bsws = append(bsws, blockSearchWork{
 		p: p,
lib/logstorage/block_stream_merger.go

Lines changed: 5 additions & 68 deletions

@@ -93,9 +93,6 @@ type blockStreamMerger struct {
 	//
 	// It is used for limiting the number of columns written per block
 	uniqueFields int
-
-	// Accumulated delete marker for blocks we keep and mark during merge.
-	dmNew *deleteMarker
 }
 
 func (bsm *blockStreamMerger) reset() {
@@ -110,7 +107,6 @@ func (bsm *blockStreamMerger) reset() {
 	bsm.streamID.reset()
 	bsm.resetRows()
 	bsm.uniqueFields = 0
-	bsm.dmNew = nil
 }
 
 func (bsm *blockStreamMerger) resetRows() {
@@ -344,13 +340,8 @@ func (h *blockStreamReadersHeap) Pop() any {
 //
 // In both cases the original reader is advanced to the next block or popped when exhausted.
 func (bsm *blockStreamMerger) processDeleteMarker(bsr *blockStreamReader, blockID uint64) bool {
-
-	miAgg := bsr.marker
-	if miAgg == nil {
-		return false
-	}
-	dm := miAgg.delete.Load()
-	if dm == nil {
+	dm := bsr.deleteMarker
+	if len(dm.blockIDs) == 0 {
 		return false
 	}
 
@@ -381,62 +372,8 @@ func (bsm *blockStreamMerger) processDeleteMarker(bsr *blockStreamReader, blockI
 	// Obtain the new blockID assigned by the writer.
 	newID := bsm.bsw.LastBlockID()
 
-	// Accumulate delete-marker pointing to the new blockID.
-	if bsm.dmNew == nil {
-		bsm.dmNew = &deleteMarker{}
-	}
-	bsm.dmNew.AddBlock(newID, bm)
-
-	// Uncomment the call below to enable the legacy re-queue logic instead of
-	// keeping the original compressed block.
-	// bsm.requeuePartialDeleteBlock(bsr, bm, rowsTotal)
-
-	return true // block handled; caller will advance reader
-}
-
-// requeuePartialDeleteBlock recreates a pruned block as an in-memory reader and
-// pushes it back into the merge heap so ordering is preserved.
-func (bsm *blockStreamMerger) requeuePartialDeleteBlock(bsr *blockStreamReader, bm boolRLE, rowsTotal int) {
-	if bsm.sbu == nil {
-		bsm.sbu = getStringsBlockUnmarshaler()
-	}
-	if bsm.vd == nil {
-		bsm.vd = getValuesDecoder()
-	}
-
-	// Unmarshal original rows.
-	var rs rows
-	if err := bsr.blockData.unmarshalRows(&rs, bsm.sbu, bsm.vd); err != nil {
-		logger.Panicf("BUG: cannot unmarshal rows for partial delete pruning: %s", err)
-	}
-
-	// Collect rows that remain after applying delete marker.
-	keptTS := make([]int64, 0, rowsTotal)
-	keptRows := make([][]Field, 0, rowsTotal)
-	bm.ForEachZeroBit(rowsTotal, func(i int) {
-		keptTS = append(keptTS, rs.timestamps[i])
-		keptRows = append(keptRows, rs.rows[i])
-	})
-
-	if len(keptTS) == 0 {
-		logger.Panicf("BUG: length of rows after partial delete marker pruning is 0")
-	}
-
-	// Build a tiny in-memory part with the kept rows.
-	lr := getLogRows()
-	for i, ts := range keptTS {
-		lr.mustAddRow(bsr.blockData.streamID, ts, keptRows[i])
-	}
-	mp := getInmemoryPart()
-	mp.mustInitFromRows(lr)
-	putLogRows(lr)
-
-	bsrNew := getBlockStreamReader()
-	bsrNew.MustInitFromInmemoryPart(mp)
-	if !bsrNew.NextBlock() {
-		logger.Panicf("BUG: pruned in-memory reader has no blocks to merge")
-	}
+	// Accumulate delete-marker directly in writer
+	bsm.bsw.dm.AddBlock(newID, bm)
 
-	heap.Push(&bsm.readersHeap, bsrNew)
-	bsm.bsrs = append(bsm.bsrs, bsrNew)
+	return true
 }
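
Note: this change is the core of the refactor. Instead of pruning partially deleted blocks into a fresh in-memory part and re-queuing it on the merge heap, the merger now writes the original compressed block through unchanged and records the deletions via `bsm.bsw.dm.AddBlock(newID, bm)`. `AddBlock` itself is not shown in the commit; a minimal sketch of the accumulation it implies, with `boolRLE` stubbed as a plain bitmap (the real type is presumably run-length encoded):

package main

import "fmt"

// boolRLE is stubbed as a plain bitmap; only its role matters here:
// one deleted-rows mask per marked block.
type boolRLE struct {
	bits []bool
}

// deleteMarker accumulates per-block delete masks. blockIDs appears in
// the commit; storing masks in a parallel slice is an assumption.
type deleteMarker struct {
	blockIDs []uint64
	rowMasks []boolRLE
}

// AddBlock records that block newID keeps its compressed bytes on disk
// while bm flags the rows readers must skip.
func (dm *deleteMarker) AddBlock(newID uint64, bm boolRLE) {
	dm.blockIDs = append(dm.blockIDs, newID)
	dm.rowMasks = append(dm.rowMasks, bm)
}

func main() {
	var dm deleteMarker
	dm.AddBlock(42, boolRLE{bits: []bool{false, true, false}}) // row 1 of block 42 deleted
	fmt.Println("marked blocks:", dm.blockIDs)
}

Keeping the marker append-only is what makes requeuePartialDeleteBlock and its heap bookkeeping removable.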

lib/logstorage/block_stream_reader.go

Lines changed: 11 additions & 18 deletions

@@ -65,9 +65,6 @@ type streamReaders struct {
 	columnsHeaderReader readerWithStats
 	timestampsReader readerWithStats
 
-	// Marker reader for tracking bytes read during streaming
-	markerDatReader readerWithStats
-
 	messageBloomValuesReader bloomValuesReader
 	oldBloomValuesReader bloomValuesReader
 	bloomValuesShards []bloomValuesReader
@@ -119,8 +116,6 @@ func (sr *streamReaders) reset() {
 	sr.columnsHeaderReader.reset()
 	sr.timestampsReader.reset()
 
-	sr.markerDatReader.reset()
-
 	sr.messageBloomValuesReader.reset()
 	sr.oldBloomValuesReader.reset()
 	for i := range sr.bloomValuesShards {
@@ -133,9 +128,7 @@
 }
 
 func (sr *streamReaders) init(partFormatVersion uint, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
-	columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
-	markerDatReader filestream.ReadCloser,
-	messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader,
+	columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader,
 ) {
 	sr.partFormatVersion = partFormatVersion
 
@@ -147,8 +140,6 @@ func (sr *streamReaders) init(partFormatVersion uint, columnNamesReader, columnI
 	sr.columnsHeaderReader.init(columnsHeaderReader)
 	sr.timestampsReader.init(timestampsReader)
 
-	sr.markerDatReader.init(markerDatReader)
-
 	sr.messageBloomValuesReader.init(messageBloomValuesReader)
 	sr.oldBloomValuesReader.init(oldBloomValuesReader)
 	sr.bloomValuesShards = slicesutil.SetLength(sr.bloomValuesShards, len(bloomValuesShards))
@@ -195,7 +186,6 @@ func (sr *streamReaders) MustClose() {
 	sr.columnsHeaderIndexReader.MustClose()
 	sr.columnsHeaderReader.MustClose()
 	sr.timestampsReader.MustClose()
-	sr.markerDatReader.MustClose()
 	sr.messageBloomValuesReader.MustClose()
 	sr.oldBloomValuesReader.MustClose()
 	for i := range sr.bloomValuesShards {
@@ -238,8 +228,8 @@ type blockStreamReader struct {
 	// ph is the header for the part
 	ph partHeader
 
-	// marker aggregates marker data for the source part (delete, ttl, etc.).
-	marker *marker
+	// deleteMarker holds delete marker data for the given part.
+	deleteMarker deleteMarker
 
 	// streamReaders contains data readers in stream mode
 	streamReaders streamReaders
@@ -301,7 +291,7 @@ func (bsr *blockStreamReader) reset() {
 	bsr.globalRowsCount = 0
 	bsr.globalBlocksCount = 0
 
-	bsr.marker = nil
+	bsr.deleteMarker = deleteMarker{}
 
 	bsr.sidLast.reset()
 	bsr.minTimestampLast = 0
@@ -320,7 +310,7 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
 	bsr.ph = mp.ph
 
 	// propagate delete-marker data
-	bsr.marker = nil
+	bsr.deleteMarker = deleteMarker{}
 
 	// Initialize streamReaders
 	columnNamesReader := mp.columnNames.NewReader()
@@ -351,11 +341,15 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
 
 	bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
 		columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
-		nil, // no marker data reader for in-memory parts
 		messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
 
 	// Read metaindex data
 	bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
+
+	// Link deleteMarker directly from in-memory part
+	if len(mp.deleteMarker.blockIDs) > 0 {
+		bsr.deleteMarker = mp.deleteMarker
+	}
 }
 
 // MustInitFromFilePart initializes bsr from file part at the given path.
@@ -432,15 +426,14 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 	// Initialize streamReaders
 	bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
 		columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
-		markerDatReader,
 		messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
 
 	// Read metaindex data
 	bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
 
 	// Read marker index if available
 	if markerDatReader != nil {
-		bsr.marker = mustReadMarkerData(&bsr.streamReaders.markerDatReader, bsr.ph.BlocksCount)
+		bsr.deleteMarker = mustReadDeleteMarkerData(markerDatReader)
 	}
 }

lib/logstorage/block_stream_writer.go

Lines changed: 29 additions & 0 deletions

@@ -204,6 +204,15 @@ type blockStreamWriter struct {
 	// streamWriters contains writer for block data
 	streamWriters streamWriters
 
+	// partPath is non-empty when writing a file-based part.
+	partPath string
+
+	// mp points to the destination in-memory part when writing an in-memory part.
+	mp *inmemoryPart
+
+	// dm accumulates per-block delete markers generated while writing.
+	dm deleteMarker
+
 	// sidLast is the streamID for the last written block
 	sidLast streamID
 
@@ -276,12 +285,18 @@ func (bsw *blockStreamWriter) reset() {
 	}
 
 	bsw.indexBlockHeader.reset()
+
+	bsw.partPath = ""
+	bsw.mp = nil
+	bsw.dm = deleteMarker{}
 }
 
 // MustInitForInmemoryPart initializes bsw from mp
 func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
 	bsw.reset()
 
+	bsw.mp = mp
+
 	messageBloomValues := mp.messageBloomValues.NewStreamWriter()
 	createBloomValuesWriter := func(_ uint64) bloomValuesStreamWriter {
 		return mp.fieldBloomValues.NewStreamWriter()
@@ -296,6 +311,8 @@
 func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
 	bsw.reset()
 
+	bsw.partPath = path
+
 	fs.MustMkdirFailIfExist(path)
 
 	columnNamesPath := filepath.Join(path, columnNamesFilename)
@@ -472,6 +489,18 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
 	ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten()
 
 	bsw.streamWriters.MustClose()
+
+	// Persist accumulated delete markers if any
+	if len(bsw.dm.blockIDs) > 0 {
+		if bsw.mp != nil {
+			bsw.mp.deleteMarker = bsw.dm
+		} else if bsw.partPath != "" {
+			datBuf := bsw.dm.Marshal(nil)
+			datPath := filepath.Join(bsw.partPath, rowMarkerDatFilename)
+			fs.MustWriteSync(datPath, datBuf)
+		}
+	}
+
 	bsw.reset()
 }
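
Note: Finalize now persists the accumulated marker itself, either into `mp.deleteMarker` for in-memory parts or via `dm.Marshal(nil)` plus `fs.MustWriteSync` for file parts. The on-disk encoding is not part of this commit; a self-contained sketch of one plausible layout (uvarint count, then one uvarint per block ID; this is an assumption, and the real format would also need the per-block row masks):

package main

import (
	"encoding/binary"
	"fmt"
)

type deleteMarker struct {
	blockIDs []uint64
}

// Marshal appends dm to dst and returns the extended buffer, matching the
// append-style signature used in Finalize. Layout here is assumed: a uvarint
// count followed by one uvarint per blockID.
func (dm *deleteMarker) Marshal(dst []byte) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(dm.blockIDs)))
	for _, id := range dm.blockIDs {
		dst = binary.AppendUvarint(dst, id)
	}
	return dst
}

func main() {
	dm := deleteMarker{blockIDs: []uint64{3, 7, 42}}
	buf := dm.Marshal(nil)
	fmt.Printf("marker payload: %d bytes\n", len(buf))
}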

lib/logstorage/datadb.go

Lines changed: 2 additions & 2 deletions

@@ -507,7 +507,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	var totalRows int64
 	for i := range pws {
 		pw := pws[i]
-		delete := pw.p.marker.delete.Load()
+		delete := pw.p.deleteMarker.Load()
 		if delete != nil {
 			deleteMarkerBlocks += int64(len(delete.blockIDs))
 			for i := range delete.blockIDs {
@@ -549,7 +549,6 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	if isFinal && len(pws) == 1 && pws[0].mp != nil {
 		// Fast path: flush a single in-memory part to disk.
 		mp := pws[0].mp
-		// Persist applied sequence from the source part so the newly flushed part is immediately up-to-date.
 		mp.MustStoreToDisk(dstPartPath)
 		pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
 		ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
@@ -597,6 +596,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 		mpNew.ph = ph
 	} else {
 		ph.mustWriteMetadata(dstPartPath)
+
 		// Make sure the created part directory contents is synced and visible in case of unclean shutdown.
 		fs.MustSyncPathAndParentDir(dstPartPath)
 	}

lib/logstorage/filenames.go

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@ const (
 	messageBloomFilename = "message_bloom.bin"
 
 	// Filename for per-row marker data (e.g. delete markers).
-	rowMarkerDatFilename = "row_marker.bin"
+	rowMarkerDatFilename = "deleted_rows.bin"
 
 	// Filename for async tasks storage at partition level.
 	asyncTasksFilename = "async_tasks.json"

lib/logstorage/inmemory_part.go

Lines changed: 12 additions & 0 deletions

@@ -24,6 +24,8 @@ type inmemoryPart struct {
 
 	messageBloomValues bloomValuesBuffer
 	fieldBloomValues bloomValuesBuffer
+
+	deleteMarker deleteMarker
 }
 
 type bloomValuesBuffer struct {
@@ -64,6 +66,9 @@ func (mp *inmemoryPart) reset() {
 
 	mp.messageBloomValues.reset()
 	mp.fieldBloomValues.reset()
+
+	// Drop any attached deleteMarker.
+	mp.deleteMarker = deleteMarker{}
 }
 
 // mustInitFromRows initializes mp from lr.
@@ -130,6 +135,13 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
 	fs.MustWriteStreamSync(messageBloomFilterPath, &mp.messageBloomValues.bloom)
 	fs.MustWriteStreamSync(messageValuesPath, &mp.messageBloomValues.values)
 
+	// Persist delete-marker data if present.
+	if len(mp.deleteMarker.blockIDs) > 0 {
+		datBuf := mp.deleteMarker.Marshal(nil)
+		datPath := filepath.Join(path, rowMarkerDatFilename)
+		fs.MustWriteSync(datPath, datBuf)
+	}
+
 	bloomPath := getBloomFilePath(path, 0)
 	fs.MustWriteStreamSync(bloomPath, &mp.fieldBloomValues.bloom)
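
Note: when a flushed part is reopened, MustInitFromFilePart reads this file back through `mustReadDeleteMarkerData` (see block_stream_reader.go above). A matching read-side sketch for the layout assumed in the Marshal sketch; the function name and error handling here are illustrative, not the actual implementation:

package main

import (
	"encoding/binary"
	"fmt"
)

type deleteMarker struct {
	blockIDs []uint64
}

// unmarshalDeleteMarker decodes the assumed layout: a uvarint count
// followed by one uvarint blockID each.
func unmarshalDeleteMarker(src []byte) (*deleteMarker, error) {
	n, off := binary.Uvarint(src)
	if off <= 0 {
		return nil, fmt.Errorf("cannot read blockIDs count")
	}
	src = src[off:]
	dm := &deleteMarker{blockIDs: make([]uint64, 0, n)}
	for i := uint64(0); i < n; i++ {
		id, m := binary.Uvarint(src)
		if m <= 0 {
			return nil, fmt.Errorf("cannot read blockID #%d", i)
		}
		src = src[m:]
		dm.blockIDs = append(dm.blockIDs, id)
	}
	return dm, nil
}

func main() {
	// Build a buffer in the same assumed layout, then decode it.
	buf := binary.AppendUvarint(nil, 2)
	buf = binary.AppendUvarint(buf, 3)
	buf = binary.AppendUvarint(buf, 7)
	dm, err := unmarshalDeleteMarker(buf)
	fmt.Println(dm.blockIDs, err) // [3 7] <nil>
}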
