diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 7b52ff23f1..5ea27162ce 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -325,6 +325,5 @@ var ( logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`) - // no need to track duration for tail requests, as they usually take long time logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`) ) diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index b99404d0d0..bc9a54a56b 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -1,12 +1,14 @@ package vlstorage import ( + "context" "encoding/json" "flag" "fmt" "io" "math" "net/http" + "strconv" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" @@ -57,6 +59,9 @@ var ( partitionManageAuthKey = flagutil.NewPassword("partitionManageAuthKey", "authKey, which must be passed in query string to /internal/partition/* . It overrides -httpAuth.* . "+ "See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle") + deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey, which must be passed in query string to /delete and /internal/delete . It overrides -httpAuth.* . "+ + "See https://docs.victoriametrics.com/victorialogs/#delete-log-rows") + storageNodeAddrs = flagutil.NewArrayString("storageNode", "Comma-separated list of TCP addresses for storage nodes to route the ingested logs to and to send select queries to. "+ "If the list is empty, then the ingested logs are stored and queried locally from -storageDataPath") insertConcurrency = flag.Int("insert.concurrency", 2, "The average number of concurrent data ingestion requests, which can be sent to every -storageNode") @@ -94,6 +99,11 @@ var netstorageInsert *netinsert.Storage var netstorageSelect *netselect.Storage +// CheckDeleteAuth validates auth for delete endpoints. +func CheckDeleteAuth(w http.ResponseWriter, r *http.Request) bool { + return httpserver.CheckAuthFlag(w, r, deleteAuthKey) +} + // Init initializes vlstorage. // // Stop must be called when vlstorage is no longer needed @@ -246,6 +256,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return processPartitionSnapshotCreate(w, r) case "/internal/partition/snapshot/list": return processPartitionSnapshotList(w, r) + case "/delete": + return processDelete(r.Context(), w, r) } return false } @@ -397,6 +409,85 @@ func writeJSONResponse(w http.ResponseWriter, response any) { w.Write(responseBody) } +// processDelete handles the /delete endpoint for both local and remote storage nodes. +// It supports POST for scheduling deletes and GET for retrieving task status. 
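+// The POST form parameters are "query" (LogsQL filter selecting the rows to delete), optional "authKey",
+// and, for the internal cluster protocol, "tenant_ids" together with "timestamp"; GET responds with the
+// currently known delete tasks encoded as a JSON array.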
+func processDelete(ctx context.Context, w http.ResponseWriter, r *http.Request) bool { + if localStorage != nil { + if !CheckDeleteAuth(w, r) { + return true + } + } + + authKey := r.FormValue("authKey") + + switch r.Method { + case http.MethodPost: + if tenantIDsStr := r.FormValue("tenant_ids"); tenantIDsStr != "" { + tenantIDs, err := logstorage.UnmarshalTenantIDs([]byte(tenantIDsStr)) + if err != nil { + httpserver.Errorf(w, r, "cannot unmarshal tenant_ids=%q: %s", tenantIDsStr, err) + return true + } + + timestampStr := r.FormValue("timestamp") + timestamp, err := strconv.ParseInt(timestampStr, 10, 64) + if err != nil { + httpserver.Errorf(w, r, "cannot parse timestamp=%q: %s", timestampStr, err) + return true + } + + qStr := r.FormValue("query") + q, err := logstorage.ParseQueryAtTimestamp(qStr, timestamp) + if err != nil { + httpserver.Errorf(w, r, "cannot parse query=%q: %s", qStr, err) + return true + } + + if err := DeleteRows(ctx, tenantIDs, q, authKey); err != nil { + httpserver.Errorf(w, r, "cannot delete rows: %s", err) + return true + } + } else { + tenantID, err := logstorage.GetTenantIDFromRequest(r) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain tenantID: %s", err) + return true + } + + qStr := r.FormValue("query") + q, err := logstorage.ParseQuery(qStr) + if err != nil { + httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err) + return true + } + + if err := DeleteRows(ctx, []logstorage.TenantID{tenantID}, q, authKey); err != nil { + httpserver.Errorf(w, r, "%s", err) + return true + } + } + + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + + case http.MethodGet: + tasks, err := ListDeleteTasks(ctx, authKey) + if err != nil { + httpserver.Errorf(w, r, "cannot list delete tasks: %s", err) + return true + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(tasks); err != nil { + httpserver.Errorf(w, r, "internal error: %s", err) + } + + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + return true +} + // Storage implements insertutil.LogRowsStorage interface type Storage struct{} @@ -500,6 +591,41 @@ func GetStreamIDs(qctx *logstorage.QueryContext, limit uint64) ([]logstorage.Val return netstorageSelect.GetStreamIDs(qctx, limit) } +// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately. +func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error { + if err := logstorage.ValidateDeleteQuery(q); err != nil { + return fmt.Errorf("validate query: %w", err) + } + + if localStorage != nil { + return localStorage.DeleteRows(ctx, tenantIDs, q) + } + + return netstorageSelect.DeleteRows(ctx, tenantIDs, q, authKey) +} + +// IsLocalStorage confirms whether the running instance is a storage node. +func IsLocalStorage() bool { + return localStorage != nil +} + +// ListDeleteTasks collects delete task information either from the local storage or from all configured storage nodes. +// It returns a slice with the tasks and an extra Storage field indicating the source node address (or "local" for the embedded storage). 
+func ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) { + if localStorage != nil { + tasks := localStorage.ListDeleteTasks() + out := make([]logstorage.DeleteTaskInfoWithSource, len(tasks)) + for i, t := range tasks { + out[i] = logstorage.DeleteTaskInfoWithSource{ + DeleteTaskInfo: t, + } + } + return out, nil + } + + return netstorageSelect.ListDeleteTasks(ctx, authKey) +} + func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { var ss logstorage.StorageStats strg.UpdateStats(&ss) diff --git a/app/vlstorage/netselect/delete_tasks.go b/app/vlstorage/netselect/delete_tasks.go new file mode 100644 index 0000000000..9a090e3ced --- /dev/null +++ b/app/vlstorage/netselect/delete_tasks.go @@ -0,0 +1,92 @@ +package netselect + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "golang.org/x/sync/errgroup" +) + +// ListDeleteTasks gathers all delete tasks from every storage node and returns them along with the originating storage address. +func (s *Storage) ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) { + if len(s.sns) == 0 { + return nil, nil + } + + g, ctx := errgroup.WithContext(ctx) + + // race-free slices + results := make([][]logstorage.DeleteTaskInfoWithSource, len(s.sns)) + for i, sn := range s.sns { + i, sn := i, sn + g.Go(func() error { + tasks, err := sn.getDeleteTasks(ctx, authKey) + if err != nil { + return err + } + results[i] = tasks + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + var all []logstorage.DeleteTaskInfoWithSource + for _, ts := range results { + all = append(all, ts...) + } + + return all, nil +} + +func (sn *storageNode) getDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) { + args := url.Values{} + args.Set("version", DeleteProtocolVersion) + if authKey != "" { + args.Set("authKey", authKey) + } + + reqURL := sn.getRequestURLWithArgs("/delete", args) + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, err + } + if err := sn.ac.SetHeaders(req, true); err != nil { + return nil, fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err) + } + + resp, err := sn.c.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("cannot read response body from %q: %w", reqURL, err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("unexpected status code for %q: %d; response: %q", reqURL, resp.StatusCode, body) + } + + var tasks []logstorage.DeleteTaskInfoWithSource + if err := json.Unmarshal(body, &tasks); err != nil { + return nil, fmt.Errorf("cannot decode delete tasks response from %q: %w; response body: %q", reqURL, err, body) + } + + // Attach origin address. + for i := range tasks { + if tasks[i].Storage == "" { + tasks[i].Storage = sn.addr + } + } + return tasks, nil +} diff --git a/app/vlstorage/netselect/netselect.go b/app/vlstorage/netselect/netselect.go index 0f2117e7ae..bc86cb7bec 100644 --- a/app/vlstorage/netselect/netselect.go +++ b/app/vlstorage/netselect/netselect.go @@ -61,6 +61,10 @@ const ( // // It must be updated every time the protocol changes. QueryProtocolVersion = "v2" + + // DeleteProtocolVersion is the version of the protocol used for /internal/delete HTTP endpoint. 
+ // It must be updated every time the protocol changes. + DeleteProtocolVersion = "v1" ) // Storage is a network storage for querying remote storage nodes in the cluster. @@ -578,3 +582,63 @@ func unmarshalQueryStats(qs *logstorage.QueryStats, src []byte) ([]byte, error) } return tail, nil } + +// DeleteRows propagates delete markers to all storage nodes. +func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error { + var wg sync.WaitGroup + errs := make([]error, len(s.sns)) + for i, sn := range s.sns { + i, sn := i, sn + wg.Add(1) + go func() { + defer wg.Done() + if err := sn.deleteRows(ctx, tenantIDs, q, authKey); err != nil { + errs[i] = fmt.Errorf("storage node %s: %w", sn.addr, err) + } + }() + } + wg.Wait() + + return errors.Join(errs...) +} + +func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error { + args := sn.getDeleteArgs(DeleteProtocolVersion, tenantIDs, q, authKey) + + reqURL := sn.getRequestURLWithArgs("/delete", args) + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil) + if err != nil { + return err + } + if err := sn.ac.SetHeaders(req, true); err != nil { + return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err) + } + resp, err := sn.c.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body) + } + return nil +} + +// getDeleteArgs builds url.Values for delete requests +func (sn *storageNode) getDeleteArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) url.Values { + args := url.Values{} + args.Set("version", version) + args.Set("tenant_ids", string(logstorage.MarshalTenantIDs(nil, tenantIDs))) + args.Set("query", q.String()) + args.Set("timestamp", fmt.Sprintf("%d", q.GetTimestamp())) + if authKey != "" { + args.Set("authKey", authKey) + } + return args +} + +// getRequestURLWithArgs builds request URL with encoded query args +func (sn *storageNode) getRequestURLWithArgs(path string, args url.Values) string { + return fmt.Sprintf("%s://%s%s?%s", sn.scheme, sn.addr, path, args.Encode()) +} diff --git a/go.mod b/go.mod index 62d57f9180..e64eda93d4 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/valyala/fastjson v1.6.4 github.com/valyala/fastrand v1.1.0 github.com/valyala/quicktemplate v1.8.0 + golang.org/x/sync v0.16.0 ) require ( diff --git a/go.sum b/go.sum index c36b9069a3..866a475ef3 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdq github.com/valyala/quicktemplate v1.8.0/go.mod h1:qIqW8/igXt8fdrUln5kOSb+KWMaJ4Y8QUsfd1k6L2jM= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 
b519dedfe8..1d23f47f19 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -24,6 +24,9 @@ type blockSearchWork struct { // bh is the header of the block to search. bh blockHeader + + // dm is the delete marker for the block. + dm *deleteMarker } func (bsw *blockSearchWork) reset() { @@ -63,10 +66,10 @@ var blockSearchWorkBatchPool sync.Pool func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool { bsws := bswb.bsws - bsws = append(bsws, blockSearchWork{ p: p, so: so, + dm: p.deleteMarker.Load(), }) bsw := &bsws[len(bsws)-1] bsw.bh.copyFrom(bh) @@ -217,8 +220,22 @@ func (bs *blockSearch) search(qs *QueryStats, bsw *blockSearchWork, bm *bitmap) // search rows matching the given filter bm.init(int(bsw.bh.rowsCount)) bm.setBits() - bs.bsw.so.filter.applyToBlockSearch(bs, bm) + if bsw.dm != nil { + if ent, ok := bsw.dm.GetMarkedRows(bsw.bh.columnsHeaderOffset); ok { + if ent.IsOnes(bsw.bh.rowsCount) { + // Full-block delete – skip the block entirely. + return + } + + // Partial delete – measure rows before/after + // applying the marker for investigation. + ent.AndNotRLE(bm) + } + } + + // Apply query filter. + bs.bsw.so.filter.applyToBlockSearch(bs, bm) if bm.isZero() { // The filter doesn't match any logs in the current block. return diff --git a/lib/logstorage/block_stream_merger.go b/lib/logstorage/block_stream_merger.go index d684f8c38f..3abc7ba579 100644 --- a/lib/logstorage/block_stream_merger.go +++ b/lib/logstorage/block_stream_merger.go @@ -20,7 +20,19 @@ func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*block break } bsr := bsm.readersHeap[0] - bsm.mustWriteBlock(&bsr.blockData) + + // Use columnsHeaderOffset (uint64) as a stable identifier for the block. + var currBlockID uint64 + if len(bsr.blockHeaders) > 0 { + idx := bsr.nextBlockIdx - 1 + if idx >= 0 && idx < len(bsr.blockHeaders) { + currBlockID = bsr.blockHeaders[idx].columnsHeaderOffset + } + } + if !bsm.processDeleteMarker(bsr, currBlockID) { + bsm.mustWriteBlock(&bsr.blockData) + } + if bsr.NextBlock() { heap.Fix(&bsm.readersHeap, 0) } else { @@ -292,3 +304,42 @@ func (h *blockStreamReadersHeap) Pop() any { *h = x[:len(x)-1] return bsr } + +// processDeleteMarker applies delete-markers to the current block in bsr. +// It returns true if the caller must `continue` the outer merge loop. +// Depending on the marker entry the function can: +// 1. Skip the block entirely (full delete); +// 2. Partially prune rows and re-queue the pruned block so that heap order is preserved. +// +// In both cases the original reader is advanced to the next block or popped when exhausted. +func (bsm *blockStreamMerger) processDeleteMarker(bsr *blockStreamReader, blockID uint64) bool { + dm := bsr.deleteMarker + if len(dm.blockIDs) == 0 { + return false + } + + bm, ok := dm.GetMarkedRows(blockID) + if !ok { + return false + } + + rowsTotal := int(bsr.blockData.rowsCount) + + // FULL DELETE ─ drop block completely. + if bm.IsOnes(uint64(rowsTotal)) { + return true + } + + // PARTIAL DELETE ─ keep compressed block, attach new marker because we must + // preserve ordering by minTimestamp and avoid heavy rewriting. + // Since we are about to write a block that originally followed everything + // already flushed, we must make sure nothing is buffered. + bsm.mustFlushRows() + + // Write the original block bytes unchanged. 
+ bsm.bsw.MustWriteBlockData(&bsr.blockData) + newID := bsm.bsw.lastBlockID + bsm.bsw.dm.AddBlock(newID, bm) + + return true +} diff --git a/lib/logstorage/block_stream_reader.go b/lib/logstorage/block_stream_reader.go index 352fad9573..9c5f869acd 100644 --- a/lib/logstorage/block_stream_reader.go +++ b/lib/logstorage/block_stream_reader.go @@ -129,8 +129,7 @@ func (sr *streamReaders) reset() { } func (sr *streamReaders) init(partFormatVersion uint, columnNamesReader, columnIdxsReader, metaindexReader, indexReader, - columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, - messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader, + columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader, ) { sr.partFormatVersion = partFormatVersion @@ -235,6 +234,9 @@ type blockStreamReader struct { // ph is the header for the part ph partHeader + // deleteMarker holds delete marker data for the given part. + deleteMarker deleteMarker + // streamReaders contains data readers in stream mode streamReaders streamReaders @@ -295,6 +297,7 @@ func (bsr *blockStreamReader) reset() { bsr.globalRowsCount = 0 bsr.globalBlocksCount = 0 + bsr.deleteMarker = deleteMarker{} bsr.sidLast.reset() bsr.minTimestampLast = 0 } @@ -332,6 +335,11 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { // Read metaindex data bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) + + // Initialize delete marker + if len(mp.deleteMarker.blockIDs) > 0 { + bsr.deleteMarker = mp.deleteMarker + } } // MustInitFromFilePart initializes bsr from file part at the given path. @@ -346,6 +354,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { columnNamesPath := filepath.Join(path, columnNamesFilename) columnIdxsPath := filepath.Join(path, columnIdxsFilename) + deleteMarkersPath := filepath.Join(path, deleteMarkerFilename) metaindexPath := filepath.Join(path, metaindexFilename) indexPath := filepath.Join(path, indexFilename) columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) @@ -373,6 +382,14 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { var indexReader filestream.ReadCloser pfo.Add(indexPath, &indexReader, nocache) + // Open marker readers - check if files exist first + var deleteMarkerReader filestream.ReadCloser + var hasDeleteMarker bool + if fs.IsPathExist(deleteMarkersPath) { + hasDeleteMarker = true + pfo.Add(deleteMarkersPath, &deleteMarkerReader, nocache) + } + var columnsHeaderIndexReader filestream.ReadCloser if bsr.ph.FormatVersion >= 1 { pfo.Add(columnsHeaderIndexPath, &columnsHeaderIndexReader, nocache) @@ -420,6 +437,12 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { // Read metaindex data bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) + + // Read delete marker if available + if hasDeleteMarker { + bsr.deleteMarker = mustReadDeleteMarkerData(deleteMarkerReader) + deleteMarkerReader.MustClose() + } } // NextBlock reads the next block from bsr and puts it into bsr.blockData. 
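For orientation, here is a minimal client-side sketch of the /delete endpoint added in app/vlstorage/main.go earlier in this diff. The form parameter names ("query", "authKey") and the JSON task listing on GET follow the processDelete handler; the address, port, auth key value and the LogsQL query are illustrative assumptions, and the tenant is resolved from the request exactly as the handler does via GetTenantIDFromRequest.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Illustrative address; the port is an assumption about the HTTP listen address.
	base := "http://localhost:9428"
	authKey := "my-delete-auth-key" // assumed value of -deleteAuthKey on the storage node

	// Schedule a delete: POST /delete with the LogsQL query selecting the rows to drop.
	form := url.Values{}
	form.Set("query", "_time:7d error") // illustrative query
	resp, err := http.PostForm(base+"/delete?authKey="+url.QueryEscape(authKey), form)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("delete scheduled, HTTP status:", resp.StatusCode)

	// Inspect progress: GET /delete returns the known delete tasks as a JSON array.
	resp, err = http.Get(base + "/delete?authKey=" + url.QueryEscape(authKey))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	tasks, _ := io.ReadAll(resp.Body)
	fmt.Println(string(tasks))
}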
diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go index e638743a14..08d042977c 100644 --- a/lib/logstorage/block_stream_writer.go +++ b/lib/logstorage/block_stream_writer.go @@ -211,6 +211,15 @@ type blockStreamWriter struct { // streamWriters contains writer for block data streamWriters streamWriters + // partPath is non-empty when writing a file-based part. + partPath string + + // mp points to the destination in-memory part when writing an in-memory part. + mp *inmemoryPart + + // dm accumulates per-block delete markers generated while writing. + dm deleteMarker + // sidLast is the streamID for the last written block sidLast streamID @@ -252,6 +261,9 @@ type blockStreamWriter struct { // indexBlockHeader is used for marshaling the data to metaindexData indexBlockHeader indexBlockHeader + + // lastBlockID is the columnsHeaderOffset of the most recently written block. + lastBlockID uint64 } // reset resets bsw for subsequent reuse. @@ -269,6 +281,7 @@ func (bsw *blockStreamWriter) reset() { bsw.globalMinTimestamp = 0 bsw.globalMaxTimestamp = 0 bsw.indexBlockData = bsw.indexBlockData[:0] + bsw.lastBlockID = 0 if len(bsw.metaindexData) > 1024*1024 { // The length of bsw.metaindexData is unbound, so drop too long buffer @@ -279,12 +292,18 @@ func (bsw *blockStreamWriter) reset() { } bsw.indexBlockHeader.reset() + + bsw.partPath = "" + bsw.mp = nil + bsw.dm = deleteMarker{} } // MustInitForInmemoryPart initializes bsw from mp func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { bsw.reset() + bsw.mp = mp + messageBloomValues := mp.messageBloomValues.NewStreamWriter() createBloomValuesWriter := func(_ uint64) bloomValuesStreamWriter { return mp.fieldBloomValues.NewStreamWriter() @@ -299,6 +318,7 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { bsw.reset() + bsw.partPath = path fs.MustMkdirFailIfExist(path) // Open part files in parallel in order to minimze the time needed for this operation @@ -442,6 +462,7 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd // Marshal bh bsw.indexBlockData = bh.marshal(bsw.indexBlockData) + bsw.lastBlockID = bh.columnsHeaderOffset putBlockHeader(bh) if len(bsw.indexBlockData) > maxUncompressedIndexBlockSize { bsw.mustFlushIndexBlock(bsw.indexBlockData) @@ -488,6 +509,18 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) { ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten() bsw.streamWriters.MustClose() + + // Persist accumulated delete markers if any + if len(bsw.dm.blockIDs) > 0 { + if bsw.mp != nil { + bsw.mp.deleteMarker = bsw.dm + } else if bsw.partPath != "" { + datBuf := bsw.dm.Marshal(nil) + datPath := filepath.Join(bsw.partPath, deleteMarkerFilename) + fs.MustWriteSync(datPath, datBuf) + } + } + bsw.reset() } diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index f52ebdce9f..89bdfc9dce 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -130,6 +130,12 @@ type partWrapper struct { // The deadline when in-memory part must be flushed to disk. flushDeadline time.Time + + // taskSeq is the highest task sequence applied to this part. + // Avoid storing this field on disk, as it requires updating every part of the affected partitions. + // The drawback is that on restart, it doesn't remember whether the task was completed and will re-run it. 
+ // Therefore, the task must be idempotent to ensure this works correctly. + taskSeq atomic.Uint64 } func (pw *partWrapper) incRef() { @@ -310,7 +316,7 @@ func (ddb *datadb) mustFlushInmemoryPartsToFiles(isFinal bool) { ddb.partsLock.Lock() for _, pw := range ddb.inmemoryParts { - if !pw.isInMerge && (isFinal || pw.flushDeadline.Before(currentTime)) { + if !pw.isInMerge && (isFinal || pw.flushDeadline.Before(currentTime)) && !pw.hasPendingTask() { pw.isInMerge = true pws = append(pws, pw) } @@ -451,9 +457,11 @@ func (ddb *datadb) bigPartsMerger() { func getPartsToMergeLocked(pws []*partWrapper, maxOutBytes uint64) []*partWrapper { pwsRemaining := make([]*partWrapper, 0, len(pws)) for _, pw := range pws { - if !pw.isInMerge { - pwsRemaining = append(pwsRemaining, pw) + if pw.isInMerge || pw.hasPendingTask() { + continue } + + pwsRemaining = append(pwsRemaining, pw) } pwsToMerge := appendPartsToMerge(nil, pwsRemaining, maxOutBytes) @@ -585,6 +593,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { mpNew.ph = ph } else { ph.mustWriteMetadata(dstPartPath) + // Make sure the created part directory contents is synced and visible in case of unclean shutdown. fs.MustSyncPathAndParentDir(dstPartPath) } @@ -1026,6 +1035,20 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove) if pwNew != nil { + // Find the minimum taskSeq across all the source parts. + // This is the sequence number of the last applied task. + minSeq := uint64(math.MaxUint64) + for _, pw := range pws { + seq := pw.taskSeq.Load() + if seq < minSeq { + minSeq = seq + } + } + if minSeq == math.MaxUint64 { + minSeq = 0 + } + pwNew.taskSeq.Store(minSeq) + switch dstPartType { case partInmemory: ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew) @@ -1110,6 +1133,8 @@ func newPartWrapper(p *part, mp *inmemoryPart, flushDeadline time.Time) *partWra flushDeadline: flushDeadline, } + seq := p.pt.deleteQueue.seq.Load() + pw.taskSeq.Store(seq) // Increase reference counter for newly created part - it is decreased when the part // is removed from the list of open parts. @@ -1118,6 +1143,14 @@ func newPartWrapper(p *part, mp *inmemoryPart, flushDeadline time.Time) *partWra return pw } +func (pw *partWrapper) hasPendingTask() bool { + pt := pw.p.pt + curSeq := pt.deleteQueue.seq.Load() + isPartitionPayingDebt := pt.s.deleteTaskSeq.Load() == curSeq + isPartInDebt := pw.taskSeq.Load() < curSeq + return isPartitionPayingDebt && isPartInDebt +} + func (ddb *datadb) getFlushToDiskDeadline(pws []*partWrapper) time.Time { d := time.Now().Add(ddb.flushInterval) for _, pw := range pws { @@ -1481,8 +1514,8 @@ func (ddb *datadb) mustForceMergeAllParts() { // Collect all the file parts for forced merge ddb.partsLock.Lock() - pws = appendAllPartsForMergeLocked(pws, ddb.smallParts) - pws = appendAllPartsForMergeLocked(pws, ddb.bigParts) + pws = appendAllPartsForForceMergeLocked(pws, ddb.smallParts) + pws = appendAllPartsForForceMergeLocked(pws, ddb.bigParts) ddb.partsLock.Unlock() // If len(pws) == 1, then the merge must run anyway. 
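A standalone sketch restating the merge-gating arithmetic from datadb.go above — the hasPendingTask predicate and the minimum-taskSeq rule used in swapSrcWithDstParts — with plain uint64 values in place of the atomic fields; the function names and example values below are illustrative.

package main

import "fmt"

// hasPendingTask mirrors partWrapper.hasPendingTask: a part is held back from merging
// only while the storage is currently applying the partition's latest task
// (storageSeq == queueSeq) and the part has not applied that task yet (partSeq < queueSeq).
func hasPendingTask(partSeq, queueSeq, storageSeq uint64) bool {
	return storageSeq == queueSeq && partSeq < queueSeq
}

// mergedTaskSeq mirrors the bookkeeping in swapSrcWithDstParts: the merged part inherits
// the minimum taskSeq of its source parts, so any task not yet applied to every source
// is re-applied to the merge result by the delete task worker.
func mergedTaskSeq(srcSeqs []uint64) uint64 {
	if len(srcSeqs) == 0 {
		return 0
	}
	minSeq := srcSeqs[0]
	for _, seq := range srcSeqs[1:] {
		if seq < minSeq {
			minSeq = seq
		}
	}
	return minSeq
}

func main() {
	fmt.Println(hasPendingTask(2, 3, 3))          // true: the part lags behind the task being applied
	fmt.Println(hasPendingTask(3, 3, 3))          // false: the part has already caught up
	fmt.Println(mergedTaskSeq([]uint64{3, 2, 3})) // 2: the merge result must still see task 3
}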
@@ -1508,7 +1541,7 @@ func (ddb *datadb) mustForceMergeAllParts() { putWaitGroup(wg) } -func appendAllPartsForMergeLocked(dst, src []*partWrapper) []*partWrapper { +func appendAllPartsForForceMergeLocked(dst, src []*partWrapper) []*partWrapper { for _, pw := range src { if !pw.isInMerge { pw.isInMerge = true diff --git a/lib/logstorage/delete_task.go b/lib/logstorage/delete_task.go new file mode 100644 index 0000000000..18697f36f4 --- /dev/null +++ b/lib/logstorage/delete_task.go @@ -0,0 +1,122 @@ +package logstorage + +import ( + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" +) + +// deleteTaskStatus tracks the current state of a background delete operation. +type deleteTaskStatus string + +const ( + deleteTaskPending deleteTaskStatus = "pending" + deleteTaskSuccess deleteTaskStatus = "success" + deleteTaskError deleteTaskStatus = "error" +) + +// deleteTask captures all information needed to replay the delete query on parts. +type deleteTask struct { + TenantIDs []TenantID `json:"tenantIDs,omitempty"` + Query string `json:"query"` + Timestamp int64 `json:"timestamp,omitempty"` + Seq uint64 `json:"seq,omitempty"` + Status deleteTaskStatus `json:"status,omitempty"` + CreatedTime int64 `json:"createdTime,omitempty"` + DoneTime int64 `json:"doneTime,omitempty"` + ErrorMsg string `json:"error,omitempty"` +} + +// deleteTaskQueue holds the backlog of delete jobs for a partition. +type deleteTaskQueue struct { + pt *partition + + mu sync.Mutex + ts []deleteTask + seq atomic.Uint64 +} + +func newDeleteTaskQueue(pt *partition, tasks []deleteTask) *deleteTaskQueue { + return &deleteTaskQueue{ + pt: pt, + ts: tasks, + } +} + +func (dq *deleteTaskQueue) nextPendingTask() deleteTask { + var result deleteTask + + dq.mu.Lock() + for i := range dq.ts { + task := dq.ts[i] + if task.Status == deleteTaskPending { + result = task + break + } + } + dq.mu.Unlock() + + dq.seq.Store(result.Seq) + return result +} + +func (dq *deleteTaskQueue) resolve(seq uint64, err error) { + status, errMsg := deleteTaskSuccess, "" + if err != nil { + status, errMsg = deleteTaskError, err.Error() + } + + dq.mu.Lock() + for i := range dq.ts { + t := &dq.ts[i] + + if t.Seq < seq { + continue + } + if t.Seq > seq || t.Status != deleteTaskPending { + dq.mu.Unlock() + return + } + + t.Status = status + t.DoneTime = time.Now().UnixNano() + t.ErrorMsg = errMsg + dq.mu.Unlock() + + dq.pt.mustSaveDeleteTasks() + return + } + dq.mu.Unlock() +} + +func (dq *deleteTaskQueue) add(tenantIDs []TenantID, q *Query, seq uint64) uint64 { + task := deleteTask{ + Seq: seq, + TenantIDs: tenantIDs, + Query: q.String(), + Timestamp: q.GetTimestamp(), + Status: deleteTaskPending, + CreatedTime: time.Now().UnixNano(), + } + + dq.mu.Lock() + dq.ts = append(dq.ts, task) + dq.mu.Unlock() + + dq.pt.mustSaveDeleteTasks() + return seq +} + +func unmarshalDeleteTasks(data []byte) ([]deleteTask, error) { + if len(data) == 0 { + return nil, nil + } + + var tasks []deleteTask + if err := json.Unmarshal(data, &tasks); err != nil { + return nil, fmt.Errorf("unmarshal delete tasks: %w", err) + } + return tasks, nil +} diff --git a/lib/logstorage/delete_task_info.go b/lib/logstorage/delete_task_info.go new file mode 100644 index 0000000000..ecf2ecd35b --- /dev/null +++ b/lib/logstorage/delete_task_info.go @@ -0,0 +1,129 @@ +package logstorage + +import ( + "strings" + "sync" + "time" +) + +// DeleteTaskInfo represents brief information about a background delete task. 
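+// It is embedded into DeleteTaskInfoWithSource and returned by the GET /delete endpoint as JSON.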
+type DeleteTaskInfo struct { + Seq uint64 `json:"seq"` + Status deleteTaskStatus `json:"status"` + Tenant string `json:"tenant"` + Query string `json:"query"` + CreatedTime int64 `json:"createdTime,omitempty"` + DoneTime int64 `json:"doneTime,omitempty"` + Error string `json:"error,omitempty"` +} + +// DeleteTaskInfoWithSource extends DeleteTaskInfo with metadata about the storage node it originated from. +type DeleteTaskInfoWithSource struct { + DeleteTaskInfo `json:",inline"` + Storage string `json:"storage"` +} + +var deleteTasksCache struct { + mu sync.Mutex + ts time.Time + data []DeleteTaskInfo +} + +// ListDeleteTasks gathers information about all delete tasks known to this Storage instance. +// The returned slice isn't sorted. +func (s *Storage) ListDeleteTasks() []DeleteTaskInfo { + deleteTasksCache.mu.Lock() + const cacheTTL = 5 * time.Second + if time.Since(deleteTasksCache.ts) < cacheTTL && deleteTasksCache.data != nil { + d := append([]DeleteTaskInfo(nil), deleteTasksCache.data...) + deleteTasksCache.mu.Unlock() + return d + } + deleteTasksCache.mu.Unlock() + + s.partitionsLock.Lock() + pws := append([]*partitionWrapper(nil), s.partitions...) + for _, pw := range pws { + pw.incRef() + } + s.partitionsLock.Unlock() + defer func() { + for _, p := range pws { + p.decRef() + } + }() + + var out []DeleteTaskInfo + seqToIdx := make(map[uint64]int) + merge := func(dst *DeleteTaskInfo, src *DeleteTaskInfo) { + prioritize := func(s deleteTaskStatus) int { + switch s { + case deleteTaskError: + return 3 + case deleteTaskPending: + return 2 + case deleteTaskSuccess: + return 1 + } + return 0 + } + if prioritize(src.Status) > prioritize(dst.Status) { + dst.Status = src.Status + dst.Error = src.Error + } + if dst.CreatedTime == 0 || (src.CreatedTime != 0 && src.CreatedTime < dst.CreatedTime) { + dst.CreatedTime = src.CreatedTime + } + if src.DoneTime > dst.DoneTime { + dst.DoneTime = src.DoneTime + } + } + + for _, p := range pws { + dq := p.pt.deleteQueue + + dq.mu.Lock() + tasks := append([]deleteTask(nil), dq.ts...) + dq.mu.Unlock() + + for _, t := range tasks { + info := deleteTaskToInfo(t) + if idx, ok := seqToIdx[info.Seq]; ok { + merge(&out[idx], &info) + continue + } + seqToIdx[info.Seq] = len(out) + out = append(out, info) + } + } + + deleteTasksCache.mu.Lock() + deleteTasksCache.ts = time.Now() + deleteTasksCache.data = out + deleteTasksCache.mu.Unlock() + + return out +} + +func deleteTaskToInfo(t deleteTask) DeleteTaskInfo { + tn := "*" + if len(t.TenantIDs) > 0 { + var b strings.Builder + for i, id := range t.TenantIDs { + if i > 0 { + b.WriteByte(',') + } + b.WriteString(id.String()) + } + tn = b.String() + } + return DeleteTaskInfo{ + Seq: t.Seq, + Status: t.Status, + Tenant: tn, + Query: t.Query, + CreatedTime: t.CreatedTime, + DoneTime: t.DoneTime, + Error: t.ErrorMsg, + } +} diff --git a/lib/logstorage/delete_task_worker.go b/lib/logstorage/delete_task_worker.go new file mode 100644 index 0000000000..2d3f719d8e --- /dev/null +++ b/lib/logstorage/delete_task_worker.go @@ -0,0 +1,227 @@ +package logstorage + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// startDeleteTaskWorker launches a background goroutine, which periodically +// scans partitions for parts lagging behind delete tasks and applies these +// tasks by re-executing their underlying queries via MarkRows (with +// createTask=false). 
+func (s *Storage) startDeleteTaskWorker() { + const interval = 5 * time.Second + const maxFailure = 3 + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-s.stopCh + cancel() + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + for failedTime := 0; ; { + select { + case <-s.stopCh: + return + case <-time.After(interval): + // Honour pause requests, if any. If cannot process, just reset timer and continue. + if s.deleteTaskState.isPaused() { + continue + } + + seq, err := s.executeDeleteTasksOnce(ctx) + if err != nil { + logger.Errorf("delete task worker: %s", err) + failedTime++ + } else { + failedTime = 0 + } + + if failedTime > maxFailure { + s.setTaskFailed(seq, err) + failedTime = 0 + } + } + } + }() +} + +// executeDeleteTasksOnce performs a single pass over all partitions (latest → oldest) +// and applies pending delete tasks to every part that hasn't caught up yet. +func (s *Storage) executeDeleteTasksOnce(ctx context.Context) (uint64, error) { + var seq uint64 + + // Snapshot partitions (most recent first). + s.partitionsLock.Lock() + ptws := append([]*partitionWrapper{}, s.partitions...) + for _, ptw := range ptws { + ptw.incRef() + } + s.partitionsLock.Unlock() + + defer func() { + for _, ptw := range ptws { + ptw.decRef() + } + }() + + outdatedPtws, task := s.findNextDeleteTask(ptws) + if task.Seq == 0 { + return 0, nil + } + + seq = task.Seq + + // Gather all lagging parts in the target partition for this sequence. + lagging := []*partWrapper{} + pending := 0 + for _, ptw := range outdatedPtws { + pt := ptw.pt + pt.ddb.partsLock.Lock() + allPws := [][]*partWrapper{pt.ddb.inmemoryParts, pt.ddb.smallParts, pt.ddb.bigParts} + for _, arr := range allPws { + for _, pw := range arr { + if pw.taskSeq.Load() >= task.Seq { + continue + } + + if pw.isInMerge { + pending++ + continue + } + + if pw.mustDrop.Load() { + continue + } + + pw.incRef() + lagging = append(lagging, pw) + } + } + pt.ddb.partsLock.Unlock() + } + + // If there are no lagging parts and no pending parts, + // mark the task as successful and return. + if len(lagging) == 0 { + if pending > 0 { + return seq, nil + } + + s.setTaskComplete(outdatedPtws, task.Seq, false, nil) + return seq, nil + } + defer func() { + for _, pw := range lagging { + pw.decRef() + } + }() + + if err := s.processDeleteTask(ctx, task, lagging); err != nil { + return 0, fmt.Errorf("run delete task: %w", err) + } + + for _, pw := range lagging { + pw.taskSeq.Store(task.Seq) + } + + if pending == 0 { + s.setTaskComplete(outdatedPtws, task.Seq, false, nil) + } + + return seq, nil +} + +func (s *Storage) setTaskFailed(sequence uint64, err error) { + if sequence == 0 || err == nil { + return + } + + // Take a snapshot of partitions + s.partitionsLock.Lock() + ptws := append([]*partitionWrapper{}, s.partitions...) + for _, ptw := range ptws { + ptw.incRef() + } + s.partitionsLock.Unlock() + + // Mark the tasks as error for partitions and parts + s.setTaskComplete(ptws, sequence, true, err) + for _, ptw := range ptws { + ptw.decRef() + } +} + +func (s *Storage) setTaskComplete(ptws []*partitionWrapper, taskSeq uint64, includeParts bool, err error) { + for _, ptw := range ptws { + pt := ptw.pt + + if includeParts { + // Ensure every part in this partition has taskSeq at least `sequence`. 
+ pt.ddb.partsLock.Lock() + all := [][]*partWrapper{pt.ddb.inmemoryParts, pt.ddb.smallParts, pt.ddb.bigParts} + for _, arr := range all { + for _, pw := range arr { + if pw.taskSeq.Load() < taskSeq { + pw.taskSeq.Store(taskSeq) + } + } + } + pt.ddb.partsLock.Unlock() + } + + pt.deleteQueue.resolve(taskSeq, err) + } +} + +func (s *Storage) processDeleteTask(ctx context.Context, task deleteTask, lagging []*partWrapper) error { + // Build allowed set + allowed := make(map[*partition][]*partWrapper, len(lagging)) + for _, pw := range lagging { + allowed[pw.p.pt] = append(allowed[pw.p.pt], pw) + } + + err := s.markDeleteRowsOnParts(ctx, task.TenantIDs, task.Query, task.Timestamp, task.Seq, allowed) + if err != nil { + return fmt.Errorf("failed to mark delete rows on parts: %w", err) + } + + return nil +} + +func (s *Storage) findNextDeleteTask(ptws []*partitionWrapper) ([]*partitionWrapper, deleteTask) { + var minSeq uint64 = math.MaxUint64 + var result deleteTask + var resultPtws []*partitionWrapper + + for _, ptw := range ptws { + pt := ptw.pt + + task := pt.deleteQueue.nextPendingTask() + if task.Seq == 0 || task.Seq > minSeq { + continue + } + + // If we find a smaller sequence, + // reset the slice to start a new collection. + if task.Seq < minSeq { + minSeq = task.Seq + resultPtws = append(resultPtws[:0], ptw) + result = task + continue + } + + resultPtws = append(resultPtws, ptw) + } + + s.deleteTaskSeq.Store(result.Seq) + return resultPtws, result +} diff --git a/lib/logstorage/filenames.go b/lib/logstorage/filenames.go index 23f5f41828..68ffe0ee7c 100644 --- a/lib/logstorage/filenames.go +++ b/lib/logstorage/filenames.go @@ -15,9 +15,15 @@ const ( messageValuesFilename = "message_values.bin" messageBloomFilename = "message_bloom.bin" + // Filename for per-row marker data (e.g. delete markers). + deleteMarkerFilename = "deleted_rows.bin" + metadataFilename = "metadata.json" partsFilename = "parts.json" + // Filename for delete tasks storage at partition level. + deleteTasksFilename = "delete_tasks.json" + indexdbDirname = "indexdb" datadbDirname = "datadb" partitionsDirname = "partitions" diff --git a/lib/logstorage/inmemory_part.go b/lib/logstorage/inmemory_part.go index ee36260d69..cdcacc301e 100644 --- a/lib/logstorage/inmemory_part.go +++ b/lib/logstorage/inmemory_part.go @@ -5,6 +5,8 @@ import ( "sort" "sync" + "io" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/chunkedbuffer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -25,6 +27,22 @@ type inmemoryPart struct { messageBloomValues bloomValuesBuffer fieldBloomValues bloomValuesBuffer + + deleteMarker deleteMarker +} + +// deleteMarkerWriter implements io.WriterTo for writing delete marker data +type deleteMarkerWriter struct { + dm *deleteMarker +} + +func (dmw *deleteMarkerWriter) WriteTo(w io.Writer) (int64, error) { + if len(dmw.dm.blockIDs) == 0 { + return 0, nil + } + data := dmw.dm.Marshal(nil) + n, err := w.Write(data) + return int64(n), err } type bloomValuesBuffer struct { @@ -65,6 +83,9 @@ func (mp *inmemoryPart) reset() { mp.messageBloomValues.reset() mp.fieldBloomValues.reset() + + // Drop any attached deleteMarker. + mp.deleteMarker = deleteMarker{} } // mustInitFromRows initializes mp from lr. 
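The worker above applies at most one delete task per pass: the pending task with the smallest sequence number across all partitions, so older deletes complete before newer ones start. Below is a self-contained restatement of that selection rule from findNextDeleteTask, using plain slices in place of partition wrappers and atomics; the helper type and partition names are illustrative.

package main

import (
	"fmt"
	"math"
)

// pendingTask is a simplified stand-in for the entry returned by
// deleteTaskQueue.nextPendingTask: Seq == 0 means the partition has no pending task.
type pendingTask struct {
	Partition string
	Seq       uint64
}

// nextTaskToApply picks the smallest pending sequence and collects every partition
// that still has that task queued, mirroring findNextDeleteTask.
func nextTaskToApply(pending []pendingTask) (uint64, []string) {
	minSeq := uint64(math.MaxUint64)
	var partitions []string
	for _, t := range pending {
		if t.Seq == 0 || t.Seq > minSeq {
			continue
		}
		if t.Seq < minSeq {
			// A smaller sequence starts a new collection.
			minSeq = t.Seq
			partitions = partitions[:0]
		}
		partitions = append(partitions, t.Partition)
	}
	if minSeq == math.MaxUint64 {
		return 0, nil
	}
	return minSeq, partitions
}

func main() {
	pending := []pendingTask{
		{Partition: "2025_10_01", Seq: 7},
		{Partition: "2025_10_02", Seq: 5},
		{Partition: "2025_10_03", Seq: 5},
		{Partition: "2025_10_04", Seq: 0}, // nothing pending here
	}
	seq, parts := nextTaskToApply(pending)
	fmt.Println(seq, parts) // 5 [2025_10_02 2025_10_03]
}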
@@ -133,6 +154,13 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) { psw.Add(messageBloomFilterPath, &mp.messageBloomValues.bloom) psw.Add(messageValuesPath, &mp.messageBloomValues.values) + // Persist delete-marker data if present using ParallelStreamWriter + if len(mp.deleteMarker.blockIDs) > 0 { + deleteMarkerPath := filepath.Join(path, deleteMarkerFilename) + dmw := &deleteMarkerWriter{dm: &mp.deleteMarker} + psw.Add(deleteMarkerPath, dmw) + } + bloomPath := getBloomFilePath(path, 0) psw.Add(bloomPath, &mp.fieldBloomValues.bloom) diff --git a/lib/logstorage/marker_delete.go b/lib/logstorage/marker_delete.go new file mode 100644 index 0000000000..71885ef9c6 --- /dev/null +++ b/lib/logstorage/marker_delete.go @@ -0,0 +1,244 @@ +package logstorage + +import ( + "fmt" + "io" + "path/filepath" + "slices" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// deleteMarker keeps per-block Delete markers. +type deleteMarker struct { + blockIDs []uint64 // sorted block sequence numbers that have marker data + rows []boolRLE // same length and order as blockIDs +} + +func (dm *deleteMarker) String() string { + s := strings.Builder{} + for i, blockID := range dm.blockIDs { + s.WriteString(fmt.Sprintf("| [blockID: %d, row: %v] ", blockID, dm.rows[i].String())) + } + return s.String() +} + +// GetMarkedRows returns marked rows for the given block sequence number. +func (dm *deleteMarker) GetMarkedRows(blockSeq uint64) (boolRLE, bool) { + idx, found := slices.BinarySearch(dm.blockIDs, blockSeq) + if !found { + return nil, false + } + return dm.rows[idx], true +} + +// Marshal serializes delete marker data to the provided buffer. +// Format: [num_blocks:varuint64][block_id:uint64][rle_len:varuint64][rle_data:bytes]... +func (dm *deleteMarker) Marshal(dst []byte) []byte { + // Number of blocks with markers + dst = encoding.MarshalVarUint64(dst, uint64(len(dm.blockIDs))) + + // For each block, write: block_id + rle_length + rle_data + for i, blockID := range dm.blockIDs { + dst = encoding.MarshalUint64(dst, blockID) + + rleData := dm.rows[i] + dst = encoding.MarshalVarUint64(dst, uint64(len(rleData))) + dst = append(dst, rleData...) + } + + return dst +} + +// Unmarshal parses delete‑marker data from the provided bytes. +// It returns an error if the data is malformed. 
+func (dm *deleteMarker) Unmarshal(data []byte) error { + *dm = deleteMarker{} // reset receiver + + if len(data) == 0 { + return nil + } + + pos := 0 + end := len(data) + + numBlocks, n := encoding.UnmarshalVarUint64(data) + pos += n + + // sanity guard against corrupt data + if numBlocks > 1<<31 { + return fmt.Errorf("too many blocks: %d", numBlocks) + } + + dm.blockIDs = make([]uint64, 0, numBlocks) + dm.rows = make([]boolRLE, 0, numBlocks) + for i := range numBlocks { + // block‑ID (8 bytes) + if pos+8 > end { + return fmt.Errorf("truncated data: cannot read block_id %d", i) + } + blockID := encoding.UnmarshalUint64(data[pos:]) + pos += 8 + + // RLE length (varuint) + rleLen, n := encoding.UnmarshalVarUint64(data[pos:]) + pos += n + + // RLE bytes + if pos+int(rleLen) > end { + return fmt.Errorf("truncated data: cannot read rle_data for block %d", i) + } + rleSlice := data[pos : pos+int(rleLen)] // zero‑copy view + pos += int(rleLen) + + dm.blockIDs = append(dm.blockIDs, blockID) + dm.rows = append(dm.rows, boolRLE(rleSlice)) + } + + if pos != end { + return fmt.Errorf("unexpected %d trailing bytes in deleteMarker", end-pos) + } + return nil +} + +// merge combines this deleteMarker with another deleteMarker. +// It merges block deletions using RLE union operations where both markers have the same blockID. +func (dm *deleteMarker) merge(other *deleteMarker) { + // Nothing to merge? + if other == nil || len(other.blockIDs) == 0 { + return + } + if len(dm.blockIDs) == 0 { + // dm is empty – just copy other's data. + dm.blockIDs = append([]uint64(nil), other.blockIDs...) + dm.rows = append([]boolRLE(nil), other.rows...) + return + } + + // Two‑pointer merge because both blockID slices are already sorted. + mergedIDs := make([]uint64, 0, len(dm.blockIDs)+len(other.blockIDs)) + mergedRows := make([]boolRLE, 0, len(dm.rows)+len(other.rows)) + + i, j := 0, 0 + for i < len(dm.blockIDs) && j < len(other.blockIDs) { + idA, idB := dm.blockIDs[i], other.blockIDs[j] + + switch { + case idA == idB: + // Same block exists in both markers – union their RLEs. + mergedIDs = append(mergedIDs, idA) + mergedRows = append(mergedRows, dm.rows[i].Union(other.rows[j])) + i++ + j++ + + case idA < idB: + mergedIDs = append(mergedIDs, idA) + mergedRows = append(mergedRows, dm.rows[i]) + i++ + + default: // idB < idA + mergedIDs = append(mergedIDs, idB) + mergedRows = append(mergedRows, other.rows[j]) + j++ + } + } + + // Append leftovers from either slice. + for ; i < len(dm.blockIDs); i++ { + mergedIDs = append(mergedIDs, dm.blockIDs[i]) + mergedRows = append(mergedRows, dm.rows[i]) + } + for ; j < len(other.blockIDs); j++ { + mergedIDs = append(mergedIDs, other.blockIDs[j]) + mergedRows = append(mergedRows, other.rows[j]) + } + + // Install merged slices. + dm.blockIDs = mergedIDs + dm.rows = mergedRows +} + +// AddBlock adds a block with its RLE data to the deleteMarker. +// If blockID already exists, it merges the RLE data using union operation. 
+func (dm *deleteMarker) AddBlock(blockID uint64, rle boolRLE) { + // Find existing block or insertion point + idx, found := slices.BinarySearch(dm.blockIDs, blockID) + + if found { + // Block already exists - merge RLE data using union operation + existingRLE := dm.rows[idx] + combined := existingRLE.Union(rle) + dm.rows[idx] = combined + } else { + // Block doesn't exist - insert at correct position + dm.blockIDs = slices.Insert(dm.blockIDs, idx, blockID) + dm.rows = slices.Insert(dm.rows, idx, rle) + } +} + +// mustReadDeleteMarkerData reads delete marker data from the provided reader. +func mustReadDeleteMarkerData(datReader filestream.ReadCloser) deleteMarker { + var res deleteMarker + if datReader == nil { + return res + } + + datBytes, err := io.ReadAll(datReader) + if err != nil { + logger.Panicf("FATAL: %s: cannot read delete marker data: %s", datReader.Path(), err) + } + + if len(datBytes) == 0 { + return res + } + + if err := res.Unmarshal(datBytes); err != nil { + logger.Panicf("FATAL: %s: cannot unmarshal delete marker data: %s", datReader.Path(), err) + } + + return res +} + +// flushDeleteMarker writes delMarker to disk and updates the in-memory index +// for the given part. Writers are serialized by ddb.partsLock; readers access +// the index lock-free via atomic.Load. +func flushDeleteMarker(pw *partWrapper, dm *deleteMarker, seq uint64) { + if dm == nil || len(dm.blockIDs) == 0 { + return // nothing to flush + } + + current := pw.p.deleteMarker.Load() + var merged *deleteMarker + if current == nil { + // First marker – just deep-copy delMarker to avoid sharing mutable slices. + merged = &deleteMarker{ + blockIDs: append([]uint64(nil), dm.blockIDs...), + rows: append([]boolRLE(nil), dm.rows...), + } + } else { + // Copy-on-write: start from current snapshot and merge additions. + merged = &deleteMarker{ + blockIDs: append([]uint64(nil), current.blockIDs...), + rows: append([]boolRLE(nil), current.rows...), + } + merged.merge(dm) + } + + // Publish the new snapshot for readers. + pw.p.deleteMarker.Store(merged) + + // Persist to disk. + if pw.p.path != "" { + datBuf := merged.Marshal(nil) + partPath := pw.p.path + datPath := filepath.Join(partPath, deleteMarkerFilename) + fs.MustWriteAtomic(datPath, datBuf, true /*overwrite*/) + fs.MustSyncPath(partPath) + } + + pw.taskSeq.Store(seq) +} diff --git a/lib/logstorage/marker_rle.go b/lib/logstorage/marker_rle.go new file mode 100644 index 0000000000..41e2c07051 --- /dev/null +++ b/lib/logstorage/marker_rle.go @@ -0,0 +1,460 @@ +package logstorage + +import ( + "math/bits" + "strconv" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" +) + +// MarshalBoolRLE encodes the given bitmap into run-length encoding and appends the +// resulting bytes to dst. The format starts with the length of the initial run +// of zero bits and then alternates zero-runs and one-runs until the end of the +// bitmap is reached. All run lengths are encoded with variable-length unsigned +// integers from the encoding package. 
+// +// Examples: +// +// 000111 -> 3,3 (encoded as VarUInt64 3,3) +// 1100 -> 0,2,2 (encoded 0,2,2) +func (bm *bitmap) MarshalBoolRLE(dst []byte) boolRLE { + if bm == nil || bm.bitsLen == 0 { + return nil + } + + zerosRun := true // we always start with a zeros run + runLen := uint64(0) + + flush := func() { + dst = encoding.MarshalVarUint64(dst, runLen) + runLen = 0 + zerosRun = !zerosRun + } + + words := bm.a + fullWords := bm.bitsLen / 64 + tailBits := bm.bitsLen % 64 + + for wi := range fullWords { + w := words[wi] + + // fast-path whole-word runs + if zerosRun && w == 0 { + runLen += 64 + continue + } + if !zerosRun && w == ^uint64(0) { + runLen += 64 + continue + } + + // drill into the mixed word + for wBits := 64; wBits > 0; { + var step int + if zerosRun { + step = bits.TrailingZeros64(w) + } else { + step = trailingOnes64(w) + } + if step == 0 { // current bit flips + flush() + continue + } + if step > wBits { // when trailing* returns 64 + step = wBits + } + runLen += uint64(step) + w >>= uint(step) + wBits -= step + } + } + + // handle tail bits (if bitsLen not a multiple of 64) + if tailBits > 0 { + tail := words[fullWords] & ((uint64(1) << tailBits) - 1) + for tb := tailBits; tb > 0; { + var step int + if zerosRun { + step = bits.TrailingZeros64(tail) + } else { + step = trailingOnes64(tail) + } + if step == 0 { + flush() + continue + } + if step > tb { + step = tb + } + runLen += uint64(step) + tail >>= uint(step) + tb -= step + } + } + + // flush the final run + dst = encoding.MarshalVarUint64(dst, runLen) + return dst +} + +func trailingOnes64(x uint64) int { + return bits.TrailingZeros64(^x) +} + +// AndNotRLE performs dst &= ^rle, where rle is a bitmap encoded with MarshalRLE. +// In other words, it clears bits in dst that correspond to one-runs in the supplied RLE stream. +func (rle boolRLE) AndNotRLE(dst *bitmap) { + if dst == nil || len(rle) == 0 { + return + } + + var ( + idx int // current byte position in rle slice + pos int // current bit position inside dst + ) + + for idx < len(rle) && pos < dst.bitsLen { + // Decode zeros-run. + zeros, n := encoding.UnmarshalVarUint64(rle[idx:]) + idx += n + pos += int(zeros) + if pos >= dst.bitsLen { + break + } + + // Decode ones-run. + ones, n := encoding.UnmarshalVarUint64(rle[idx:]) + idx += n + if ones > 0 { + dst.clearBitsRange(pos, int(ones)) + } + pos += int(ones) + } +} + +func (rle boolRLE) IsOnes(totalRows uint64) bool { + zeros, n := encoding.UnmarshalVarUint64(rle) + if zeros != 0 { + return false + } + ones, _ := encoding.UnmarshalVarUint64(rle[n:]) + return ones >= totalRows +} + +// clearBitsRange clears n bits starting from the given bit offset. 
+func (bm *bitmap) clearBitsRange(start, n int) { + if n <= 0 || start >= bm.bitsLen { + return + } + + end := min(start+n, bm.bitsLen) + + startWord := start >> 6 // starting word index + endWord := (end - 1) >> 6 // ending word index (inclusive) + + // handle the first word (may be the only one) + if startWord == endWord { + maskStart := uint(start & 63) + maskEnd := uint((end - 1) & 63) + mask := ((^uint64(0)) << maskStart) & ((uint64(1) << (maskEnd + 1)) - 1) + bm.a[startWord] &^= mask + return + } + + // clear from start bit to end of start word + if offset := uint(start & 63); offset != 0 { + mask := ^uint64(0) << offset + bm.a[startWord] &^= mask + startWord++ + } + + // clear full words in the middle + for i := startWord; i < endWord; i++ { + bm.a[i] = 0 + } + + // clear beginning part of the last word up to end bit + maskEnd := uint((end - 1) & 63) + tailMask := (uint64(1) << (maskEnd + 1)) - 1 + bm.a[endWord] &^= tailMask +} + +type boolRLE []byte + +// String decodes the RLE stream (var-uint64-encoded run lengths of +// zero-runs and one-runs) and prints it as "[r0,r1,r2,…]". +// Debug only. +func (r boolRLE) String() string { + if len(r) == 0 { + return "[]" + } + var sb strings.Builder + sb.WriteByte('[') + + pos := 0 + first := true + for pos < len(r) { + run, n := encoding.UnmarshalVarUint64(r[pos:]) + if n == 0 { + // malformed tail: give up on further decoding + break + } + pos += n + if !first { + sb.WriteByte(',') + } + first = false + sb.WriteString(strconv.FormatUint(run, 10)) + } + sb.WriteByte(']') + return sb.String() +} + +func (rle boolRLE) SetAllOnes(count int) boolRLE { + rle = make([]byte, 0, 16) + rle = encoding.MarshalVarUint64(rle, 0) + rle = encoding.MarshalVarUint64(rle, uint64(count)) + return rle +} + +// IsStateful returns true if the bitmap encoded in rle +// contains two or more 1-bits. +// +// It walks the RLE stream once and exits as soon as the second +// one-bit is found, so it is O(#runs) with early termination. +// IsStateful reports whether the RLE slice contains +// at least two encoded run-lengths. +func (rle boolRLE) IsStateful() bool { + if len(rle) == 0 { + return false + } + + _, n := encoding.UnmarshalVarUint64(rle) + if n <= 0 || n >= len(rle) { + return false + } + return true +} + +func (rle boolRLE) IsSubsetOf(other boolRLE) bool { + // Fast paths -------------------------------------------------------- + if len(rle) == 0 { // empty bitmap is subset of anything + return true + } + if len(other) == 0 { // other is all-zero ⇒ rle must have no 1s + return !rle.containsOne() + } + + // Decoder for a single RLE stream ---------------------------------- + type decoder struct { + src boolRLE + idx int // read offset in src + rem uint64 // rows left in current run + ones bool // type of current run (false = zeros) + } + load := func(d *decoder) { + for d.rem == 0 && d.idx < len(d.src) { + run, n := encoding.UnmarshalVarUint64(d.src[d.idx:]) + d.idx += n + if run == 0 { + // Zero-length run: flip run type and continue reading. + d.ones = !d.ones + continue + } + d.rem = run + // NOTE: do *not* flip d.ones here; it already describes this run. 
+ } + } + + var a, b decoder + a.src = rle + b.src = other + + for { + load(&a) + load(&b) + + // a finished → every 1-bit matched ⇒ subset + if a.rem == 0 && a.idx >= len(a.src) { + return true + } + + // b finished → remaining bits are zeros + if b.rem == 0 && b.idx >= len(b.src) { + // If a still has any 1s, not a subset + return !(a.ones && a.rem > 0) + } + + // Determine how many rows we can consume in this step. + var span uint64 + switch { + case a.rem == 0: + span = b.rem + case b.rem == 0: + span = a.rem + case a.rem < b.rem: + span = a.rem + default: + span = b.rem + } + + // If this span has 1s in a and 0s in b -> violation. + if a.ones && !b.ones && span > 0 { + return false + } + + // Consume span from both streams. + if a.rem >= span { + a.rem -= span + if a.rem == 0 { + a.ones = !a.ones + } + } + if b.rem >= span { + b.rem -= span + if b.rem == 0 { + b.ones = !b.ones + } + } + } +} + +// containsOne is an O(#runs) helper: true if bitmap has any 1-bit. +func (rle boolRLE) containsOne() bool { + idx := 0 + ones := false + for idx < len(rle) { + run, n := encoding.UnmarshalVarUint64(rle[idx:]) + idx += n + if ones && run > 0 { + return true + } + ones = !ones + } + return false +} + +// Union returns an RLE-encoded bitmap equal to the bit-wise OR of rle and other. +// It walks the two RLE streams in lock-step, so it never allocates an +// intermediate bitmap. The first run in every RLE stream is zeros. +func (rle boolRLE) Union(other boolRLE) boolRLE { + // Fast paths. + if len(rle) == 0 { + return append(boolRLE(nil), other...) + } + if len(other) == 0 { + return append(boolRLE(nil), rle...) + } + + // Decoder state for one stream. + type decoder struct { + src boolRLE + idx int // byte offset + pos int // absolute bit position at start of current run + n uint64 // rows left in current run + isOne bool // current run type (false = zeros) + } + + // Advance d to the next non-empty run. + step := func(d *decoder) { + for d.idx < len(d.src) && d.n == 0 { + run, n := encoding.UnmarshalVarUint64(d.src[d.idx:]) + d.idx += n + d.pos += int(run) + d.n = run + if d.n == 0 { + d.isOne = !d.isOne // zero-length run, just flip type + continue + } + return + } + } + + var a, b decoder + a.src = rle + b.src = other + step(&a) + step(&b) + + // Output builder. + outRuns := make([]uint64, 0, 8) + outIsOne := false // first run type (zeros) + curLen := uint64(0) + + flush := func() { + outRuns = append(outRuns, curLen) + curLen = 0 + } + + const inf = ^uint64(0) >> 1 + + for !(a.n == 0 && a.idx >= len(a.src) && + b.n == 0 && b.idx >= len(b.src)) { + + // Remaining run lengths (∞ once a stream is exhausted). + nA := a.n + if nA == 0 && a.idx >= len(a.src) { + nA = inf + } + nB := b.n + if nB == 0 && b.idx >= len(b.src) { + nB = inf + } + span := nA + if nB < span { + span = nB + } + if span == inf { // both exhausted + break + } + + spanOnes := (a.isOne && a.n > 0) || (b.isOne && b.n > 0) + + if spanOnes == outIsOne { + // Same run type – extend. + curLen += span + } else { + // Run type flips. + if curLen > 0 || len(outRuns) > 0 { + flush() + } else if curLen == 0 && len(outRuns) == 0 { + // First span is a 1-run: emit leading zero-run of length 0. + outRuns = append(outRuns, 0) + } + outIsOne = spanOnes + curLen = span + } + + // Consume span from stream A. + if a.n >= span { + a.n -= span + if a.n == 0 { + a.isOne = !a.isOne + step(&a) + } + } + // Consume span from stream B. 
+ if b.n >= span { + b.n -= span + if b.n == 0 { + b.isOne = !b.isOne + step(&b) + } + } + } + + flush() + + // Drop trailing zero-run if present (length-saving convention). + if len(outRuns) > 0 && !outIsOne && outRuns[len(outRuns)-1] == 0 { + outRuns = outRuns[:len(outRuns)-1] + } + + // Varint-encode result. + var dst boolRLE + for _, rl := range outRuns { + dst = encoding.MarshalVarUint64(dst, rl) + } + return dst +} diff --git a/lib/logstorage/marker_rle_test.go b/lib/logstorage/marker_rle_test.go new file mode 100644 index 0000000000..224aac9090 --- /dev/null +++ b/lib/logstorage/marker_rle_test.go @@ -0,0 +1,375 @@ +package logstorage + +import ( + "bytes" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" +) + +// createTestRLE creates a test RLE bitmap with specified pattern +func createTestRLE(pattern string) boolRLE { + bm := getBitmap(len(pattern)) + defer putBitmap(bm) + + for i, c := range pattern { + if c == '1' { + bm.setBit(i) + } + } + + return boolRLE(bm.MarshalBoolRLE(nil)) +} + +func applyRLEToBitmap(bm *bitmap, rle boolRLE) { + idx := 0 + pos := 0 + for idx < len(rle) { + run, n := encoding.UnmarshalVarUint64(rle[idx:]) + idx += n + if run > 0 { + for i := 0; i < int(run); i++ { + bm.setBit(pos + i) + } + } + } +} + +func equalRLE(a, b boolRLE) bool { + bmA := getBitmap(1000) + bmB := getBitmap(1000) + defer putBitmap(bmA) + defer putBitmap(bmB) + + applyRLEToBitmap(bmA, a) // SET 1-bits + applyRLEToBitmap(bmB, b) + + if len(bmA.a) != len(bmB.a) { + return false + } + for i := range bmA.a { + if bmA.a[i] != bmB.a[i] { + return false + } + } + return true +} + +func TestMarshalRLE(t *testing.T) { + f := func(pattern string, expectedRuns []uint64) { + t.Helper() + + bm := getBitmap(len(pattern)) + defer putBitmap(bm) + for i, c := range pattern { + if c == '1' { + bm.setBit(i) + } + } + + // Build expected RLE from run lengths + var expectedRLE []byte + for _, run := range expectedRuns { + expectedRLE = encoding.MarshalVarUint64(expectedRLE, run) + } + + actualRLE := bm.MarshalBoolRLE(nil) + if !bytes.Equal(actualRLE, expectedRLE) { + t.Fatalf("unexpected result for pattern %s; got %v; want %v", pattern, actualRLE, expectedRLE) + } + } + + // Empty and single bits + f("", []uint64{}) + f("0", []uint64{1}) + f("1", []uint64{0, 1}) + + // Small patterns + f("00", []uint64{2}) + f("01", []uint64{1, 1}) + f("10", []uint64{0, 1, 1}) + f("11", []uint64{0, 2}) + + // Basic runs + f("000", []uint64{3}) + f("111", []uint64{0, 3}) + f("0001", []uint64{3, 1}) + f("1000", []uint64{0, 1, 3}) + f("0011", []uint64{2, 2}) + f("1100", []uint64{0, 2, 2}) + + // Alternating patterns + f("0101", []uint64{1, 1, 1, 1}) + f("1010", []uint64{0, 1, 1, 1, 1}) + f("010101", []uint64{1, 1, 1, 1, 1, 1}) + f("101010", []uint64{0, 1, 1, 1, 1, 1, 1}) + + // Mixed patterns + f("001100", []uint64{2, 2, 2}) + f("110011", []uint64{0, 2, 2, 2}) + f("00111000", []uint64{2, 3, 3}) + f("11000111", []uint64{0, 2, 3, 3}) + + // Edge cases - all zeros/ones + f("0000000000", []uint64{10}) + f("1111111111", []uint64{0, 10}) + + // Single bit surrounded by zeros + f("000100000", []uint64{3, 1, 5}) + f("000010000", []uint64{4, 1, 4}) + f("000001000", []uint64{5, 1, 3}) +} + +func TestBoolRLEUnion(t *testing.T) { + f := func(a, b, expected []uint64) { + t.Helper() + + rleA := encodeTestRLE(a) + rleB := encodeTestRLE(b) + expectedRLE := encodeTestRLE(expected) + + result := rleA.Union(rleB) + if !equalRLE(result, expectedRLE) { + t.Fatalf("unexpected result;\n got: %v\n want: %v", 
decodeRLE(result), decodeRLE(expectedRLE)) + } + } + + // Basic + f([]uint64{}, []uint64{}, []uint64{}) + f([]uint64{0, 1}, []uint64{}, []uint64{0, 1}) + f([]uint64{}, []uint64{0, 1}, []uint64{0, 1}) + f([]uint64{1}, []uint64{0, 1}, []uint64{0, 1}) + f([]uint64{0, 1}, []uint64{1}, []uint64{0, 1}) + f([]uint64{0, 1, 1, 1}, []uint64{1, 1, 1, 1}, []uint64{0, 4}) + f([]uint64{0, 2, 2}, []uint64{2, 2, 2}, []uint64{0, 4, 2}) + f([]uint64{0, 1, 1, 1}, []uint64{0, 1, 1, 1}, []uint64{0, 1, 1, 1}) + f([]uint64{2, 2, 2, 2}, []uint64{1, 1, 1, 1, 1, 1, 1, 1}, []uint64{1, 3, 1, 3}) + f([]uint64{0, 5}, []uint64{5, 5}, []uint64{0, 10}) + + // Edge-cases + f([]uint64{}, []uint64{4}, []uint64{4}) + f([]uint64{4}, []uint64{}, []uint64{4}) + f([]uint64{4}, []uint64{4}, []uint64{4}) + f([]uint64{0, 4}, []uint64{0, 4}, []uint64{0, 4}) + f([]uint64{4}, []uint64{0, 4}, []uint64{0, 4}) + f([]uint64{0, 4}, []uint64{4}, []uint64{0, 4}) + + // Two halves overlap + f([]uint64{0, 4, 4}, []uint64{4, 4, 4}, []uint64{0, 8, 4}) + + // Middle gap + f([]uint64{0, 4, 4, 4}, []uint64{8, 4}, []uint64{0, 4, 4, 4}) + + // Complementary patterns + f([]uint64{0, 1, 6, 1}, []uint64{1, 6, 1, 1}, []uint64{0, 9}) + f([]uint64{3, 1, 3}, []uint64{4, 1, 2}, []uint64{3, 2, 3}) + + // Different lengths + f([]uint64{0, 3, 3}, []uint64{2, 1}, []uint64{0, 3, 3}) + f([]uint64{0, 2}, []uint64{2, 1, 3}, []uint64{0, 2, 1, 3}) + + // Alternation + f([]uint64{0, 1, 1, 1, 1, 1, 1, 1, 1}, []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1}, []uint64{0, 8, 1}) + + // Extremes + f([]uint64{0, 1, 8, 1}, []uint64{8, 1}, []uint64{0, 1, 7, 2}) + + // All zeros remain zeros + f([]uint64{6}, []uint64{1}, []uint64{6}) + + // Second adds a one just beyond the end + f([]uint64{0, 3}, []uint64{3, 1, 3}, []uint64{0, 4, 3}) + + // Disjoint ones at opposite ends + f([]uint64{3, 1}, []uint64{1, 3}, []uint64{1, 1, 3}) + + // Leading zero-runs of different sizes + f([]uint64{2, 4, 2}, []uint64{3, 2, 3}, []uint64{2, 4, 3}) + + // Large runs + f([]uint64{362}, []uint64{362, 246}, []uint64{362, 246}) + + // Real-world testcases + f( + []uint64{1, 4, 1, 16, 1, 52, 2, 24, 1, 32, 1, 4, 1, 7, 1, 10, 1, 6, 1, 53, 1, 28, 1, 41, 1, 14, 1, 1, 1, 87, 1, 29, 1, 15, 1, 1, 1, 36, 1, 13, 1, 37, 1, 18, 1, 8}, + []uint64{0, 1, 4, 1, 16, 1, 116, 1, 25, 1, 275, 1, 1, 1, 116}, + []uint64{0, 75, 2, 24, 1, 32, 1, 12, 1, 10, 1, 60, 1, 28, 1, 41, 1, 14, 1, 1, 1, 87, 1, 29, 1, 54, 1, 13, 1, 37, 1, 18, 1, 8}) +} + +func TestBoolRLEIsSubsetOf(t *testing.T) { + f := func(a, b []uint64, want bool) { + t.Helper() + rleA := encodeTestRLE(a) + rleB := encodeTestRLE(b) + got := rleA.IsSubsetOf(rleB) + if got != want { + t.Fatalf("IsSubsetOf failed:\n a: %v\n b: %v\n got: %v\nwant: %v", a, b, got, want) + } + } + + // Both empty (length 0) + f([]uint64{}, []uint64{}, true) + + // Subset: all zeros + f([]uint64{6}, []uint64{6}, true) + + // Subset: a is all zeros, b is all ones + f([]uint64{6}, []uint64{0, 6}, true) + + // Subset: both all ones + f([]uint64{0, 6}, []uint64{0, 6}, true) + + // Subset: a is a proper subset of b + f([]uint64{2, 2, 2}, []uint64{2, 4}, true) + f([]uint64{0, 2, 2}, []uint64{0, 4}, true) + + // Not subset: a has ones where b has zeros + f([]uint64{0, 2, 2}, []uint64{2, 2}, false) + f([]uint64{0, 1, 1}, []uint64{2, 1}, false) + + // Both alternating, a subset of b + f([]uint64{1, 1, 1, 1}, []uint64{1, 1, 1, 1}, true) + f([]uint64{1, 1, 1, 1}, []uint64{1, 3}, true) + f([]uint64{1, 2, 1}, []uint64{1, 1, 1, 1}, false) + + // Edge: a is empty, b is not (always true) + f([]uint64{}, []uint64{5}, 
true) + + // Edge: b is empty, a is not (false unless a has no ones) + f([]uint64{0, 3}, []uint64{}, false) + f([]uint64{3}, []uint64{}, true) + + // a fully covers b in ones, not a subset + f([]uint64{0, 6}, []uint64{3, 3}, false) +} + +// CountOnes returns the total number of 1‑bits encoded in the RLE stream. +func (rle boolRLE) CountOnes() uint64 { + if len(rle) == 0 { + return 0 + } + + var ( + idx int // read offset in rle + ones bool // current run type; false = zeros, true = ones + total uint64 // accumulated 1‑bits + ) + + for idx < len(rle) { + run, n := encoding.UnmarshalVarUint64(rle[idx:]) + idx += n + if run == 0 { // explicit run‑type flip, no bits to count + ones = !ones + continue + } + if ones { + total += run + } + ones = !ones + } + + return total +} + +func TestBoolRLECountOnes(t *testing.T) { + f := func(runs []uint64, expected uint64) { + t.Helper() + rle := encodeTestRLE(runs) + got := rle.CountOnes() + if got != expected { + t.Fatalf("unexpected result for runs %v; got %d; want %d", runs, got, expected) + } + } + + // Empty RLE + f([]uint64{}, 0) + + // All zeros + f([]uint64{5}, 0) + f([]uint64{10}, 0) + f([]uint64{100}, 0) + + // All ones + f([]uint64{0, 5}, 5) + f([]uint64{0, 10}, 10) + f([]uint64{0, 100}, 100) + + // Single 1-bit + f([]uint64{0, 1}, 1) + f([]uint64{1, 1}, 1) + f([]uint64{5, 1}, 1) + f([]uint64{0, 1, 5}, 1) + f([]uint64{3, 1, 2}, 1) + + // Multiple single bits + f([]uint64{0, 1, 1, 1}, 2) + f([]uint64{1, 1, 1, 1}, 2) + f([]uint64{0, 1, 1, 1, 1, 1}, 3) + + // Basic patterns + f([]uint64{0, 2}, 2) + f([]uint64{2, 2}, 2) + f([]uint64{0, 2, 2}, 2) + f([]uint64{2, 2, 2}, 2) + + // Mixed patterns + f([]uint64{1, 3, 2, 1}, 4) + f([]uint64{0, 2, 3, 4}, 6) + f([]uint64{5, 1, 1, 2, 3}, 3) + + // Alternating patterns + f([]uint64{1, 1, 1, 1, 1, 1}, 3) + f([]uint64{0, 1, 1, 1, 1, 1, 1, 1}, 4) + + // Zero-length runs (type flips) + f([]uint64{0, 0, 5}, 0) + f([]uint64{0, 5, 0}, 5) + f([]uint64{0, 0, 0, 3}, 3) + f([]uint64{3, 0, 0, 2}, 2) + + // Large runs + f([]uint64{1000}, 0) + f([]uint64{0, 1000}, 1000) + f([]uint64{500, 300, 200}, 300) + f([]uint64{0, 100, 50, 200}, 300) + + // Edge cases with very large numbers + f([]uint64{0, 1000000}, 1000000) + f([]uint64{1000000, 500000, 250000}, 500000) + + // Complex real-world patterns + f([]uint64{1, 4, 1, 16, 1, 52, 2, 24, 1, 32}, 4+16+52+24+32) + f([]uint64{0, 1, 4, 1, 16, 1, 52, 2, 24}, 1+1+1+2) + + // Patterns with many small runs + f([]uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, 5) + f([]uint64{0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, 5) + + // Patterns ending with ones + f([]uint64{2, 3}, 3) + f([]uint64{0, 2, 1, 3}, 5) + + // Patterns ending with zeros + f([]uint64{0, 3, 2}, 3) + f([]uint64{1, 2, 1, 3}, 5) +} + +func encodeTestRLE(runs []uint64) boolRLE { + var b boolRLE + for _, x := range runs { + b = encoding.MarshalVarUint64(b, x) + } + return b +} + +// decodeRLE decodes a boolRLE slice back to []uint64 for pretty-printing test errors +func decodeRLE(rle boolRLE) []uint64 { + var res []uint64 + idx := 0 + for idx < len(rle) { + n, l := encoding.UnmarshalVarUint64(rle[idx:]) + res = append(res, n) + idx += l + } + return res +} diff --git a/lib/logstorage/marker_test.go b/lib/logstorage/marker_test.go new file mode 100644 index 0000000000..18caeb036e --- /dev/null +++ b/lib/logstorage/marker_test.go @@ -0,0 +1,118 @@ +package logstorage + +import "testing" + +// TestDeleteMarkerMerge verifies correct behaviour of deleteMarker.Merge. 
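+// The expected results assume that merging produces the per-block union of the two markers' row bitmaps.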
+// All test inputs and expected outputs are hard-coded following the f-test style +// already used across marker_*_test.go files. +func TestDeleteMarkerMerge(t *testing.T) { + // build constructs a deleteMarker from a mapping blockID -> bitmap pattern string. + build := func(blockPatterns map[uint64]string) *deleteMarker { + dm := &deleteMarker{} + for id, pattern := range blockPatterns { + dm.AddBlock(id, createTestRLE(pattern)) + } + return dm + } + + // cmp compares deleteMarker contents against the expected mapping. + cmp := func(dm *deleteMarker, expected map[uint64]string) bool { + if len(dm.blockIDs) != len(expected) { + return false + } + for i, id := range dm.blockIDs { + pat, ok := expected[id] + if !ok { + return false + } + if !equalRLE(dm.rows[i], createTestRLE(pat)) { + return false + } + } + return true + } + + f := func(a, b, expected map[uint64]string) { + t.Helper() + + dmA := build(a) + dmB := build(b) + + dmA.merge(dmB) + + if !cmp(dmA, expected) { + t.Fatalf("unexpected merge result; got %+v; want %+v", dmA, expected) + } + } + + // 1) both markers empty + f(map[uint64]string{}, map[uint64]string{}, map[uint64]string{}) + + // 2) destination empty, source has data + f( + map[uint64]string{}, + map[uint64]string{3: "1"}, + map[uint64]string{3: "1"}, + ) + + // 3) destination has data, source empty + f( + map[uint64]string{3: "1"}, + map[uint64]string{}, + map[uint64]string{3: "1"}, + ) + + // 4) non-overlapping block sets + f( + map[uint64]string{1: "1"}, + map[uint64]string{2: "1"}, + map[uint64]string{1: "1", 2: "1"}, + ) + + // 5) overlapping block that requires RLE union + f( + map[uint64]string{5: "1010"}, + map[uint64]string{5: "0101"}, + map[uint64]string{5: "1111"}, + ) + + // 6) mix of overlapping and non-overlapping blocks + f( + map[uint64]string{1: "10", 3: "001"}, + map[uint64]string{1: "01", 2: "1"}, + map[uint64]string{1: "11", 2: "1", 3: "001"}, + ) + + // 7) complex overlapping patterns producing full-ones result + f( + map[uint64]string{10: "000111000111000111"}, + map[uint64]string{10: "111000111000111000"}, + map[uint64]string{10: "111111111111111111"}, + ) + + // 8) multiple blocks with various overlaps and run lengths + f( + map[uint64]string{ + 0: "00110011", + 2: "11110000", + 4: "00001111", + 6: "10101010", + }, + map[uint64]string{ + 0: "11001100", + 1: "01010101", + 2: "00001111", + 4: "11110000", + 6: "01010101", + 7: "11111111", + }, + map[uint64]string{ + 0: "11111111", + 1: "01010101", + 2: "11111111", + 4: "11111111", + 6: "11111111", + 7: "11111111", + }, + ) +} diff --git a/lib/logstorage/part.go b/lib/logstorage/part.go index f5d2395d36..e82fc9155f 100644 --- a/lib/logstorage/part.go +++ b/lib/logstorage/part.go @@ -6,6 +6,8 @@ import ( "github.com/cespare/xxhash/v2" + "sync/atomic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -16,6 +18,9 @@ type part struct { // pt is the partition the part belongs to pt *partition + // deleteMarker holds delete marker data. May be nil if not present. + deleteMarker atomic.Pointer[deleteMarker] + // path is the path to the part on disk. // // If the part is in-memory then the path is empty. 
@@ -100,6 +105,8 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { }, } + p.deleteMarker.Store(&mp.deleteMarker) + return &p } @@ -111,6 +118,7 @@ func mustOpenFilePart(pt *partition, path string) *part { columnNamesPath := filepath.Join(path, columnNamesFilename) columnIdxsPath := filepath.Join(path, columnIdxsFilename) + deleteMarkerPath := filepath.Join(path, deleteMarkerFilename) metaindexPath := filepath.Join(path, metaindexFilename) indexPath := filepath.Join(path, indexFilename) columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) @@ -170,6 +178,15 @@ func mustOpenFilePart(pt *partition, path string) *part { } } + // Load marker data + p.deleteMarker.Store(nil) + if fs.IsPathExist(deleteMarkerPath) { + deleteMarkerReader := filestream.MustOpen(deleteMarkerPath, true) + deleteMarker := mustReadDeleteMarkerData(deleteMarkerReader) + p.deleteMarker.Store(&deleteMarker) + deleteMarkerReader.MustClose() + } + return &p } diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go index 09cdd9d391..3d77dcad34 100644 --- a/lib/logstorage/partition.go +++ b/lib/logstorage/partition.go @@ -1,7 +1,9 @@ package logstorage import ( + "encoding/json" "fmt" + "os" "path/filepath" "sort" "sync" @@ -42,6 +44,9 @@ type partition struct { // which may be in the process of flushing to disk by concurrently running // snapshot process. snapshotLock sync.Mutex + + // deleteQueue tracks queued delete operations for the partition. + deleteQueue *deleteTaskQueue } // mustCreatePartition creates a partition at the given path. @@ -108,8 +113,10 @@ func mustOpenPartition(s *Storage, path string) *partition { mustCreateDatadb(datadbPath) } - pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval) + // delete tasks must be loaded before datadb + pt.mustLoadDeleteTasks() + pt.ddb = mustOpenDatadb(pt, datadbPath, s.flushInterval) return pt } @@ -273,3 +280,38 @@ func getPartitionNameFromDay(day int64) string { } const partitionNameFormat = "20060102" + +// mustSaveDeleteTasks persists the current delete tasks to disk. +func (pt *partition) mustSaveDeleteTasks() { + pt.deleteQueue.mu.Lock() + snapshot := append([]deleteTask(nil), pt.deleteQueue.ts...) + pt.deleteQueue.mu.Unlock() + + data, err := json.Marshal(snapshot) + if err != nil { + logger.Panicf("FATAL: cannot marshal delete tasks: %s", err) + } + + tasksPath := filepath.Join(pt.path, deleteTasksFilename) + fs.MustWriteAtomic(tasksPath, data, true) +} + +// mustLoadDeleteTasks loads delete tasks from disk during partition startup. 
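+// A missing tasks file results in an empty queue; unreadable or corrupted task data is treated as fatal.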
+func (pt *partition) mustLoadDeleteTasks() {
+	tasksPath := filepath.Join(pt.path, deleteTasksFilename)
+	if !fs.IsPathExist(tasksPath) {
+		pt.deleteQueue = newDeleteTaskQueue(pt, nil)
+		return
+	}
+
+	data, err := os.ReadFile(tasksPath)
+	if err != nil {
+		logger.Panicf("FATAL: cannot read delete tasks from %q: %s", tasksPath, err)
+	}
+
+	tasks, err := unmarshalDeleteTasks(data)
+	if err != nil {
+		logger.Panicf("FATAL: cannot unmarshal delete tasks from %q: %s", tasksPath, err)
+	}
+	pt.deleteQueue = newDeleteTaskQueue(pt, tasks)
+}
diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go
index f11f44684c..34ea71c0b2 100644
--- a/lib/logstorage/storage.go
+++ b/lib/logstorage/storage.go
@@ -1,6 +1,7 @@
package logstorage
import (
+	"context"
	"fmt"
	"math"
	"os"
@@ -13,6 +14,7 @@ import (
	"time"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot/snapshotutil"
@@ -180,6 +182,10 @@ type Storage struct {
	//
	// It reduces the load on persistent storage during querying by _stream:{...} filter.
	filterStreamCache *cache
+
+	// deleteTaskState is used to pause the delete-task worker while exclusive
+	// operations such as force merge are running.
+	deleteTaskState deleteTaskState
+	deleteTaskSeq atomic.Uint64
}
// PartitionAttach attaches the partition with the given name to s.
@@ -349,6 +355,62 @@ func (s *Storage) PartitionSnapshotList() []string {
	return snapshotPaths
}
+type deleteTaskState struct {
+	waiter atomic.Int32
+	mu     sync.Mutex
+	ch     chan struct{}
+}
+
+// init prepares the pause channel; must be called once at storage startup.
+func (dts *deleteTaskState) init() {
+	dts.mu.Lock()
+	if dts.ch == nil {
+		dts.ch = make(chan struct{})
+	}
+	dts.mu.Unlock()
+}
+
+// addWaiter increments the waiter counter and returns the channel
+// that will be closed when the delete-task worker acknowledges the pause.
+func (dts *deleteTaskState) addWaiter() <-chan struct{} {
+	dts.mu.Lock()
+	ch := dts.ch
+	dts.waiter.Add(1)
+	dts.mu.Unlock()
+	return ch
+}
+
+// doneWaiter decrements the waiter counter, signalling that the caller has
+// finished the critical section.
+func (dts *deleteTaskState) doneWaiter() {
+	if n := dts.waiter.Add(-1); n == 0 {
+		// All waiters are done – prepare a fresh channel for the next pause.
+		dts.mu.Lock()
+		if dts.ch == nil {
+			dts.ch = make(chan struct{})
+		}
+		dts.mu.Unlock()
+	}
+}
+
+// isPaused reports whether the delete-task worker must pause.
+// It returns false when there are no active waiters, so the worker may proceed.
+// If there are active waiters, it closes the pause channel to acknowledge the
+// pause and returns true.
+func (dts *deleteTaskState) isPaused() bool {
+	if dts.waiter.Load() == 0 {
+		return false
+	}
+
+	dts.mu.Lock()
+	if dts.ch != nil {
+		close(dts.ch)
+		dts.ch = nil
+	}
+	dts.mu.Unlock()
+
+	return true
+}
+
type partitionWrapper struct {
	// refCount is the number of active references to partition.
	// When it reaches zero, then the partition is closed.
@@ -478,6 +540,9 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
		filterStreamCache: filterStreamCache,
	}
+	// Initialize the delete-task pause mechanism.
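+	// Exclusive operations such as MustForceMerge pause the worker via addWaiter/doneWaiter.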
+ s.deleteTaskState.init() + partitionsPath := filepath.Join(path, partitionsDirname) fs.MustMkdirIfNotExist(partitionsPath) fs.MustSyncPath(path) @@ -543,6 +608,10 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage { s.partitions = ptws s.runRetentionWatcher() s.runMaxDiskSpaceUsageWatcher() + + // Start background delete-task reconciler. + s.startDeleteTaskWorker() + return s } @@ -768,6 +837,14 @@ func (s *Storage) MustForceMerge(partitionNamePrefix string) { } s.partitionsLock.Unlock() + // Pause the delete-task worker. + ch := s.deleteTaskState.addWaiter() + defer s.deleteTaskState.doneWaiter() + + // Wait until the worker acknowledges pause by closing the channel. + <-ch + + // shutdown must wait for force merge. s.wg.Add(1) defer s.wg.Done() @@ -831,6 +908,7 @@ func (s *Storage) MustAddRows(lr *LogRows) { s.rowsDroppedTooBigTimestamp.Add(1) continue } + lrPart := m[day] if lrPart == nil { lrPart = GetLogRows(nil, nil, nil, nil, "") @@ -985,3 +1063,189 @@ func (s *Storage) DebugFlush() { func durationToDays(d time.Duration) int64 { return int64(d / (time.Hour * 24)) } + +// ValidateDeleteQuery ensures the query has a filter (required for deletion) and only allows +// the `limit` pipe. It automatically adjusts the end time to the current time to prevent +// accidental deletion of future data. +func ValidateDeleteQuery(q *Query) error { + if len(q.pipes) > 0 { + // Only `limit` pipe is allowed for delete queries. + for _, p := range q.pipes { + if _, ok := p.(*pipeLimit); !ok { + return fmt.Errorf("delete supports only | limit N pipe") + } + } + } + + if q.f == nil { + return fmt.Errorf("query must contain a filter") + } + + minTS, maxTS := q.GetFilterTimeRange() + now := int64(fasttime.UnixTimestamp() * 1e9) + + // vlselect already adds a timestamp floor(now). + // vlstorage parses the time by adding +0.999, so we need to use now+1 to avoid + // duplicating the addition of _time filters. + if maxTS > now+int64(time.Second) { + q.AddTimeFilter(minTS, now) + } + + return nil +} + +// taskSeq provides unique, monotonically increasing sequence numbers for delete tasks. +var taskSeq = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x +}() + +// DeleteRows schedules deletion of log rows matching the query filter for the specified tenants. +// The actual deletion is performed by background workers to avoid blocking the caller. +func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []TenantID, q *Query) error { + minTS, maxTS := q.GetFilterTimeRange() + minDay := minTS / nsecsPerDay + maxDay := maxTS / nsecsPerDay + seq := taskSeq.Add(1) + + s.partitionsLock.Lock() + var ptws []*partitionWrapper + for _, ptw := range s.partitions { + if ptw.day < minDay || ptw.day > maxDay { + continue // outside time window + } + ptw.incRef() + ptws = append(ptws, ptw) + } + s.partitionsLock.Unlock() + + for _, ptw := range ptws { + ptw.pt.deleteQueue.add(tenantIDs, q, seq) + ptw.decRef() + } + + return nil +} + +// markDeleteRowsOnParts behaves like MarkRows but only processes data from the supplied parts. +// allowed map must contain *part keys that can be modified. 
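+// The allowed map is keyed by *partition; each value lists the partWrappers whose parts may receive delete markers.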
+func (s *Storage) markDeleteRowsOnParts(ctx context.Context, tenantIDs []TenantID, qStr string, qTimestamp int64, seq uint64, allowed map[*partition][]*partWrapper) error {
+	var q *Query
+	var err error
+	if qTimestamp != 0 {
+		q, err = ParseQueryAtTimestamp(qStr, qTimestamp)
+	} else {
+		q, err = ParseQuery(qStr)
+	}
+	if err != nil {
+		return fmt.Errorf("parse query: %w", err)
+	}
+	if len(q.pipes) > 0 {
+		for _, p := range q.pipes {
+			if _, ok := p.(*pipeLimit); !ok {
+				return fmt.Errorf("delete supports only | limit N pipe")
+			}
+		}
+	}
+
+	minTs, maxTs := q.GetFilterTimeRange()
+	for pt, pws := range allowed {
+		kept := pws[:0] // reuse underlying array
+		for _, pw := range pws {
+			if pw.p.ph.MinTimestamp > maxTs || pw.p.ph.MaxTimestamp < minTs {
+				continue // part is fully outside [minTs, maxTs]
+			}
+			kept = append(kept, pw)
+		}
+		if len(kept) == 0 {
+			delete(allowed, pt) // drop partition entirely if nothing left
+		} else {
+			allowed[pt] = kept
+		}
+	}
+
+	// Build a mapping from parts to their wrappers.
+	pwMap := make(map[*part]*partWrapper)
+	for _, pws := range allowed {
+		for _, pw := range pws {
+			pwMap[pw.p] = pw
+		}
+	}
+
+	type partMarkerData struct {
+		part      *partWrapper
+		delMarker *deleteMarker
+	}
+	partMarkers := make(map[string]*partMarkerData)
+
+	var partMarkersLock sync.Mutex
+	writeBlockResult := func(_ uint, br *blockResult) {
+		if br == nil || br.rowsLen == 0 {
+			return
+		}
+		bm := br.bm
+		if bm == nil || bm.isZero() {
+			return
+		}
+		bs := br.bs
+		if bs == nil {
+			return
+		}
+		p := bs.bsw.p
+		if p == nil {
+			return
+		}
+
+		rowsCount := int(bs.bsw.bh.rowsCount)
+		ones := bm.onesCount()
+
+		blockID := bs.bsw.bh.columnsHeaderOffset
+		var rle boolRLE
+		if ones == rowsCount {
+			rle = boolRLE(nil).SetAllOnes(rowsCount)
+		} else {
+			rle = boolRLE(bm.MarshalBoolRLE(nil))
+		}
+
+		if !rle.IsStateful() {
+			return // need at least 2 run lengths in the RLE bitmap
+		}
+
+		if bs.bsw.dm != nil {
+			existedRLE, ok := bs.bsw.dm.GetMarkedRows(blockID)
+			if ok && rle.IsSubsetOf(existedRLE) {
+				return // already marked
+			}
+		}
+
+		partPath := p.path
+		partMarkersLock.Lock()
+		m, ok := partMarkers[partPath]
+		if !ok {
+			m = &partMarkerData{
+				part:      pwMap[p],
+				delMarker: &deleteMarker{},
+			}
+			partMarkers[partPath] = m
+		}
+		m.delMarker.AddBlock(blockID, rle)
+		partMarkersLock.Unlock()
+	}
+
+	// Use specialized search that only processes the allowed parts.
+	if err := s.runQueryWithParts(ctx, tenantIDs, q, allowed, writeBlockResult); err != nil {
+		return fmt.Errorf("find rows: %w", err)
+	}
+
+	for _, pm := range partMarkers {
+		flushDeleteMarker(pm.part, pm.delMarker, seq)
+	}
+
+	return nil
+}
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index 4d6e38ed92..9631fba202 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -110,6 +110,10 @@ type genericSearchOptions struct {
	// timeOffset is the offset in nanoseconds, which must be subtracted from the selected the _time values before these values are passed to query pipes.
	timeOffset int64
+
+	// pws maps partition to the list of parts to search.
+	// If pws is empty, search all eligible parts on disk.
+	pws map[*partition][]*partWrapper
}
type searchOptions struct {
@@ -132,6 +136,10 @@ type searchOptions struct {
	// fieldsFilter is the filter of fields to return in the result
	fieldsFilter *prefixfilter.Filter
+
+	// pws is the list of parts to search.
+ // If pws is empty, search all eligible parts on disk. + pws []*partWrapper } // WriteDataBlockFunc must process the db. @@ -146,6 +154,7 @@ func (f WriteDataBlockFunc) newBlockResultWriter() writeBlockResultFunc { if br.rowsLen == 0 { return } + db := dbs.Get(workerID) db.initFromBlockResult(br) f(workerID, db) @@ -213,6 +222,47 @@ func (s *Storage) runQuery(qctx *QueryContext, writeBlock writeBlockResultFunc) return runPipes(qctx, q.pipes, search, writeBlock, concurrency) } +// runQueryWithParts executes q against the specified parts map. +// If pws is nil, searches all eligible parts (same as runQuery). +func (s *Storage) runQueryWithParts(ctx context.Context, tenantIDs []TenantID, q *Query, pws map[*partition][]*partWrapper, writeBlock writeBlockResultFunc) error { + qs := &QueryStats{} + qctx := NewQueryContext(ctx, qs, tenantIDs, q, false) + + qNew, err := initSubqueries(qctx, s.runQuery, true) + if err != nil { + return err + } + q = qNew + + streamIDs := q.getStreamIDs() + sort.Slice(streamIDs, func(i, j int) bool { + return streamIDs[i].less(&streamIDs[j]) + }) + + minTimestamp, maxTimestamp := q.GetFilterTimeRange() + fieldsFilter := getNeededColumns(q.pipes) + + so := &genericSearchOptions{ + tenantIDs: tenantIDs, + streamIDs: streamIDs, + minTimestamp: minTimestamp, + maxTimestamp: maxTimestamp, + filter: q.f, + fieldsFilter: fieldsFilter, + timeOffset: -q.opts.timeOffset, + pws: pws, + } + + search := func(stopCh <-chan struct{}, writeBlockToPipes writeBlockResultFunc) error { + workersCount := q.GetParallelReaders(s.defaultParallelReaders) + s.searchParallel(workersCount, so, qctx.QueryStats, stopCh, writeBlockToPipes) + return nil + } + + concurrency := q.GetConcurrency() + return runPipes(qctx, q.pipes, search, writeBlock, concurrency) +} + // searchFunc must perform search and pass its results to writeBlock. type searchFunc func(stopCh <-chan struct{}, writeBlock writeBlockResultFunc) error @@ -1224,6 +1274,11 @@ func (db *DataBlock) initFromBlockResult(br *blockResult) { // // It uses workersCount parallel workers for the search and calls writeBlock for each matching block. func (s *Storage) searchParallel(workersCount int, so *genericSearchOptions, qs *QueryStats, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) { + if len(so.pws) > 0 { + s.searchOnPartitions(workersCount, so, qs, stopCh, writeBlock) + return + } + // spin up workers var wg sync.WaitGroup workCh := make(chan *blockSearchWorkBatch, workersCount) @@ -1327,6 +1382,73 @@ func (s *Storage) searchParallel(workersCount int, so *genericSearchOptions, qs } } +// searchOnPartitions is similar to storage.search, but only processes the specified allowed parts. +// Partitions must be locked (incRef) before calling this function. 
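+// Part selection is driven entirely by so.pws: each partition searches only the parts
+// listed in its map entry, further narrowed by the query time range.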
+func (s *Storage) searchOnPartitions(workersCount int, so *genericSearchOptions, qs *QueryStats, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) { + // spin up workers (with QueryStats and timeOffset handling) + var wg sync.WaitGroup + workCh := make(chan *blockSearchWorkBatch, workersCount) + for workerID := 0; workerID < workersCount; workerID++ { + wg.Add(1) + go func(workerID uint) { + defer wg.Done() + qsLocal := &QueryStats{} + bs := getBlockSearch() + bm := getBitmap(0) + for bswb := range workCh { + bsws := bswb.bsws + for i := range bsws { + bsw := &bsws[i] + if needStop(stopCh) { + bsw.reset() + continue + } + bs.search(qsLocal, bsw, bm) + if bs.br.rowsLen > 0 { + if so.timeOffset != 0 { + bs.subTimeOffsetToTimestamps(so.timeOffset) + } + writeBlock(workerID, &bs.br) + } + bsw.reset() + } + bswb.bsws = bswb.bsws[:0] + putBlockSearchWorkBatch(bswb) + } + putBlockSearch(bs) + putBitmap(bm) + qs.UpdateAtomic(qsLocal) + }(uint(workerID)) + } + + var wgSearchers sync.WaitGroup + psfs := make([]partitionSearchFinalizer, len(so.pws)) + i := 0 + sf, f := getCommonStreamFilter(so.filter) + for pt := range so.pws { + partitionSearchConcurrencyLimitCh <- struct{}{} + wgSearchers.Add(1) + go func(idx int, partition *partition) { + qsLocal := &QueryStats{} + psfs[idx] = partition.search(sf, f, so, qsLocal, workCh, stopCh) + qs.UpdateAtomic(qsLocal) + wgSearchers.Done() + <-partitionSearchConcurrencyLimitCh + }(i, pt) + i++ + } + wgSearchers.Wait() + + // finish workers + close(workCh) + wg.Wait() + + // finalize partitions + for _, psf := range psfs { + psf() + } +} + // partitionSearchConcurrencyLimitCh limits the number of concurrent searches in partition. // // This is needed for limiting memory usage under high load. @@ -1355,6 +1477,7 @@ func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions if hasStreamFilters(f) { f = initStreamFilters(so.tenantIDs, pt.idb, f) } + soInternal := &searchOptions{ tenantIDs: tenantIDs, streamIDs: streamIDs, @@ -1362,6 +1485,7 @@ func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions maxTimestamp: so.maxTimestamp, filter: f, fieldsFilter: so.fieldsFilter, + pws: so.pws[pt], } return pt.ddb.search(soInternal, qs, workCh, stopCh) } @@ -1428,15 +1552,22 @@ func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter { func (ddb *datadb) search(so *searchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() - pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) - pws = appendPartsInTimeRange(pws, ddb.smallParts, so.minTimestamp, so.maxTimestamp) - pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) + var pws []*partWrapper + + if len(so.pws) == 0 { + pws = appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) + pws = appendPartsInTimeRange(pws, ddb.smallParts, so.minTimestamp, so.maxTimestamp) + pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) + } else { + pws = appendPartsInTimeRange(nil, so.pws, so.minTimestamp, so.maxTimestamp) + } // Increase references to the searched parts, so they aren't deleted during search. // References to the searched parts must be decremented by calling the returned partitionSearchFinalizer. 
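+	// This applies to both the default time-range selection above and to parts supplied via so.pws.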
for _, pw := range pws { pw.incRef() } + ddb.partsLock.Unlock() // Apply search to matching parts diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 0000000000..2a7cf70da6 --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/vendor/golang.org/x/sync/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. 
diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 0000000000..1d8cffae8c --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,151 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +// +// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks +// returning errors. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. A Group should not be reused for different tasks. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to Go must happen before a Wait. +// It blocks until the new goroutine can be added without the number of +// goroutines in the group exceeding the configured limit. +// +// The first goroutine in the group that returns a non-nil error will +// cancel the associated Context, if any. The error will be returned +// by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + // It is tempting to propagate panics from f() + // up to the goroutine that calls Wait, but + // it creates more problems than it solves: + // - it delays panics arbitrarily, + // making bugs harder to detect; + // - it turns f's panic stack into a mere value, + // hiding it from crash-monitoring tools; + // - it risks deadlocks that hide the panic entirely, + // if f's panic leaves the program in a state + // that prevents the Wait call from being reached. + // See #53757, #74275, #74304, #74306. + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. 
+ default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// A limit of zero will prevent any new goroutines from being added. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6a74ad6fe3..c6d082c810 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -121,6 +121,9 @@ github.com/valyala/quicktemplate golang.org/x/oauth2 golang.org/x/oauth2/clientcredentials golang.org/x/oauth2/internal +# golang.org/x/sync v0.16.0 +## explicit; go 1.23.0 +golang.org/x/sync/errgroup # golang.org/x/sys v0.35.0 ## explicit; go 1.23.0 golang.org/x/sys/cpu