51 changes: 51 additions & 0 deletions app/vlselect/internalselect/internalselect.go
@@ -3,6 +3,7 @@ package internalselect
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"sync"
@@ -14,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"

"github.com/VictoriaMetrics/VictoriaLogs/app/vlstorage"
@@ -49,6 +51,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
"/internal/select/streams": processStreamsRequest,
"/internal/select/stream_ids": processStreamIDsRequest,
"/internal/select/tenant_ids": processTenantIDsRequest,
}

func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +245,30 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
}

func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
return fmt.Errorf("cannot parse start timestamp: %w", err)
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
return fmt.Errorf("cannot parse end timestamp: %w", err)
}
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}

tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
}

return writeTenantIDs(w, tenantIDs, false)
}

type commonParams struct {
TenantIDs []logstorage.TenantID
Query *logstorage.Query
@@ -306,6 +333,17 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
return nil
}

func writeTenantIDs(w http.ResponseWriter, tenantIDs []byte, disableCompression bool) error {
if !disableCompression {
tenantIDs = zstd.CompressLevel(nil, tenantIDs, 1)
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(tenantIDs); err != nil {
return fmt.Errorf("cannot send response to the client: %w", err)
}
return nil
}

func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
s := r.FormValue(argName)
n, err := strconv.ParseInt(s, 10, 64)
@@ -314,3 +352,16 @@ func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
}
return n, nil
}

func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
s := r.FormValue(argName)
if s == "" {
return 0, false, nil
}
currentTimestamp := time.Now().UnixNano()
nsecs, err := timeutil.ParseTimeAt(s, currentTimestamp)
if err != nil {
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
}
return nsecs, true, nil
}
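
For context, here is a minimal client-side sketch of how this internal endpoint could be queried. The storage-node address/port and the standalone main package are assumptions for illustration; the real vlselect code talks to storage nodes through its own HTTP client in netselect.go.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	args := url.Values{}
	// start and end accept the same formats as timeutil.ParseTimeAt on the storage node.
	args.Set("start", "2024-01-01T00:00:00Z")
	args.Set("end", "2024-01-02T00:00:00Z")

	// Hypothetical storage-node address.
	resp, err := http.Get("http://vlstorage-1:9491/internal/select/tenant_ids?" + args.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The body is zstd-compressed JSON, since processTenantIDsRequest calls
	// writeTenantIDs with disableCompression=false.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("got %d bytes of zstd-compressed tenant IDs\n", len(body))
}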
62 changes: 62 additions & 0 deletions app/vlselect/logsql/logsql.go
@@ -2,6 +2,7 @@ package logsql

import (
"context"
"encoding/json"
"fmt"
"io"
"math"
@@ -972,6 +973,67 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
}

// ProcessAdminTenantsRequest processes /select/admin/tenants request.
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}

if start > end {
httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
return
}

sw := &syncWriter{
w: w,
}

var bwShards atomicutil.Slice[bufferedWriter]
bwShards.Init = func(shard *bufferedWriter) {
shard.sw = sw
}
defer func() {
shards := bwShards.All()
for _, shard := range shards {
shard.FlushIgnoreErrors()
}
}()

w.Header().Set("Content-Type", "application/json")

tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
return
}

var t []logstorage.TenantID
if err := json.Unmarshal(tenants, &t); err != nil {
httpserver.Errorf(w, r, "cannot unmarshal tenantIDs: %s", err)
return
}

resp := make([]string, 0, len(t))
for _, tenantID := range t {
resp = append(resp, fmt.Sprintf("%d:%d", tenantID.AccountID, tenantID.ProjectID))
}

WriteTenantsResponse(w, resp)
}

type syncWriter struct {
mu sync.Mutex
w io.Writer
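The unmarshal-and-format step in ProcessAdminTenantsRequest can be shown in isolation. The TenantID stand-in below and its JSON payload are assumptions for the example, not the real logstorage type.

package main

import (
	"encoding/json"
	"fmt"
)

// TenantID is a local stand-in with the same field names as logstorage.TenantID.
type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

func main() {
	// Hypothetical payload returned by vlstorage.GetTenantIDs.
	raw := []byte(`[{"AccountID":0,"ProjectID":0},{"AccountID":1,"ProjectID":2}]`)

	var tenants []TenantID
	if err := json.Unmarshal(raw, &tenants); err != nil {
		panic(err)
	}

	// Same "<AccountID>:<ProjectID>" formatting as in the handler above.
	resp := make([]string, 0, len(tenants))
	for _, t := range tenants {
		resp = append(resp, fmt.Sprintf("%d:%d", t.AccountID, t.ProjectID))
	}
	fmt.Println(resp) // [0:0 1:2]
}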
15 changes: 15 additions & 0 deletions app/vlselect/logsql/tenants_response.qtpl
@@ -0,0 +1,15 @@
{% stripspace %}

TenantsResponse generates response for /select/admin/tenants.
{% func TenantsResponse(tenants []string) %}
{
"status":"success",
"data":[
{% for i, tenant := range tenants %}
{%q= tenant %}
{% if i+1 < len(tenants) %},{% endif %}
{% endfor %}
]
}
{% endfunc %}
{% endstripspace %}
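
For illustration, rendering this template for two hypothetical tenants (0:0 and 1:2) produces, after stripspace, JSON along these lines:

{"status":"success","data":["0:0","1:2"]}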
67 changes: 67 additions & 0 deletions app/vlselect/logsql/tenants_response.qtpl.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions app/vlselect/main.go
@@ -268,6 +268,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsql.ProcessStreamsRequest(ctx, w, r)
logsqlStreamsDuration.UpdateDuration(startTime)
return true
case "/select/admin/tenants":
logsqlAdminTenantsRequests.Inc()
logsql.ProcessAdminTenantsRequest(ctx, w, r)
return true
default:
return false
}
@@ -322,4 +326,6 @@ var (

// no need to track duration for tail requests, as they usually take a long time
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)

logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
)
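
A minimal sketch of calling the new public endpoint and decoding its response is shown below; the listen address is a placeholder, and the response struct mirrors the tenants_response.qtpl template above.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// tenantsResponse matches the JSON shape produced by TenantsResponse.
type tenantsResponse struct {
	Status string   `json:"status"`
	Data   []string `json:"data"`
}

func main() {
	// Placeholder vlselect address; adjust to your deployment.
	u := "http://localhost:9471/select/admin/tenants?start=2024-01-01T00:00:00Z&end=2024-02-01T00:00:00Z"
	resp, err := http.Get(u)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var tr tenantsResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		panic(err)
	}
	fmt.Println(tr.Status, tr.Data) // e.g. "success [0:0 1:2]"
}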
8 changes: 8 additions & 0 deletions app/vlstorage/main.go
@@ -350,6 +350,14 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
}

// GetTenantIDs returns tenantIDs from the storage for the given start and end timestamps.
func GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
if localStorage != nil {
return localStorage.GetTenantIDs(ctx, start, end)
}
return netstorageSelect.GetTenantIDs(ctx, start, end)
}

func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
61 changes: 61 additions & 0 deletions app/vlstorage/netselect/netselect.go
@@ -2,6 +2,7 @@ package netselect

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -225,6 +226,13 @@ func (sn *storageNode) getStreamIDs(ctx context.Context, tenantIDs []logstorage.
return sn.getValuesWithHits(ctx, "/internal/select/stream_ids", args)
}

func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
args := url.Values{}
args.Set("start", fmt.Sprintf("%d", start))
args.Set("end", fmt.Sprintf("%d", end))
return sn.executeRequestAt(ctx, "/internal/select/tenant_ids", args)
}

func (sn *storageNode) getCommonArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
args := url.Values{}
args.Set("version", version)
@@ -407,6 +415,59 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []logstorage.Tenan
})
}

// GetTenantIDs returns tenantIDs for the given start and end timestamps.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
return s.getTenantIDs(ctx, start, end)
}

func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()

results := make([][]byte, len(s.sns))
errs := make([]error, len(s.sns))

var wg sync.WaitGroup
for i := range s.sns {
wg.Add(1)
go func(nodeIdx int) {
defer wg.Done()

sn := s.sns[nodeIdx]
tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
results[nodeIdx] = tenantIDs
errs[nodeIdx] = err
if err != nil {
// Cancel the remaining parallel requests
cancel()
}
}(i)
}
wg.Wait()
if err := getFirstNonCancelError(errs); err != nil {
return nil, err
}

unique := make(map[logstorage.TenantID]struct{}, len(results))
for i := range results {
var tenants []logstorage.TenantID
if err := json.Unmarshal(results[i], &tenants); err != nil {
return nil, fmt.Errorf("cannot unmarshal tenantIDs from storage node %d: %w", i, err)
}
for _, tenant := range tenants {
unique[tenant] = struct{}{}
}
}

// Deduplicate tenantIDs
tenantIDs := make([]logstorage.TenantID, 0, len(unique))
for key := range unique {
tenantIDs = append(tenantIDs, key)
}

return json.Marshal(tenantIDs)
}

func (s *Storage) getValuesWithHits(ctx context.Context, limit uint64, resetHitsOnLimitExceeded bool,
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {

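The fan-out, merge, and deduplicate logic in getTenantIDs can be sketched standalone; the TenantID stand-in and the per-node payloads below are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"
)

// TenantID is a local stand-in for logstorage.TenantID; it is comparable,
// so it can be used directly as a map key for deduplication.
type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

func main() {
	// Simulated responses from two storage nodes; tenant 1:2 appears on both.
	results := [][]byte{
		[]byte(`[{"AccountID":0,"ProjectID":0},{"AccountID":1,"ProjectID":2}]`),
		[]byte(`[{"AccountID":1,"ProjectID":2},{"AccountID":3,"ProjectID":4}]`),
	}

	unique := make(map[TenantID]struct{})
	for i := range results {
		var tenants []TenantID
		if err := json.Unmarshal(results[i], &tenants); err != nil {
			panic(err)
		}
		for _, t := range tenants {
			unique[t] = struct{}{}
		}
	}

	merged := make([]TenantID, 0, len(unique))
	for t := range unique {
		merged = append(merged, t)
	}
	fmt.Println(len(merged), "unique tenants") // 3 unique tenants
}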