Skip to content

Commit 4c7abb9

Browse files
committed
update
1 parent 3a72694 commit 4c7abb9

28 files changed

+2817
-18
lines changed

app/vlselect/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,5 @@ var (
325325
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
326326
logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`)
327327

328-
// no need to track duration for tail requests, as they usually take long time
329328
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
330329
)

app/vlstorage/main.go

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package vlstorage
22

33
import (
4+
"context"
45
"encoding/json"
56
"flag"
67
"fmt"
78
"io"
89
"math"
910
"net/http"
11+
"strconv"
1012
"time"
1113

1214
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@@ -57,6 +59,9 @@ var (
5759
partitionManageAuthKey = flagutil.NewPassword("partitionManageAuthKey", "authKey, which must be passed in query string to /internal/partition/* . It overrides -httpAuth.* . "+
5860
"See https://docs.victoriametrics.com/victorialogs/#partitions-lifecycle")
5961

62+
deleteAuthKey = flagutil.NewPassword("deleteAuthKey", "authKey, which must be passed in query string to /delete and /internal/delete . It overrides -httpAuth.* . "+
63+
"See https://docs.victoriametrics.com/victorialogs/#delete-log-rows")
64+
6065
storageNodeAddrs = flagutil.NewArrayString("storageNode", "Comma-separated list of TCP addresses for storage nodes to route the ingested logs to and to send select queries to. "+
6166
"If the list is empty, then the ingested logs are stored and queried locally from -storageDataPath")
6267
insertConcurrency = flag.Int("insert.concurrency", 2, "The average number of concurrent data ingestion requests, which can be sent to every -storageNode")
@@ -94,6 +99,11 @@ var netstorageInsert *netinsert.Storage
9499

95100
var netstorageSelect *netselect.Storage
96101

102+
// CheckDeleteAuth validates auth for delete endpoints.
103+
func CheckDeleteAuth(w http.ResponseWriter, r *http.Request) bool {
104+
return httpserver.CheckAuthFlag(w, r, deleteAuthKey)
105+
}
106+
97107
// Init initializes vlstorage.
98108
//
99109
// Stop must be called when vlstorage is no longer needed
@@ -166,7 +176,7 @@ func initNetworkStorage() {
166176
netstorageInsert = netinsert.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *insertConcurrency, *insertDisableCompression)
167177

168178
logger.Infof("initializing select service for nodes %s", *storageNodeAddrs)
169-
netstorageSelect = netselect.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *selectDisableCompression)
179+
netstorageSelect = netselect.NewStorage(*storageNodeAddrs, authCfgs, isTLSs, *selectDisableCompression, deleteAuthKey.Get())
170180

171181
logger.Infof("initialized all the network services")
172182
}
@@ -246,6 +256,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
246256
return processPartitionSnapshotCreate(w, r)
247257
case "/internal/partition/snapshot/list":
248258
return processPartitionSnapshotList(w, r)
259+
case "/delete":
260+
return processDelete(r.Context(), w, r)
249261
}
250262
return false
251263
}
@@ -397,6 +409,83 @@ func writeJSONResponse(w http.ResponseWriter, response any) {
397409
w.Write(responseBody)
398410
}
399411

412+
// processDelete handles the /delete endpoint for both local and remote storage nodes.
413+
// It supports POST for scheduling deletes and GET for retrieving task status.
414+
func processDelete(ctx context.Context, w http.ResponseWriter, r *http.Request) bool {
415+
if localStorage != nil {
416+
if !CheckDeleteAuth(w, r) {
417+
return true
418+
}
419+
}
420+
421+
switch r.Method {
422+
case http.MethodPost:
423+
if tenantIDsStr := r.FormValue("tenant_ids"); tenantIDsStr != "" {
424+
tenantIDs, err := logstorage.UnmarshalTenantIDs([]byte(tenantIDsStr))
425+
if err != nil {
426+
httpserver.Errorf(w, r, "cannot unmarshal tenant_ids=%q: %s", tenantIDsStr, err)
427+
return true
428+
}
429+
430+
timestampStr := r.FormValue("timestamp")
431+
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
432+
if err != nil {
433+
httpserver.Errorf(w, r, "cannot parse timestamp=%q: %s", timestampStr, err)
434+
return true
435+
}
436+
437+
qStr := r.FormValue("query")
438+
q, err := logstorage.ParseQueryAtTimestamp(qStr, timestamp)
439+
if err != nil {
440+
httpserver.Errorf(w, r, "cannot parse query=%q: %s", qStr, err)
441+
return true
442+
}
443+
444+
if err := DeleteRows(ctx, tenantIDs, q); err != nil {
445+
httpserver.Errorf(w, r, "cannot delete rows: %s", err)
446+
return true
447+
}
448+
} else {
449+
tenantID, err := logstorage.GetTenantIDFromRequest(r)
450+
if err != nil {
451+
httpserver.Errorf(w, r, "cannot obtain tenantID: %s", err)
452+
return true
453+
}
454+
455+
qStr := r.FormValue("query")
456+
q, err := logstorage.ParseQuery(qStr)
457+
if err != nil {
458+
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
459+
return true
460+
}
461+
462+
if err := DeleteRows(ctx, []logstorage.TenantID{tenantID}, q); err != nil {
463+
httpserver.Errorf(w, r, "%s", err)
464+
return true
465+
}
466+
}
467+
468+
w.Header().Set("Content-Type", "text/plain")
469+
w.WriteHeader(http.StatusOK)
470+
471+
case http.MethodGet:
472+
tasks, err := ListDeleteTasks(ctx)
473+
if err != nil {
474+
httpserver.Errorf(w, r, "cannot list delete tasks: %s", err)
475+
return true
476+
}
477+
478+
w.Header().Set("Content-Type", "application/json")
479+
if err := json.NewEncoder(w).Encode(tasks); err != nil {
480+
httpserver.Errorf(w, r, "internal error: %s", err)
481+
}
482+
483+
default:
484+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
485+
}
486+
return true
487+
}
488+
400489
// Storage implements insertutil.LogRowsStorage interface
401490
type Storage struct{}
402491

@@ -500,6 +589,41 @@ func GetStreamIDs(qctx *logstorage.QueryContext, limit uint64) ([]logstorage.Val
500589
return netstorageSelect.GetStreamIDs(qctx, limit)
501590
}
502591

592+
// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately.
593+
func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
594+
if err := logstorage.ValidateDeleteQuery(q); err != nil {
595+
return fmt.Errorf("validate query: %w", err)
596+
}
597+
598+
if localStorage != nil {
599+
return localStorage.DeleteRows(ctx, tenantIDs, q)
600+
}
601+
602+
return netstorageSelect.DeleteRows(ctx, tenantIDs, q)
603+
}
604+
605+
// IsLocalStorage confirms whether the running instance is a storage node.
606+
func IsLocalStorage() bool {
607+
return localStorage != nil
608+
}
609+
610+
// ListDeleteTasks collects delete task information either from the local storage or from all configured storage nodes.
611+
// It returns a slice with the tasks and an extra Storage field indicating the source node address (or "local" for the embedded storage).
612+
func ListDeleteTasks(ctx context.Context) ([]logstorage.DeleteTaskInfoWithSource, error) {
613+
if localStorage != nil {
614+
tasks := localStorage.ListDeleteTasks()
615+
out := make([]logstorage.DeleteTaskInfoWithSource, len(tasks))
616+
for i, t := range tasks {
617+
out[i] = logstorage.DeleteTaskInfoWithSource{
618+
DeleteTaskInfo: t,
619+
}
620+
}
621+
return out, nil
622+
}
623+
624+
return netstorageSelect.ListDeleteTasks(ctx)
625+
}
626+
503627
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
504628
var ss logstorage.StorageStats
505629
strg.UpdateStats(&ss)
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package netselect
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"net/url"
10+
11+
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
12+
"golang.org/x/sync/errgroup"
13+
)
14+
15+
// ListDeleteTasks gathers all delete tasks from every storage node and returns them along with the originating storage address.
16+
func (s *Storage) ListDeleteTasks(ctx context.Context) ([]logstorage.DeleteTaskInfoWithSource, error) {
17+
if len(s.sns) == 0 {
18+
return nil, nil
19+
}
20+
21+
g, ctx := errgroup.WithContext(ctx)
22+
23+
// race-free slices
24+
results := make([][]logstorage.DeleteTaskInfoWithSource, len(s.sns))
25+
for i, sn := range s.sns {
26+
i, sn := i, sn
27+
g.Go(func() error {
28+
tasks, err := sn.getDeleteTasks(ctx)
29+
if err != nil {
30+
return err
31+
}
32+
results[i] = tasks
33+
return nil
34+
})
35+
}
36+
37+
if err := g.Wait(); err != nil {
38+
return nil, err
39+
}
40+
41+
var all []logstorage.DeleteTaskInfoWithSource
42+
for _, ts := range results {
43+
all = append(all, ts...)
44+
}
45+
46+
return all, nil
47+
}
48+
49+
func (sn *storageNode) getDeleteTasks(ctx context.Context) ([]logstorage.DeleteTaskInfoWithSource, error) {
50+
args := url.Values{}
51+
args.Set("version", DeleteProtocolVersion)
52+
if key := sn.s.deleteAuthKey; key != "" {
53+
args.Set("authKey", key)
54+
}
55+
56+
reqURL := sn.getRequestURLWithArgs("/delete", args)
57+
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
58+
if err != nil {
59+
return nil, err
60+
}
61+
if err := sn.ac.SetHeaders(req, true); err != nil {
62+
return nil, fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
63+
}
64+
65+
resp, err := sn.c.Do(req)
66+
if err != nil {
67+
return nil, err
68+
}
69+
defer resp.Body.Close()
70+
71+
body, err := io.ReadAll(resp.Body)
72+
if err != nil {
73+
return nil, fmt.Errorf("cannot read response body from %q: %w", reqURL, err)
74+
}
75+
76+
if resp.StatusCode != 200 {
77+
return nil, fmt.Errorf("unexpected status code for %q: %d; response: %q", reqURL, resp.StatusCode, body)
78+
}
79+
80+
var tasks []logstorage.DeleteTaskInfoWithSource
81+
if err := json.Unmarshal(body, &tasks); err != nil {
82+
return nil, fmt.Errorf("cannot decode delete tasks response from %q: %w; response body: %q", reqURL, err, body)
83+
}
84+
85+
// Attach origin address.
86+
for i := range tasks {
87+
if tasks[i].Storage == "" {
88+
tasks[i].Storage = sn.addr
89+
}
90+
}
91+
return tasks, nil
92+
}

app/vlstorage/netselect/netselect.go

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,18 @@ const (
6161
//
6262
// It must be updated every time the protocol changes.
6363
QueryProtocolVersion = "v2"
64+
65+
// DeleteProtocolVersion is the version of the protocol used for /internal/delete HTTP endpoint.
66+
// It must be updated every time the protocol changes.
67+
DeleteProtocolVersion = "v1"
6468
)
6569

6670
// Storage is a network storage for querying remote storage nodes in the cluster.
6771
type Storage struct {
6872
sns []*storageNode
6973

7074
disableCompression bool
75+
deleteAuthKey string
7176
}
7277

7378
type storageNode struct {
@@ -321,9 +326,10 @@ func (sn *storageNode) getRequestURL(path string) string {
321326
// If disableCompression is set, then uncompressed responses are received from storage nodes.
322327
//
323328
// Call MustStop on the returned storage when it is no longer needed.
324-
func NewStorage(addrs []string, authCfgs []*promauth.Config, isTLSs []bool, disableCompression bool) *Storage {
329+
func NewStorage(addrs []string, authCfgs []*promauth.Config, isTLSs []bool, disableCompression bool, deleteAuthKey string) *Storage {
325330
s := &Storage{
326331
disableCompression: disableCompression,
332+
deleteAuthKey: deleteAuthKey,
327333
}
328334

329335
sns := make([]*storageNode, len(addrs))
@@ -578,3 +584,63 @@ func unmarshalQueryStats(qs *logstorage.QueryStats, src []byte) ([]byte, error)
578584
}
579585
return tail, nil
580586
}
587+
588+
// DeleteRows propagates delete markers to all storage nodes.
589+
func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
590+
var wg sync.WaitGroup
591+
errs := make([]error, len(s.sns))
592+
for i, sn := range s.sns {
593+
i, sn := i, sn
594+
wg.Add(1)
595+
go func() {
596+
defer wg.Done()
597+
if err := sn.deleteRows(ctx, tenantIDs, q); err != nil {
598+
errs[i] = fmt.Errorf("storage node %s: %w", sn.addr, err)
599+
}
600+
}()
601+
}
602+
wg.Wait()
603+
604+
return errors.Join(errs...)
605+
}
606+
607+
func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
608+
args := sn.getDeleteArgs(DeleteProtocolVersion, tenantIDs, q)
609+
610+
reqURL := sn.getRequestURLWithArgs("/delete", args)
611+
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
612+
if err != nil {
613+
return err
614+
}
615+
if err := sn.ac.SetHeaders(req, true); err != nil {
616+
return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
617+
}
618+
resp, err := sn.c.Do(req)
619+
if err != nil {
620+
return err
621+
}
622+
defer resp.Body.Close()
623+
if resp.StatusCode != http.StatusOK {
624+
body, _ := io.ReadAll(resp.Body)
625+
return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body)
626+
}
627+
return nil
628+
}
629+
630+
// getDeleteArgs builds url.Values for delete requests
631+
func (sn *storageNode) getDeleteArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
632+
args := url.Values{}
633+
args.Set("version", version)
634+
args.Set("tenant_ids", string(logstorage.MarshalTenantIDs(nil, tenantIDs)))
635+
args.Set("query", q.String())
636+
args.Set("timestamp", fmt.Sprintf("%d", q.GetTimestamp()))
637+
if key := sn.s.deleteAuthKey; key != "" {
638+
args.Set("authKey", key)
639+
}
640+
return args
641+
}
642+
643+
// getRequestURLWithArgs builds request URL with encoded query args
644+
func (sn *storageNode) getRequestURLWithArgs(path string, args url.Values) string {
645+
return fmt.Sprintf("%s://%s%s?%s", sn.scheme, sn.addr, path, args.Encode())
646+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/valyala/fastjson v1.6.4
1616
github.com/valyala/fastrand v1.1.0
1717
github.com/valyala/quicktemplate v1.8.0
18+
golang.org/x/sync v0.16.0
1819
)
1920

2021
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ github.com/valyala/quicktemplate v1.8.0 h1:zU0tjbIqTRgKQzFY1L42zq0qR3eh4WoQQdIdq
3636
github.com/valyala/quicktemplate v1.8.0/go.mod h1:qIqW8/igXt8fdrUln5kOSb+KWMaJ4Y8QUsfd1k6L2jM=
3737
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
3838
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
39+
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
40+
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
3941
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4042
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
4143
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=

0 commit comments

Comments
 (0)