Skip to content

Commit 0b099f8

Browse files
committed
update
1 parent 3ac9ad9 commit 0b099f8

23 files changed

+2631
-60
lines changed

app/vlselect/internalselect/internalselect.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
4949
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
5050
"/internal/select/streams": processStreamsRequest,
5151
"/internal/select/stream_ids": processStreamIDsRequest,
52+
"/internal/select/delete": processDeleteRequest,
5253
}
5354

5455
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +243,20 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
242243
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
243244
}
244245

246+
func processDeleteRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
247+
cp, err := getCommonParams(r, netselect.DeleteProtocolVersion)
248+
if err != nil {
249+
return err
250+
}
251+
252+
if err := vlstorage.DeleteRows(ctx, cp.TenantIDs, cp.Query); err != nil {
253+
return err
254+
}
255+
256+
_, _ = w.Write([]byte("ok"))
257+
return nil
258+
}
259+
245260
type commonParams struct {
246261
TenantIDs []logstorage.TenantID
247262
Query *logstorage.Query

app/vlselect/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
268268
logsql.ProcessStreamsRequest(ctx, w, r)
269269
logsqlStreamsDuration.UpdateDuration(startTime)
270270
return true
271+
271272
default:
272273
return false
273274
}

app/vlstorage/main.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package vlstorage
22

33
import (
44
"context"
5+
"encoding/json"
56
"flag"
67
"fmt"
78
"io"
@@ -15,6 +16,8 @@ import (
1516
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
1617
"github.com/VictoriaMetrics/metrics"
1718

19+
"html/template"
20+
1821
"github.com/VictoriaMetrics/VictoriaLogs/app/vlstorage/netinsert"
1922
"github.com/VictoriaMetrics/VictoriaLogs/app/vlstorage/netselect"
2023
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
@@ -212,6 +215,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
212215
return processForceMerge(w, r)
213216
case "/internal/force_flush":
214217
return processForceFlush(w, r)
218+
case "/internal/async_tasks":
219+
return processAsyncTasks(w, r)
220+
case "/internal/delete":
221+
return processDeleteRequest(w, r)
215222
}
216223
return false
217224
}
@@ -254,6 +261,139 @@ func processForceFlush(w http.ResponseWriter, r *http.Request) bool {
254261
return true
255262
}
256263

264+
func processDeleteRequest(w http.ResponseWriter, r *http.Request) bool {
265+
ctx := r.Context()
266+
267+
// Extract tenantID
268+
tenantID, err := logstorage.GetTenantIDFromRequest(r)
269+
if err != nil {
270+
httpserver.Errorf(w, r, "cannot obtain tenanID: %s", err)
271+
return true
272+
}
273+
tenantIDs := []logstorage.TenantID{tenantID}
274+
275+
// Parse query
276+
qStr := r.FormValue("query")
277+
q, err := logstorage.ParseQueryAtTimestamp(qStr, time.Now().UnixNano()-1)
278+
if err != nil {
279+
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
280+
return true
281+
}
282+
283+
if err := DeleteRows(ctx, tenantIDs, q); err != nil {
284+
httpserver.Errorf(w, r, "%s", err)
285+
return true
286+
}
287+
288+
w.Header().Set("Content-Type", "text/plain")
289+
_, _ = w.Write([]byte("ok"))
290+
return true
291+
}
292+
293+
var asyncTasksTmpl = template.Must(template.New("asyncTasks").Parse(`
294+
<!DOCTYPE html>
295+
<html lang="en">
296+
<head>
297+
<meta http-equiv="refresh" content="5">
298+
<meta charset="utf-8">
299+
<title>VictoriaLogs — Async tasks</title>
300+
<style>
301+
:root {
302+
--bg: #f9f9f9;
303+
--border: #d0d0d0;
304+
--header-bg: #fafafa;
305+
--row-alt-bg: #ffffff;
306+
--row-hover: #eef2ff;
307+
--font: system-ui,-apple-system,"Segoe UI",Roboto,"Helvetica Neue",sans-serif;
308+
--font-size: 14px;
309+
}
310+
html,body { margin: 0; padding: 0; font-family: var(--font); background: var(--bg); font-size: var(--font-size); }
311+
main { padding: 16px; }
312+
h2 { margin: 0 0 12px; font-weight: 600; }
313+
table { width: 100%; border-collapse: collapse; background: white; box-shadow: 0 1px 3px rgba(0,0,0,0.05); }
314+
th,td { padding: 6px 10px; border: 1px solid var(--border); text-align: left; vertical-align: top; }
315+
thead th { background: var(--header-bg); position: sticky; top: 0; z-index: 1; }
316+
tbody tr:nth-child(odd) { background: var(--row-alt-bg); }
317+
tbody tr:hover { background: var(--row-hover); }
318+
.status { font-weight: 600; padding: 2px 6px; border-radius: 4px; display:inline-block; text-transform: capitalize; }
319+
.status.pending { background:#fff4cc; color:#856404; }
320+
.status.success { background:#d3f9d8; color:#14532d; }
321+
.status.error { background:#f8d7da; color:#842029; }
322+
pre { margin: 0; white-space: pre-wrap; word-break: break-word; }
323+
</style>
324+
</head>
325+
<body>
326+
<main>
327+
<h2>Async tasks</h2>
328+
329+
{{- if .Tasks }}
330+
<table>
331+
<thead>
332+
<tr>
333+
<th style="width:70px;">Seq</th>
334+
<th style="width:140px;">Type</th>
335+
<th style="width:120px;">Status</th>
336+
<th style="width:160px;">Tenant</th>
337+
<th>Payload</th>
338+
</tr>
339+
</thead>
340+
<tbody>
341+
{{- range .Tasks }}
342+
<tr>
343+
<td>{{ .Seq }}</td>
344+
<td>{{ .Type }}</td>
345+
<td><span class="status {{ .Status }}">{{ .Status }}</span></td>
346+
<td>{{ html .Tenant }}</td>
347+
<td><pre>{{ html .PayloadJSON }}</pre></td>
348+
</tr>
349+
{{- end }}
350+
</tbody>
351+
</table>
352+
{{- else }}
353+
<p>No async tasks found.</p>
354+
{{- end }}
355+
</main>
356+
</body>
357+
</html>
358+
`))
359+
360+
func processAsyncTasks(w http.ResponseWriter, _ *http.Request) bool {
361+
if localStorage == nil {
362+
return false // only in local mode
363+
}
364+
tasks := localStorage.ListAsyncTasks()
365+
366+
// build lightweight view-model so template logic stays simple
367+
type row struct {
368+
Seq uint64
369+
Type string
370+
Status string
371+
Tenant string
372+
PayloadJSON string
373+
}
374+
vm := struct {
375+
Tasks []row
376+
}{}
377+
378+
for _, t := range tasks {
379+
payloadJSON, _ := json.Marshal(t.Payload)
380+
vm.Tasks = append(vm.Tasks, row{
381+
Seq: t.Seq,
382+
Type: t.Type,
383+
Status: t.Status,
384+
Tenant: t.Tenant,
385+
PayloadJSON: string(payloadJSON),
386+
})
387+
}
388+
389+
w.Header().Set("Content-Type", "text/html; charset=utf-8")
390+
if err := asyncTasksTmpl.Execute(w, vm); err != nil {
391+
logger.Errorf("render async tasks: %s", err)
392+
http.Error(w, "internal error", http.StatusInternalServerError)
393+
}
394+
return true
395+
}
396+
257397
// Storage implements insertutil.LogRowsStorage interface
258398
type Storage struct{}
259399

