Skip to content

Commit 8b522c6

Browse files
committed
fix
1 parent 7c0626e commit 8b522c6

22 files changed

+2593
-55
lines changed

app/vlselect/internalselect/internalselect.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
4949
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
5050
"/internal/select/streams": processStreamsRequest,
5151
"/internal/select/stream_ids": processStreamIDsRequest,
52+
"/internal/select/delete": processDeleteRequest,
5253
}
5354

5455
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +243,20 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
242243
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
243244
}
244245

246+
func processDeleteRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
247+
cp, err := getCommonParams(r, netselect.DeleteProtocolVersion)
248+
if err != nil {
249+
return err
250+
}
251+
252+
if err := vlstorage.DeleteRows(ctx, cp.TenantIDs, cp.Query); err != nil {
253+
return err
254+
}
255+
256+
_, _ = w.Write([]byte("ok"))
257+
return nil
258+
}
259+
245260
type commonParams struct {
246261
TenantIDs []logstorage.TenantID
247262
Query *logstorage.Query

app/vlselect/logsql/logsql.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,3 +1333,22 @@ func parseExtraFiltersJSON(s string) ([]extraFilter, error) {
13331333
}
13341334
return filters, nil
13351335
}
1336+
1337+
// ProcessDeleteRequest handles /select/logsql/delete request.
1338+
//
1339+
// It executes DeleteRows and responds with plain "ok" when finished.
1340+
func ProcessDeleteRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
1341+
q, tenantIDs, err := parseCommonArgs(r)
1342+
if err != nil {
1343+
httpserver.Errorf(w, r, "%s", err)
1344+
return
1345+
}
1346+
1347+
if err := vlstorage.DeleteRows(ctx, tenantIDs, q); err != nil {
1348+
httpserver.Errorf(w, r, "%s", err)
1349+
return
1350+
}
1351+
1352+
w.Header().Set("Content-Type", "text/plain")
1353+
_, _ = w.Write([]byte("ok"))
1354+
}

app/vlselect/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,11 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
268268
logsql.ProcessStreamsRequest(ctx, w, r)
269269
logsqlStreamsDuration.UpdateDuration(startTime)
270270
return true
271+
case "/select/logsql/delete":
272+
logsqlDeleteRequests.Inc()
273+
logsql.ProcessDeleteRequest(ctx, w, r)
274+
logsqlDeleteDuration.UpdateDuration(startTime)
275+
return true
271276
default:
272277
return false
273278
}
@@ -322,4 +327,7 @@ var (
322327

323328
// no need to track duration for tail requests, as they usually take long time
324329
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
330+
331+
logsqlDeleteRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/delete"}`)
332+
logsqlDeleteDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/delete"}`)
325333
)

app/vlstorage/main.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,19 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
351351
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
352352
}
353353

354+
// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately.
355+
func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
356+
err := logstorage.ValidateDeleteQuery(q)
357+
if err != nil {
358+
return fmt.Errorf("validate query: %w", err)
359+
}
360+
361+
if localStorage != nil {
362+
return localStorage.DeleteRows(ctx, tenantIDs, q)
363+
}
364+
return netstorageSelect.DeleteRows(ctx, tenantIDs, q)
365+
}
366+
354367
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
355368
var ss logstorage.StorageStats
356369
strg.UpdateStats(&ss)

app/vlstorage/netselect/netselect.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ const (
6060
//
6161
// It must be updated every time the protocol changes.
6262
QueryProtocolVersion = "v1"
63+
64+
// DeleteProtocolVersion is the version of the protocol used for /internal/select/delete HTTP endpoint.
65+
// It must be updated every time the protocol changes.
66+
DeleteProtocolVersion = "v1"
6367
)
6468

6569
// Storage is a network storage for querying remote storage nodes in the cluster.
@@ -486,3 +490,50 @@ func unmarshalValuesWithHits(src []byte) ([]logstorage.ValueWithHits, error) {
486490

487491
return vhs, nil
488492
}
493+
494+
func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
495+
args := sn.getCommonArgs(DeleteProtocolVersion, tenantIDs, q)
496+
497+
reqURL := sn.getRequestURL("/internal/select/delete", args)
498+
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
499+
if err != nil {
500+
return err
501+
}
502+
if err := sn.ac.SetHeaders(req, true); err != nil {
503+
return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
504+
}
505+
resp, err := sn.c.Do(req)
506+
if err != nil {
507+
return err
508+
}
509+
defer resp.Body.Close()
510+
if resp.StatusCode != http.StatusOK {
511+
body, _ := io.ReadAll(resp.Body)
512+
return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body)
513+
}
514+
return nil
515+
}
516+
517+
// DeleteRows propagates delete markers to all storage nodes.
518+
func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
519+
var wg sync.WaitGroup
520+
errCh := make(chan error, len(s.sns))
521+
for _, sn := range s.sns {
522+
sn := sn
523+
wg.Add(1)
524+
go func() {
525+
defer wg.Done()
526+
if err := sn.deleteRows(ctx, tenantIDs, q); err != nil {
527+
errCh <- err
528+
}
529+
}()
530+
}
531+
wg.Wait()
532+
close(errCh)
533+
for err := range errCh {
534+
if err != nil {
535+
return err
536+
}
537+
}
538+
return nil
539+
}

