Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion app/vlselect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,5 @@ var (
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`)

// no need to track duration for tail requests, as they usually take long time
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
)
126 changes: 126 additions & 0 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package vlstorage

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
Expand Down Expand Up @@ -57,6 +59,9 @@ var (
partitionManageAuthKey = flagutil.NewPassword("partitionManageAuthKey", "authKey, which must be passed in query string to /internal/partition/* . It overrides -httpAuth.* . "+
"See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle")

deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey, which must be passed in query string to /delete and /internal/delete . It overrides -httpAuth.* . "+
"See https://docs.victoriametrics.com/victorialogs/#delete-log-rows")

storageNodeAddrs = flagutil.NewArrayString("storageNode", "Comma-separated list of TCP addresses for storage nodes to route the ingested logs to and to send select queries to. "+
"If the list is empty, then the ingested logs are stored and queried locally from -storageDataPath")
insertConcurrency = flag.Int("insert.concurrency", 2, "The average number of concurrent data ingestion requests, which can be sent to every -storageNode")
Expand Down Expand Up @@ -94,6 +99,11 @@ var netstorageInsert *netinsert.Storage

var netstorageSelect *netselect.Storage

// CheckDeleteAuth validates auth for delete endpoints.
func CheckDeleteAuth(w http.ResponseWriter, r *http.Request) bool {
return httpserver.CheckAuthFlag(w, r, deleteAuthKey)
}

// Init initializes vlstorage.
//
// Stop must be called when vlstorage is no longer needed
Expand Down Expand Up @@ -246,6 +256,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return processPartitionSnapshotCreate(w, r)
case "/internal/partition/snapshot/list":
return processPartitionSnapshotList(w, r)
case "/delete":
return processDelete(r.Context(), w, r)
}
return false
}
Expand Down Expand Up @@ -397,6 +409,85 @@ func writeJSONResponse(w http.ResponseWriter, response any) {
w.Write(responseBody)
}

// processDelete handles the /delete endpoint for both local and remote storage nodes.
// It supports POST for scheduling deletes and GET for retrieving task status.
func processDelete(ctx context.Context, w http.ResponseWriter, r *http.Request) bool {
if localStorage != nil {
if !CheckDeleteAuth(w, r) {
return true
}
}

authKey := r.FormValue("authKey")

switch r.Method {
case http.MethodPost:
if tenantIDsStr := r.FormValue("tenant_ids"); tenantIDsStr != "" {
tenantIDs, err := logstorage.UnmarshalTenantIDs([]byte(tenantIDsStr))
if err != nil {
httpserver.Errorf(w, r, "cannot unmarshal tenant_ids=%q: %s", tenantIDsStr, err)
return true
}

timestampStr := r.FormValue("timestamp")
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
httpserver.Errorf(w, r, "cannot parse timestamp=%q: %s", timestampStr, err)
return true
}

qStr := r.FormValue("query")
q, err := logstorage.ParseQueryAtTimestamp(qStr, timestamp)
if err != nil {
httpserver.Errorf(w, r, "cannot parse query=%q: %s", qStr, err)
return true
}

if err := DeleteRows(ctx, tenantIDs, q, authKey); err != nil {
httpserver.Errorf(w, r, "cannot delete rows: %s", err)
return true
}
} else {
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain tenantID: %s", err)
return true
}

qStr := r.FormValue("query")
q, err := logstorage.ParseQuery(qStr)
if err != nil {
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
return true
}

if err := DeleteRows(ctx, []logstorage.TenantID{tenantID}, q, authKey); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
}

w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)

case http.MethodGet:
tasks, err := ListDeleteTasks(ctx, authKey)
if err != nil {
httpserver.Errorf(w, r, "cannot list delete tasks: %s", err)
return true
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(tasks); err != nil {
httpserver.Errorf(w, r, "internal error: %s", err)
}

default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
return true
}

// Storage implements insertutil.LogRowsStorage interface
type Storage struct{}

Expand Down Expand Up @@ -500,6 +591,41 @@ func GetStreamIDs(qctx *logstorage.QueryContext, limit uint64) ([]logstorage.Val
return netstorageSelect.GetStreamIDs(qctx, limit)
}

// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately.
func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
if err := logstorage.ValidateDeleteQuery(q); err != nil {
return fmt.Errorf("validate query: %w", err)
}

if localStorage != nil {
return localStorage.DeleteRows(ctx, tenantIDs, q)
}

return netstorageSelect.DeleteRows(ctx, tenantIDs, q, authKey)
}

// IsLocalStorage confirms whether the running instance is a storage node.
func IsLocalStorage() bool {
return localStorage != nil
}

// ListDeleteTasks collects delete task information either from the local storage or from all configured storage nodes.
// It returns a slice with the tasks and an extra Storage field indicating the source node address (or "local" for the embedded storage).
func ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
if localStorage != nil {
tasks := localStorage.ListDeleteTasks()
out := make([]logstorage.DeleteTaskInfoWithSource, len(tasks))
for i, t := range tasks {
out[i] = logstorage.DeleteTaskInfoWithSource{
DeleteTaskInfo: t,
}
}
return out, nil
}

return netstorageSelect.ListDeleteTasks(ctx, authKey)
}

func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
Expand Down
92 changes: 92 additions & 0 deletions app/vlstorage/netselect/delete_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package netselect

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
"golang.org/x/sync/errgroup"
)

// ListDeleteTasks gathers all delete tasks from every storage node and returns them along with the originating storage address.
func (s *Storage) ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
if len(s.sns) == 0 {
return nil, nil
}

g, ctx := errgroup.WithContext(ctx)

// race-free slices
results := make([][]logstorage.DeleteTaskInfoWithSource, len(s.sns))
for i, sn := range s.sns {
i, sn := i, sn
g.Go(func() error {
tasks, err := sn.getDeleteTasks(ctx, authKey)
if err != nil {
return err
}
results[i] = tasks
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

var all []logstorage.DeleteTaskInfoWithSource
for _, ts := range results {
all = append(all, ts...)
}

return all, nil
}

func (sn *storageNode) getDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
args := url.Values{}
args.Set("version", DeleteProtocolVersion)
if authKey != "" {
args.Set("authKey", authKey)
}

reqURL := sn.getRequestURLWithArgs("/delete", args)
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, err
}
if err := sn.ac.SetHeaders(req, true); err != nil {
return nil, fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
}

resp, err := sn.c.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("cannot read response body from %q: %w", reqURL, err)
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("unexpected status code for %q: %d; response: %q", reqURL, resp.StatusCode, body)
}

var tasks []logstorage.DeleteTaskInfoWithSource
if err := json.Unmarshal(body, &tasks); err != nil {
return nil, fmt.Errorf("cannot decode delete tasks response from %q: %w; response body: %q", reqURL, err, body)
}

// Attach origin address.
for i := range tasks {
if tasks[i].Storage == "" {
tasks[i].Storage = sn.addr
}
}
return tasks, nil
}
64 changes: 64 additions & 0 deletions app/vlstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
//
// It must be updated every time the protocol changes.
QueryProtocolVersion = "v2"

// DeleteProtocolVersion is the version of the protocol used for /internal/delete HTTP endpoint.
// It must be updated every time the protocol changes.
DeleteProtocolVersion = "v1"
)

// Storage is a network storage for querying remote storage nodes in the cluster.
Expand Down Expand Up @@ -578,3 +582,63 @@ func unmarshalQueryStats(qs *logstorage.QueryStats, src []byte) ([]byte, error)
}
return tail, nil
}

// DeleteRows propagates delete markers to all storage nodes.
func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
var wg sync.WaitGroup
errs := make([]error, len(s.sns))
for i, sn := range s.sns {
i, sn := i, sn
wg.Add(1)
go func() {
defer wg.Done()
if err := sn.deleteRows(ctx, tenantIDs, q, authKey); err != nil {
errs[i] = fmt.Errorf("storage node %s: %w", sn.addr, err)
}
}()
}
wg.Wait()

return errors.Join(errs...)
}

func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
args := sn.getDeleteArgs(DeleteProtocolVersion, tenantIDs, q, authKey)

reqURL := sn.getRequestURLWithArgs("/delete", args)
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
if err != nil {
return err
}
if err := sn.ac.SetHeaders(req, true); err != nil {
return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
}
resp, err := sn.c.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body)
}
return nil
}

// getDeleteArgs builds url.Values for delete requests
func (sn *storageNode) getDeleteArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) url.Values {
args := url.Values{}
args.Set("version", version)
args.Set("tenant_ids", string(logstorage.MarshalTenantIDs(nil, tenantIDs)))
args.Set("query", q.String())
args.Set("timestamp", fmt.Sprintf("%d", q.GetTimestamp()))
if authKey != "" {
args.Set("authKey", authKey)
}
return args
}

// getRequestURLWithArgs builds request URL with encoded query args
func (sn *storageNode) getRequestURLWithArgs(path string, args url.Values) string {
return fmt.Sprintf("%s://%s%s?%s", sn.scheme, sn.addr, path, args.Encode())
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/valyala/fastjson v1.6.4
github.com/valyala/fastrand v1.1.0
github.com/valyala/quicktemplate v1.8.0
golang.org/x/sync v0.16.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdq
github.com/valyala/quicktemplate v1.8.0/go.mod h1:qIqW8/igXt8fdrUln5kOSb+KWMaJ4Y8QUsfd1k6L2jM=
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
Expand Down
Loading