Skip to content

Commit 93a9357

Browse files
committed
update
1 parent 3a72694 commit 93a9357

28 files changed

+2815
-16
lines changed

app/vlselect/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,5 @@ var (
325325
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
326326
logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`)
327327

328-
// no need to track duration for tail requests, as they usually take long time
329328
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
330329
)

app/vlstorage/main.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package vlstorage
22

33
import (
4+
"context"
45
"encoding/json"
56
"flag"
67
"fmt"
78
"io"
89
"math"
910
"net/http"
11+
"strconv"
1012
"time"
1113

1214
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@@ -57,6 +59,9 @@ var (
5759
partitionManageAuthKey = flagutil.NewPassword("partitionManageAuthKey", "authKey, which must be passed in query string to /internal/partition/* . It overrides -httpAuth.* . "+
5860
"See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle")
5961

62+
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey, which must be passed in query string to /delete and /internal/delete . It overrides -httpAuth.* . "+
63+
"See https://docs.victoriametrics.com/victorialogs/#delete-log-rows")
64+
6065
storageNodeAddrs = flagutil.NewArrayString("storageNode", "Comma-separated list of TCP addresses for storage nodes to route the ingested logs to and to send select queries to. "+
6166
"If the list is empty, then the ingested logs are stored and queried locally from -storageDataPath")
6267
insertConcurrency = flag.Int("insert.concurrency", 2, "The average number of concurrent data ingestion requests, which can be sent to every -storageNode")
@@ -94,6 +99,11 @@ var netstorageInsert *netinsert.Storage
9499

95100
var netstorageSelect *netselect.Storage
96101

102+
// CheckDeleteAuth validates auth for delete endpoints.
103+
func CheckDeleteAuth(w http.ResponseWriter, r *http.Request) bool {
104+
return httpserver.CheckAuthFlag(w, r, deleteAuthKey)
105+
}
106+
97107
// Init initializes vlstorage.
98108
//
99109
// Stop must be called when vlstorage is no longer needed
@@ -246,6 +256,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
246256
return processPartitionSnapshotCreate(w, r)
247257
case "/internal/partition/snapshot/list":
248258
return processPartitionSnapshotList(w, r)
259+
case "/delete":
260+
return processDelete(r.Context(), w, r)
249261
}
250262
return false
251263
}
@@ -397,6 +409,85 @@ func writeJSONResponse(w http.ResponseWriter, response any) {
397409
w.Write(responseBody)
398410
}
399411

412+
// processDelete handles the /delete endpoint for both local and remote storage nodes.
413+
// It supports POST for scheduling deletes and GET for retrieving task status.
414+
func processDelete(ctx context.Context, w http.ResponseWriter, r *http.Request) bool {
415+
if localStorage != nil {
416+
if !CheckDeleteAuth(w, r) {
417+
return true
418+
}
419+
}
420+
421+
authKey := r.FormValue("authKey")
422+
423+
switch r.Method {
424+
case http.MethodPost:
425+
if tenantIDsStr := r.FormValue("tenant_ids"); tenantIDsStr != "" {
426+
tenantIDs, err := logstorage.UnmarshalTenantIDs([]byte(tenantIDsStr))
427+
if err != nil {
428+
httpserver.Errorf(w, r, "cannot unmarshal tenant_ids=%q: %s", tenantIDsStr, err)
429+
return true
430+
}
431+
432+
timestampStr := r.FormValue("timestamp")
433+
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
434+
if err != nil {
435+
httpserver.Errorf(w, r, "cannot parse timestamp=%q: %s", timestampStr, err)
436+
return true
437+
}
438+
439+
qStr := r.FormValue("query")
440+
q, err := logstorage.ParseQueryAtTimestamp(qStr, timestamp)
441+
if err != nil {
442+
httpserver.Errorf(w, r, "cannot parse query=%q: %s", qStr, err)
443+
return true
444+
}
445+
446+
if err := DeleteRows(ctx, tenantIDs, q, authKey); err != nil {
447+
httpserver.Errorf(w, r, "cannot delete rows: %s", err)
448+
return true
449+
}
450+
} else {
451+
tenantID, err := logstorage.GetTenantIDFromRequest(r)
452+
if err != nil {
453+
httpserver.Errorf(w, r, "cannot obtain tenantID: %s", err)
454+
return true
455+
}
456+
457+
qStr := r.FormValue("query")
458+
q, err := logstorage.ParseQuery(qStr)
459+
if err != nil {
460+
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
461+
return true
462+
}
463+
464+
if err := DeleteRows(ctx, []logstorage.TenantID{tenantID}, q, authKey); err != nil {
465+
httpserver.Errorf(w, r, "%s", err)
466+
return true
467+
}
468+
}
469+
470+
w.Header().Set("Content-Type", "text/plain")
471+
w.WriteHeader(http.StatusOK)
472+
473+
case http.MethodGet:
474+
tasks, err := ListDeleteTasks(ctx, authKey)
475+
if err != nil {
476+
httpserver.Errorf(w, r, "cannot list delete tasks: %s", err)
477+
return true
478+
}
479+
480+
w.Header().Set("Content-Type", "application/json")
481+
if err := json.NewEncoder(w).Encode(tasks); err != nil {
482+
httpserver.Errorf(w, r, "internal error: %s", err)
483+
}
484+
485+
default:
486+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
487+
}
488+
return true
489+
}
490+
400491
// Storage implements insertutil.LogRowsStorage interface
401492
type Storage struct{}
402493

@@ -500,6 +591,41 @@ func GetStreamIDs(qctx *logstorage.QueryContext, limit uint64) ([]logstorage.Val
500591
return netstorageSelect.GetStreamIDs(qctx, limit)
501592
}
502593

594+
// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately.
595+
func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
596+
if err := logstorage.ValidateDeleteQuery(q); err != nil {
597+
return fmt.Errorf("validate query: %w", err)
598+
}
599+
600+
if localStorage != nil {
601+
return localStorage.DeleteRows(ctx, tenantIDs, q)
602+
}
603+
604+
return netstorageSelect.DeleteRows(ctx, tenantIDs, q, authKey)
605+
}
606+
607+
// IsLocalStorage confirms whether the running instance is a storage node.
608+
func IsLocalStorage() bool {
609+
return localStorage != nil
610+
}
611+
612+
// ListDeleteTasks collects delete task information either from the local storage or from all configured storage nodes.
613+
// It returns a slice with the tasks and an extra Storage field indicating the source node address (or "local" for the embedded storage).
614+
func ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
615+
if localStorage != nil {
616+
tasks := localStorage.ListDeleteTasks()
617+
out := make([]logstorage.DeleteTaskInfoWithSource, len(tasks))
618+
for i, t := range tasks {
619+
out[i] = logstorage.DeleteTaskInfoWithSource{
620+
DeleteTaskInfo: t,
621+
}
622+
}
623+
return out, nil
624+
}
625+
626+
return netstorageSelect.ListDeleteTasks(ctx, authKey)
627+
}
628+
503629
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
504630
var ss logstorage.StorageStats
505631
strg.UpdateStats(&ss)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package netselect
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"net/url"
10+
11+
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
12+
"golang.org/x/sync/errgroup"
13+
)
14+
15+
// ListDeleteTasks gathers all delete tasks from every storage node and returns them along with the originating storage address.
16+
func (s *Storage) ListDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
17+
if len(s.sns) == 0 {
18+
return nil, nil
19+
}
20+
21+
g, ctx := errgroup.WithContext(ctx)
22+
23+
// race-free slices
24+
results := make([][]logstorage.DeleteTaskInfoWithSource, len(s.sns))
25+
for i, sn := range s.sns {
26+
i, sn := i, sn
27+
g.Go(func() error {
28+
tasks, err := sn.getDeleteTasks(ctx, authKey)
29+
if err != nil {
30+
return err
31+
}
32+
results[i] = tasks
33+
return nil
34+
})
35+
}
36+
37+
if err := g.Wait(); err != nil {
38+
return nil, err
39+
}
40+
41+
var all []logstorage.DeleteTaskInfoWithSource
42+
for _, ts := range results {
43+
all = append(all, ts...)
44+
}
45+
46+
return all, nil
47+
}
48+
49+
func (sn *storageNode) getDeleteTasks(ctx context.Context, authKey string) ([]logstorage.DeleteTaskInfoWithSource, error) {
50+
args := url.Values{}
51+
args.Set("version", DeleteProtocolVersion)
52+
if authKey != "" {
53+
args.Set("authKey", authKey)
54+
}
55+
56+
reqURL := sn.getRequestURLWithArgs("/delete", args)
57+
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
58+
if err != nil {
59+
return nil, err
60+
}
61+
if err := sn.ac.SetHeaders(req, true); err != nil {
62+
return nil, fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
63+
}
64+
65+
resp, err := sn.c.Do(req)
66+
if err != nil {
67+
return nil, err
68+
}
69+
defer resp.Body.Close()
70+
71+
body, err := io.ReadAll(resp.Body)
72+
if err != nil {
73+
return nil, fmt.Errorf("cannot read response body from %q: %w", reqURL, err)
74+
}
75+
76+
if resp.StatusCode != 200 {
77+
return nil, fmt.Errorf("unexpected status code for %q: %d; response: %q", reqURL, resp.StatusCode, body)
78+
}
79+
80+
var tasks []logstorage.DeleteTaskInfoWithSource
81+
if err := json.Unmarshal(body, &tasks); err != nil {
82+
return nil, fmt.Errorf("cannot decode delete tasks response from %q: %w; response body: %q", reqURL, err, body)
83+
}
84+
85+
// Attach origin address.
86+
for i := range tasks {
87+
if tasks[i].Storage == "" {
88+
tasks[i].Storage = sn.addr
89+
}
90+
}
91+
return tasks, nil
92+
}

app/vlstorage/netselect/netselect.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ const (
6161
//
6262
// It must be updated every time the protocol changes.
6363
QueryProtocolVersion = "v2"
64+
65+
// DeleteProtocolVersion is the version of the protocol used for /internal/delete HTTP endpoint.
66+
// It must be updated every time the protocol changes.
67+
DeleteProtocolVersion = "v1"
6468
)
6569

6670
// Storage is a network storage for querying remote storage nodes in the cluster.
@@ -578,3 +582,63 @@ func unmarshalQueryStats(qs *logstorage.QueryStats, src []byte) ([]byte, error)
578582
}
579583
return tail, nil
580584
}
585+
586+
// DeleteRows propagates delete markers to all storage nodes.
587+
func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
588+
var wg sync.WaitGroup
589+
errs := make([]error, len(s.sns))
590+
for i, sn := range s.sns {
591+
i, sn := i, sn
592+
wg.Add(1)
593+
go func() {
594+
defer wg.Done()
595+
if err := sn.deleteRows(ctx, tenantIDs, q, authKey); err != nil {
596+
errs[i] = fmt.Errorf("storage node %s: %w", sn.addr, err)
597+
}
598+
}()
599+
}
600+
wg.Wait()
601+
602+
return errors.Join(errs...)
603+
}
604+
605+
func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) error {
606+
args := sn.getDeleteArgs(DeleteProtocolVersion, tenantIDs, q, authKey)
607+
608+
reqURL := sn.getRequestURLWithArgs("/delete", args)
609+
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
610+
if err != nil {
611+
return err
612+
}
613+
if err := sn.ac.SetHeaders(req, true); err != nil {
614+
return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
615+
}
616+
resp, err := sn.c.Do(req)
617+
if err != nil {
618+
return err
619+
}
620+
defer resp.Body.Close()
621+
if resp.StatusCode != http.StatusOK {
622+
body, _ := io.ReadAll(resp.Body)
623+
return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body)
624+
}
625+
return nil
626+
}
627+
628+
// getDeleteArgs builds url.Values for delete requests
629+
func (sn *storageNode) getDeleteArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query, authKey string) url.Values {
630+
args := url.Values{}
631+
args.Set("version", version)
632+
args.Set("tenant_ids", string(logstorage.MarshalTenantIDs(nil, tenantIDs)))
633+
args.Set("query", q.String())
634+
args.Set("timestamp", fmt.Sprintf("%d", q.GetTimestamp()))
635+
if authKey != "" {
636+
args.Set("authKey", authKey)
637+
}
638+
return args
639+
}
640+
641+
// getRequestURLWithArgs builds request URL with encoded query args
642+
func (sn *storageNode) getRequestURLWithArgs(path string, args url.Values) string {
643+
return fmt.Sprintf("%s://%s%s?%s", sn.scheme, sn.addr, path, args.Encode())
644+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/valyala/fastjson v1.6.4
1616
github.com/valyala/fastrand v1.1.0
1717
github.com/valyala/quicktemplate v1.8.0
18+
golang.org/x/sync v0.16.0
1819
)
1920

2021
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdq
3636
github.com/valyala/quicktemplate v1.8.0/go.mod h1:qIqW8/igXt8fdrUln5kOSb+KWMaJ4Y8QUsfd1k6L2jM=
3737
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
3838
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
39+
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
40+
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
3941
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4042
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
4143
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=

0 commit comments

Comments
 (0)