lib/logstorage/async_task.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package logstorage
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
// asyncTaskType identifies the kind of background (asynchronous) task attached to a partition.
//
// Further kinds may be added later (e.g. compaction, ttl, schema changes).
type asyncTaskType int

// The numeric values are spelled out explicitly, since asyncTask serializes
// its Type field to JSON and the values therefore must stay stable.
const (
	// asyncTaskNone means there is no task.
	asyncTaskNone asyncTaskType = 0

	// asyncTaskDelete deletes the rows matching a query.
	asyncTaskDelete asyncTaskType = 1
)
19+
20+
// asyncTaskStatus describes the outcome of an async task.
type asyncTaskStatus int

// The numeric values are spelled out explicitly, since asyncTask serializes
// its Status field to JSON and the values therefore must stay stable.
// taskPending must remain the zero value, because the Status field is
// omitted from JSON when it is zero.
const (
	// taskPending marks a task that has not been resolved yet.
	taskPending asyncTaskStatus = 0

	// taskSuccess marks a task resolved successfully.
	taskSuccess asyncTaskStatus = 1

	// taskError marks a task resolved with an error.
	taskError asyncTaskStatus = 2
)
28+
29+
type asyncTask struct {
30+
Type asyncTaskType `json:"type"`
31+
TenantIDs []TenantID `json:"tenantIDs,omitempty"` // affected tenants (empty slice = all)
32+
Query string `json:"query,omitempty"` // serialized LogSQL query
33+
Seq uint64 `json:"seq,omitempty"` // monotonically increasing *global* sequence
34+
35+
// Status tracks the last execution state; omitted from JSON when zero (pending) to
36+
// preserve compatibility with tasks created before this field existed.
37+
Status asyncTaskStatus `json:"status,omitempty"`
38+
}
39+
40+
type asyncTasks struct {
41+
pt *partition
42+
43+
mu sync.Mutex
44+
ts []asyncTask
45+
currentSeq atomic.Uint64
46+
}
47+
48+
func newAsyncTasks(pt *partition, tasks []asyncTask) *asyncTasks {
49+
ast := &asyncTasks{
50+
pt: pt,
51+
ts: tasks,
52+
}
53+
return ast
54+
}
55+
56+
func (at *asyncTasks) updatePending() asyncTask {
57+
var result asyncTask
58+
59+
at.mu.Lock()
60+
for i := range at.ts {
61+
task := at.ts[i]
62+
if task.Status == taskPending {
63+
result = task
64+
break
65+
}
66+
}
67+
at.mu.Unlock()
68+
69+
at.currentSeq.Store(result.Seq)
70+
return result
71+
}
72+
73+
// unmarshalAsyncTasks converts JSON data back to async tasks
74+
func unmarshalAsyncTasks(data []byte) ([]asyncTask, error) {
75+
if len(data) == 0 {
76+
return nil, nil
77+
}
78+
79+
var tasks []asyncTask
80+
if err := json.Unmarshal(data, &tasks); err != nil {
81+
return nil, fmt.Errorf("unmarshal async tasks: %w", err)
82+
}
83+
return tasks, nil
84+
}
85+
86+
// markResolvedSync updates task status and persists the change to disk.
87+
// It holds the internal mutex only for in‐memory modification; the slow fs write
88+
// is executed after the lock is released to avoid blocking other readers.
89+
func (at *asyncTasks) markResolvedSync(seq uint64, status asyncTaskStatus) {
90+
var updated bool
91+
92+
at.mu.Lock()
93+
for i := len(at.ts) - 1; i >= 0; i-- {
94+
if at.ts[i].Seq == seq && at.ts[i].Status == taskPending {
95+
at.ts[i].Status = status
96+
updated = true
97+
break
98+
}
99+
}
100+
at.mu.Unlock()
101+
102+
if updated {
103+
at.pt.mustSaveAsyncTasks()
104+
}
105+
}
106+
107+
// addDeleteTask appends a delete task to the partition's task list
108+
func (at *asyncTasks) addDeleteTaskSync(tenantIDs []TenantID, q *Query, seq uint64) uint64 {
109+
task := asyncTask{
110+
Seq: seq,
111+
Type: asyncTaskDelete,
112+
TenantIDs: append([]TenantID(nil), tenantIDs...),
113+
Query: q.String(),
114+
Status: taskPending,
115+
}
116+
117+
at.mu.Lock()
118+
at.ts = append(at.ts, task)
119+
at.mu.Unlock()
120+
121+
at.pt.mustSaveAsyncTasks()
122+
return seq
123+
}
124+
125+
// taskSeq is the source of unique, monotonically increasing sequence numbers for async tasks.
var taskSeq atomic.Uint64

func init() {
	// Start from the current unix time in nanoseconds, so freshly generated
	// seqs are unlikely to collide with seqs that may already be present on disk.
	now := time.Now().UnixNano()
	taskSeq.Store(uint64(now))
}

0 commit comments

Comments
 (0)