Skip to content
Merged
37 changes: 37 additions & 0 deletions app/vlselect/internalselect/internalselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internalselect

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
Expand Down Expand Up @@ -85,6 +86,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
"/internal/delete/run_task": processDeleteRunTask,
"/internal/delete/stop_task": processDeleteStopTask,
"/internal/delete/active_tasks": processDeleteActiveTasks,
"/internal/select/tenant_ids": processTenantIDsRequest,
}

func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
Expand Down Expand Up @@ -375,6 +377,41 @@ func processDeleteActiveTasks(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}

func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
start, err := getInt64FromRequest(r, "start")
if err != nil {
return fmt.Errorf("cannot parse start timestamp: %w", err)
}
end, err := getInt64FromRequest(r, "end")
if err != nil {
return fmt.Errorf("cannot parse end timestamp: %w", err)
}

disableCompression := false
if err := getBoolFromRequest(&disableCompression, r, "disable_compression"); err != nil {
return err
}

tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
}

// Marshal tenantIDs at first
b, err := json.Marshal(tenantIDs)
if err != nil {
return fmt.Errorf("cannot marshal tenantIDs: %w", err)
}
if !disableCompression {
b = zstd.CompressLevel(nil, b, 1)
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(b); err != nil {
return fmt.Errorf("cannot send response to the client: %w", err)
}
return nil
}

type commonParams struct {
TenantIDs []logstorage.TenantID
Query *logstorage.Query
Expand Down
48 changes: 48 additions & 0 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logsql

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -1073,6 +1074,53 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
writeResponseHeadersOnce()
}

// ProcessAdminTenantsRequest processes /select/tenant_ids request.
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}

if start > end {
httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
return
}

tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
return
}

t, err := json.Marshal(tenants)
if err != nil {
httpserver.Errorf(w, r, "cannot marshal tenantIDs to JSON: %s", err)
return
}

// Write response header
h := w.Header()

h.Set("Content-Type", "application/json")

if _, err := w.Write(t); err != nil {
httpserver.Errorf(w, r, "cannot send response to the client: %s", err)
return
}
}

type syncWriter struct {
mu sync.Mutex
w io.Writer
Expand Down
6 changes: 6 additions & 0 deletions app/vlselect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsql.ProcessStreamsRequest(ctx, w, r)
logsqlStreamsDuration.UpdateDuration(startTime)
return true
case "/select/tenant_ids":
logsqlAdminTenantsRequests.Inc()
logsql.ProcessAdminTenantsRequest(ctx, w, r)
return true
default:
return false
}
Expand Down Expand Up @@ -437,4 +441,6 @@ var (
deleteRunTaskRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/run_task"}`)
deleteStopTaskRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/stop_task"}`)
deleteActiveTasksRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/active_tasks"}`)

logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/tenant_ids"}`)
)
8 changes: 8 additions & 0 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ func DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTask, error) {
return netstorageSelect.DeleteActiveTasks(ctx)
}

// GetTenantIDs returns tenantIDs from the storage by the given start and end.
func GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
if localStorage != nil {
return localStorage.GetTenantIDs(ctx, start, end)
}
return netstorageSelect.GetTenantIDs(ctx, start, end)
}

func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
Expand Down
66 changes: 66 additions & 0 deletions app/vlstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netselect

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -245,6 +246,22 @@ func (sn *storageNode) getStreamIDs(qctx *logstorage.QueryContext, limit uint64)
return sn.getValuesWithHits(qctx, "/internal/select/stream_ids", args)
}

func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
args := url.Values{}
args.Set("start", fmt.Sprintf("%d", start))
args.Set("end", fmt.Sprintf("%d", end))
args.Set("disable_compression", fmt.Sprintf("%v", sn.s.disableCompression))
b, err := sn.getResponseForPathAndArgs(ctx, "/internal/select/tenant_ids", args)
if err != nil {
return nil, err
}
var tIDs []logstorage.TenantID
if err := json.Unmarshal(b, &tIDs); err != nil {
return nil, fmt.Errorf("cannot unmarshal tenant IDs from %q: %w", b, err)
}
return tIDs, nil
}

func (sn *storageNode) getCommonArgs(version string, qctx *logstorage.QueryContext) url.Values {
// ATTENTION: the *ProtocolVersion consts must be incremented every time the set of common args changes or its format changes.

Expand Down Expand Up @@ -563,6 +580,55 @@ func (s *Storage) DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTa
return tasks, nil
}

// GetTenantIDs returns tenantIDs for the given start and end.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
return s.getTenantIDs(ctx, start, end)
}

func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()

results := make([][]logstorage.TenantID, len(s.sns))
errs := make([]error, len(s.sns))

var wg sync.WaitGroup
for i := range s.sns {
wg.Add(1)
go func(nodeIdx int) {
defer wg.Done()

sn := s.sns[nodeIdx]
tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
results[nodeIdx] = tenantIDs
errs[nodeIdx] = err
if err != nil {
// Cancel the remaining parallel requests
cancel()
}
}(i)
}
wg.Wait()
if err := getFirstError(errs, false); err != nil {
return nil, err
}

unique := make(map[logstorage.TenantID]struct{})
for _, tenants := range results {
// Deduplicate tenantIDs
for _, tenant := range tenants {
unique[tenant] = struct{}{}
}
}

tenantIDs := make([]logstorage.TenantID, 0, len(unique))
for key := range unique {
tenantIDs = append(tenantIDs, key)
}

return tenantIDs, nil
}

func (s *Storage) getValuesWithHits(qctx *logstorage.QueryContext, limit uint64, resetHitsOnLimitExceeded bool,
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {

Expand Down
2 changes: 1 addition & 1 deletion lib/logstorage/indexdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
// non-existing-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)

//non-existing-non-empty-tag-re
// non-existing-non-empty-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)

// match-job-instance
Expand Down