@@ -351,6 +491,19 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
351491
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
352492
}
353493

494+
// DeleteRows marks rows matching q with the Deleted marker (full or partial) and flushes markers to disk immediately.
495+
func DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
496+
err := logstorage.ValidateDeleteQuery(q)
497+
if err != nil {
498+
return fmt.Errorf("validate query: %w", err)
499+
}
500+
501+
if localStorage != nil {
502+
return localStorage.DeleteRows(ctx, tenantIDs, q)
503+
}
504+
return netstorageSelect.DeleteRows(ctx, tenantIDs, q)
505+
}
506+
354507
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
355508
var ss logstorage.StorageStats
356509
strg.UpdateStats(&ss)

app/vlstorage/netselect/netselect.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ const (
6060
//
6161
// It must be updated every time the protocol changes.
6262
QueryProtocolVersion = "v1"
63+
64+
// DeleteProtocolVersion is the version of the protocol used for /internal/select/delete HTTP endpoint.
65+
// It must be updated every time the protocol changes.
66+
DeleteProtocolVersion = "v1"
6367
)
6468

6569
// Storage is a network storage for querying remote storage nodes in the cluster.
@@ -486,3 +490,50 @@ func unmarshalValuesWithHits(src []byte) ([]logstorage.ValueWithHits, error) {
486490

487491
return vhs, nil
488492
}
493+
494+
func (sn *storageNode) deleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
495+
args := sn.getCommonArgs(DeleteProtocolVersion, tenantIDs, q)
496+
497+
reqURL := sn.getRequestURL("/internal/select/delete", args)
498+
req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
499+
if err != nil {
500+
return err
501+
}
502+
if err := sn.ac.SetHeaders(req, true); err != nil {
503+
return fmt.Errorf("cannot set auth headers for %q: %w", reqURL, err)
504+
}
505+
resp, err := sn.c.Do(req)
506+
if err != nil {
507+
return err
508+
}
509+
defer resp.Body.Close()
510+
if resp.StatusCode != http.StatusOK {
511+
body, _ := io.ReadAll(resp.Body)
512+
return fmt.Errorf("unexpected status code for request to %q: %d; response: %q", reqURL, resp.StatusCode, body)
513+
}
514+
return nil
515+
}
516+
517+
// DeleteRows propagates delete markers to all storage nodes.
518+
func (s *Storage) DeleteRows(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) error {
519+
var wg sync.WaitGroup
520+
errCh := make(chan error, len(s.sns))
521+
for _, sn := range s.sns {
522+
sn := sn
523+
wg.Add(1)
524+
go func() {
525+
defer wg.Done()
526+
if err := sn.deleteRows(ctx, tenantIDs, q); err != nil {
527+
errCh <- err
528+
}
529+
}()
530+
}
531+
wg.Wait()
532+
close(errCh)
533+
for err := range errCh {
534+
if err != nil {
535+
return err
536+
}
537+
}
538+
return nil
539+
}

0 commit comments

Comments
 (0)