diff --git a/app/vlselect/internalselect/internalselect.go b/app/vlselect/internalselect/internalselect.go
index 5c9076b263..6240292b29 100644
--- a/app/vlselect/internalselect/internalselect.go
+++ b/app/vlselect/internalselect/internalselect.go
@@ -3,6 +3,7 @@ package internalselect
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"strconv"
 	"sync"
@@ -14,6 +15,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
 	"github.com/VictoriaMetrics/metrics"
 
 	"github.com/VictoriaMetrics/VictoriaLogs/app/vlstorage"
@@ -49,6 +51,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
 	"/internal/select/stream_field_values": processStreamFieldValuesRequest,
 	"/internal/select/streams":             processStreamsRequest,
 	"/internal/select/stream_ids":          processStreamIDsRequest,
+	"/internal/select/tenant_ids":          processTenantIDsRequest,
 }
 
 func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +245,30 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
 	return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
 }
 
+func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
+	start, okStart, err := getTimeNsec(r, "start")
+	if err != nil {
+		return fmt.Errorf("cannot parse start timestamp: %w", err)
+	}
+	end, okEnd, err := getTimeNsec(r, "end")
+	if err != nil {
+		return fmt.Errorf("cannot parse end timestamp: %w", err)
+	}
+	if !okStart {
+		start = math.MinInt64
+	}
+	if !okEnd {
+		end = math.MaxInt64
+	}
+
+	tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
+	if err != nil {
+		return fmt.Errorf("cannot obtain tenant IDs: %w", err)
+	}
+
+	return writeTenantIDs(w, tenantIDs, false)
+}
+
 type commonParams struct {
 	TenantIDs []logstorage.TenantID
 	Query     *logstorage.Query
@@ -306,6 +333,17 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
 	return nil
 }
 
+func writeTenantIDs(w http.ResponseWriter, tenantIDs []byte, disableCompression bool) error {
+	if !disableCompression {
+		tenantIDs = zstd.CompressLevel(nil, tenantIDs, 1)
+	}
+	w.Header().Set("Content-Type", "application/json")
+	if _, err := w.Write(tenantIDs); err != nil {
+		return fmt.Errorf("cannot send response to the client: %w", err)
+	}
+	return nil
+}
+
 func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
 	s := r.FormValue(argName)
 	n, err := strconv.ParseInt(s, 10, 64)
@@ -314,3 +352,16 @@ func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
 	}
 	return n, nil
 }
+
+func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
+	s := r.FormValue(argName)
+	if s == "" {
+		return 0, false, nil
+	}
+	currentTimestamp := time.Now().UnixNano()
+	nsecs, err := timeutil.ParseTimeAt(s, currentTimestamp)
+	if err != nil {
+		return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
+	}
+	return nsecs, true, nil
+}
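Reviewer note on the internal wire format: processTenantIDsRequest always calls writeTenantIDs with disableCompression=false, so the body returned by /internal/select/tenant_ids is a zstd-compressed JSON array of tenant IDs. Below is a minimal stand-alone sketch of what a hand-rolled caller would have to do; the storage-node address is a placeholder and github.com/klauspost/compress/zstd is used here only as a convenient external decompressor. The real consumer is storageNode.getTenantIDs further down in this diff, which goes through the existing executeRequestAt helper instead.

```go
// Illustrative sketch only, not part of this change.
package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Placeholder address; start/end are optional — when omitted, the handler
	// falls back to an unbounded time range (math.MinInt64..math.MaxInt64).
	resp, err := http.Get("http://vlstorage-host:9428/internal/select/tenant_ids")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	compressed, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	dec, err := zstd.NewReader(nil)
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	// The decompressed payload is the JSON produced by json.Marshal([]logstorage.TenantID).
	plain, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("tenant IDs JSON: %s\n", plain)
}
```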
diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index 687fc3072f..69fc1cd6be 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -2,6 +2,7 @@ package logsql
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"math"
@@ -972,6 +973,67 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 	}
 }
 
+// ProcessAdminTenantsRequest processes /select/admin/tenants request.
+func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	start, okStart, err := getTimeNsec(r, "start")
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	end, okEnd, err := getTimeNsec(r, "end")
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	if !okStart {
+		start = math.MinInt64
+	}
+	if !okEnd {
+		end = math.MaxInt64
+	}
+
+	if start > end {
+		httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
+		return
+	}
+
+	sw := &syncWriter{
+		w: w,
+	}
+
+	var bwShards atomicutil.Slice[bufferedWriter]
+	bwShards.Init = func(shard *bufferedWriter) {
+		shard.sw = sw
+	}
+	defer func() {
+		shards := bwShards.All()
+		for _, shard := range shards {
+			shard.FlushIgnoreErrors()
+		}
+	}()
+
+	w.Header().Set("Content-Type", "application/json")
+
+	tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
+		return
+	}
+
+	var t []logstorage.TenantID
+	if err := json.Unmarshal(tenants, &t); err != nil {
+		httpserver.Errorf(w, r, "cannot unmarshal tenantIDs: %s", err)
+		return
+	}
+
+	resp := make([]string, 0, len(t))
+	for _, tenantID := range t {
+		resp = append(resp, fmt.Sprintf("%d:%d", tenantID.AccountID, tenantID.ProjectID))
+	}
+
+	WriteTenantsResponse(w, resp)
+}
+
 type syncWriter struct {
 	mu sync.Mutex
 	w  io.Writer
diff --git a/app/vlselect/logsql/tenants_response.qtpl b/app/vlselect/logsql/tenants_response.qtpl
new file mode 100644
index 0000000000..7e82e0ba78
--- /dev/null
+++ b/app/vlselect/logsql/tenants_response.qtpl
@@ -0,0 +1,15 @@
+{% stripspace %}
+
+TenantsResponse generates response for /admin/tenants .
+{% func TenantsResponse(tenants []string) %}
+{
+	"status":"success",
+	"data":[
+		{% for i, tenant := range tenants %}
+			{%q= tenant %}
+			{% if i+1 < len(tenants) %},{% endif %}
+		{% endfor %}
+	]
+}
+{% endfunc %}
+{% endstripspace %}
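The template above fixes the public response shape for /select/admin/tenants: a "status" field plus "accountID:projectID" strings under "data". For reviewers, here is a hedged sketch of a client calling the new endpoint and decoding that shape; the address and the RFC3339 start/end values are assumptions (getTimeNsec delegates parsing to timeutil.ParseTimeAt, and both parameters may be omitted for an unbounded time range).

```go
// Illustrative sketch only, not part of this change.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// tenantsResponse mirrors the JSON rendered by the TenantsResponse template.
type tenantsResponse struct {
	Status string   `json:"status"`
	Data   []string `json:"data"` // "accountID:projectID" strings
}

func main() {
	// Placeholder address and illustrative time range.
	u := "http://localhost:9428/select/admin/tenants?start=2024-01-01T00:00:00Z&end=2025-01-01T00:00:00Z"
	resp, err := http.Get(u)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var tr tenantsResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		panic(err)
	}
	fmt.Println(tr.Status, tr.Data) // e.g. "success [0:0 1:1 123:567]"
}
```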
diff --git a/app/vlselect/logsql/tenants_response.qtpl.go b/app/vlselect/logsql/tenants_response.qtpl.go
new file mode 100644
index 0000000000..12222f821e
--- /dev/null
+++ b/app/vlselect/logsql/tenants_response.qtpl.go
@@ -0,0 +1,67 @@
+// Code generated by qtc from "tenants_response.qtpl". DO NOT EDIT.
+// See https://github.com/valyala/quicktemplate for details.
+
+// TenantsResponse generates response for /admin/tenants .
+
+//line app/vlselect/logsql/tenants_response.qtpl:4
+package logsql
+
+//line app/vlselect/logsql/tenants_response.qtpl:4
+import (
+	qtio422016 "io"
+
+	qt422016 "github.com/valyala/quicktemplate"
+)
+
+//line app/vlselect/logsql/tenants_response.qtpl:4
+var (
+	_ = qtio422016.Copy
+	_ = qt422016.AcquireByteBuffer
+)
+
+//line app/vlselect/logsql/tenants_response.qtpl:4
+func StreamTenantsResponse(qw422016 *qt422016.Writer, tenants []string) {
+//line app/vlselect/logsql/tenants_response.qtpl:4
+	qw422016.N().S(`{"status":"success","data":[`)
+//line app/vlselect/logsql/tenants_response.qtpl:8
+	for i, tenant := range tenants {
+//line app/vlselect/logsql/tenants_response.qtpl:9
+		qw422016.N().Q(tenant)
+//line app/vlselect/logsql/tenants_response.qtpl:10
+		if i+1 < len(tenants) {
+//line app/vlselect/logsql/tenants_response.qtpl:10
+			qw422016.N().S(`,`)
+//line app/vlselect/logsql/tenants_response.qtpl:10
+		}
+//line app/vlselect/logsql/tenants_response.qtpl:11
+	}
+//line app/vlselect/logsql/tenants_response.qtpl:11
+	qw422016.N().S(`]}`)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+}
+
+//line app/vlselect/logsql/tenants_response.qtpl:14
+func WriteTenantsResponse(qq422016 qtio422016.Writer, tenants []string) {
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	qw422016 := qt422016.AcquireWriter(qq422016)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	StreamTenantsResponse(qw422016, tenants)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	qt422016.ReleaseWriter(qw422016)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+}
+
+//line app/vlselect/logsql/tenants_response.qtpl:14
+func TenantsResponse(tenants []string) string {
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	qb422016 := qt422016.AcquireByteBuffer()
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	WriteTenantsResponse(qb422016, tenants)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	qs422016 := string(qb422016.B)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	qt422016.ReleaseByteBuffer(qb422016)
+//line app/vlselect/logsql/tenants_response.qtpl:14
+	return qs422016
+//line app/vlselect/logsql/tenants_response.qtpl:14
+}
diff --git a/app/vlselect/main.go b/app/vlselect/main.go
index 4670d8bf3e..92601c02aa 100644
--- a/app/vlselect/main.go
+++ b/app/vlselect/main.go
@@ -268,6 +268,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
 		logsql.ProcessStreamsRequest(ctx, w, r)
 		logsqlStreamsDuration.UpdateDuration(startTime)
 		return true
+	case "/select/admin/tenants":
+		logsqlAdminTenantsRequests.Inc()
+		logsql.ProcessAdminTenantsRequest(ctx, w, r)
+		return true
 	default:
 		return false
 	}
@@ -322,4 +326,6 @@ var (
 
 	// no need to track duration for tail requests, as they usually take long time
 	logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
+
+	logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
 )
diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go
index 3caf6bb42e..a9033a4de5 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -350,6 +350,14 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
 	return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
 }
 
+// GetTenantIDs returns tenantIDs from the storage for the given start and end timestamps.
+func GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	if localStorage != nil {
+		return localStorage.GetTenantIDs(ctx, start, end)
+	}
+	return netstorageSelect.GetTenantIDs(ctx, start, end)
+}
+
 func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
 	var ss logstorage.StorageStats
 	strg.UpdateStats(&ss)
diff --git a/app/vlstorage/netselect/netselect.go b/app/vlstorage/netselect/netselect.go
index 13157eb7d0..986d9649ab 100644
--- a/app/vlstorage/netselect/netselect.go
+++ b/app/vlstorage/netselect/netselect.go
@@ -2,6 +2,7 @@ package netselect
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -225,6 +226,13 @@ func (sn *storageNode) getStreamIDs(ctx context.Context, tenantIDs []logstorage.
 	return sn.getValuesWithHits(ctx, "/internal/select/stream_ids", args)
 }
 
+func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	args := url.Values{}
+	args.Set("start", fmt.Sprintf("%d", start))
+	args.Set("end", fmt.Sprintf("%d", end))
+	return sn.executeRequestAt(ctx, "/internal/select/tenant_ids", args)
+}
+
 func (sn *storageNode) getCommonArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
 	args := url.Values{}
 	args.Set("version", version)
@@ -407,6 +415,59 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []logstorage.Tenan
 	})
 }
 
+// GetTenantIDs returns tenantIDs for the given start and end.
+func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	return s.getTenantIDs(ctx, start, end)
+}
+
+func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	results := make([][]byte, len(s.sns))
+	errs := make([]error, len(s.sns))
+
+	var wg sync.WaitGroup
+	for i := range s.sns {
+		wg.Add(1)
+		go func(nodeIdx int) {
+			defer wg.Done()
+
+			sn := s.sns[nodeIdx]
+			tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
+			results[nodeIdx] = tenantIDs
+			errs[nodeIdx] = err
+			if err != nil {
+				// Cancel the remaining parallel requests
+				cancel()
+			}
+		}(i)
+	}
+	wg.Wait()
+	if err := getFirstNonCancelError(errs); err != nil {
+		return nil, err
+	}
+
+	unique := make(map[logstorage.TenantID]struct{}, len(results))
+	for i := range results {
+		var tenants []logstorage.TenantID
+		if err := json.Unmarshal(results[i], &tenants); err != nil {
+			return nil, fmt.Errorf("cannot unmarshal tenantIDs from storage node %d: %w", i, err)
+		}
+		for _, tenant := range tenants {
+			unique[tenant] = struct{}{}
+		}
+	}
+
+	// Deduplicate tenantIDs
+	tenantIDs := make([]logstorage.TenantID, 0, len(unique))
+	for key := range unique {
+		tenantIDs = append(tenantIDs, key)
+	}
+
+	return json.Marshal(tenantIDs)
+}
+
 func (s *Storage) getValuesWithHits(ctx context.Context, limit uint64, resetHitsOnLimitExceeded bool,
 	callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {
diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go
index dca2984a6a..67f229095c 100644
--- a/lib/logstorage/indexdb.go
+++ b/lib/logstorage/indexdb.go
@@ -437,6 +437,57 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
 	return ids
 }
 
+func (is *indexSearch) getTenantIDs() []TenantID {
+	tenants := make(map[string]struct{})
+	ts := &is.ts
+	kb := &is.kb
+
+	tID := TenantID{0, 0}
+
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
+	ts.Seek(kb.B)
+
+	for ts.NextItem() {
+		_, prefix, err := unmarshalCommonPrefix(&tID, ts.Item)
+		if err != nil {
+			logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
+		}
+		if prefix != nsPrefixStreamID {
+			// Reached the end of entries with the needed prefix.
+			break
+		}
+		tenant := fmt.Sprintf("%d:%d", tID.AccountID, tID.ProjectID)
+		tenants[tenant] = struct{}{}
+		// Seek for the next (accountID, projectID)
+		tID.ProjectID++
+		if tID.ProjectID == 0 {
+			tID.AccountID++
+			if tID.AccountID == 0 {
+				// Reached the end of the (accountID, projectID) space
+				break
+			}
+		}
+
+		kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
+		ts.Seek(kb.B)
+	}
+
+	if err := ts.Error(); err != nil {
+		logger.Panicf("FATAL: error when performing search: %s", err)
+	}
+
+	tenantIDs := make([]TenantID, 0)
+	for tenantID := range tenants {
+		tid, err := ParseTenantID(tenantID)
+		if err != nil {
+			logger.Panicf("FATAL: cannot parse tenantID %q: %s", tenantID, err)
+		}
+		tenantIDs = append(tenantIDs, tid)
+	}
+
+	return tenantIDs
+}
+
 func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
 	st := GetStreamTags()
 	mustUnmarshalStreamTags(st, streamTagsCanonical)
@@ -542,6 +593,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
 	bbPool.Put(bb)
 }
 
+func (idb *indexdb) searchTenants() []TenantID {
+	is := idb.getIndexSearch()
+	defer idb.putIndexSearch(is)
+
+	return is.getTenantIDs()
+}
+
 type batchItems struct {
 	buf []byte
diff --git a/lib/logstorage/indexdb_test.go b/lib/logstorage/indexdb_test.go
index 10e622c58a..eb67d7abdc 100644
--- a/lib/logstorage/indexdb_test.go
+++ b/lib/logstorage/indexdb_test.go
@@ -3,6 +3,7 @@ package logstorage
 import (
 	"fmt"
 	"reflect"
+	"sort"
 	"testing"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -112,7 +113,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
 	// non-existing-tag-re
 	f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)
 
-	//non-existing-non-empty-tag-re
+	// non-existing-non-empty-tag-re
 	f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)
 
 	// match-job-instance
@@ -252,3 +253,82 @@ func TestStorageSearchStreamIDs(t *testing.T) {
 
 	closeTestStorage(s)
 }
+
+func TestGetTenantsIds(t *testing.T) {
+	t.Parallel()
+
+	path := t.Name()
+	const partitionName = "foobar"
+	s := newTestStorage()
+	mustCreateIndexdb(path)
+	idb := mustOpenIndexdb(path, partitionName, s)
+
+	tenantIDs := []TenantID{
+		{AccountID: 0, ProjectID: 0},
+		{AccountID: 0, ProjectID: 1},
+		{AccountID: 1, ProjectID: 0},
+		{AccountID: 1, ProjectID: 1},
+		{AccountID: 123, ProjectID: 567},
+	}
+	getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
+		st := GetStreamTags()
+		for k, v := range tags {
+			st.Add(k, v)
+		}
+		streamTagsCanonical := st.MarshalCanonical(nil)
+		PutStreamTags(st)
+		id := hash128(streamTagsCanonical)
+		sids := make([]streamID, 0, len(tenantIDs))
+		for _, tenantID := range tenantIDs {
+			sid := streamID{
+				tenantID: tenantID,
+				id:       id,
+			}
+
+			sids = append(sids, sid)
+		}
+
+		return sids, string(streamTagsCanonical)
+	}
+
+	// Create indexdb entries
+	const jobsCount = 7
+	const instancesCount = 5
+	for i := 0; i < jobsCount; i++ {
+		for j := 0; j < instancesCount; j++ {
+			sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
+				"job":      fmt.Sprintf("job-%d", i),
+				"instance": fmt.Sprintf("instance-%d", j),
+			})
+			for _, sid := range sids {
+				idb.mustRegisterStream(&sid, streamTagsCanonical)
+			}
+
+		}
+	}
+	idb.debugFlush()
+
+	f := func(expectedTenantIDs []TenantID) {
+		t.Helper()
+		tenantIDs := idb.searchTenants()
+		sort.Slice(tenantIDs, func(i, j int) bool {
+			return tenantIDs[i].less(&tenantIDs[j])
+		})
+		sort.Slice(expectedTenantIDs, func(i, j int) bool {
+			return expectedTenantIDs[i].less(&expectedTenantIDs[j])
+		})
+		if !reflect.DeepEqual(tenantIDs, expectedTenantIDs) {
+			fs.MustRemoveAll(path)
+			t.Fatalf("unexpected tenantIDs; got %v; want %v", tenantIDs, expectedTenantIDs)
+		}
+	}
+
+	expectedTenantIDs := tenantIDs
+
+	f(expectedTenantIDs)
+
+	mustCloseIndexdb(idb)
+	fs.MustRemoveAll(path)
+
+	closeTestStorage(s)
+}
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index d72ee835e7..0c024f0af7 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -2,6 +2,7 @@ package logstorage
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math"
 	"slices"
@@ -489,6 +490,91 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Que
 	return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
 }
 
+// GetTenantIDs returns tenantIDs for the given start and end.
+func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	return s.getTenantIDs(ctx, start, end)
+}
+
+func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
+	workersCount := cgroup.AvailableCPUs()
+	stopCh := ctx.Done()
+
+	var tenantIDsLock sync.Mutex
+	tenantIDs := make([]TenantID, 0)
+	processPartitions := func(pt *partition, workerID uint) {
+		tenants := pt.idb.searchTenants()
+		// Workers call this function concurrently, so guard tenantIDs with a lock.
+		tenantIDsLock.Lock()
+		tenantIDs = append(tenantIDs, tenants...)
+		tenantIDsLock.Unlock()
+	}
+
+	// Spin up workers
+	var wgWorkers sync.WaitGroup
+	workCh := make(chan *partition, workersCount)
+	wgWorkers.Add(workersCount)
+	for i := 0; i < workersCount; i++ {
+		go func(workerID uint) {
+			for pt := range workCh {
+				if needStop(stopCh) {
+					// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
+					continue
+				}
+				processPartitions(pt, workerID)
+			}
+			wgWorkers.Done()
+		}(uint(i))
+	}
+
+	// Select partitions according to the selected time range
+	s.partitionsLock.Lock()
+	ptws := s.partitions
+	minDay := start / nsecsPerDay
+	n := sort.Search(len(ptws), func(i int) bool {
+		return ptws[i].day >= minDay
+	})
+	ptws = ptws[n:]
+	maxDay := end / nsecsPerDay
+	n = sort.Search(len(ptws), func(i int) bool {
+		return ptws[i].day > maxDay
+	})
+	ptws = ptws[:n]
+
+	// Copy the selected partitions, so they don't interfere with s.partitions.
+	ptws = append([]*partitionWrapper{}, ptws...)
+
+	for _, ptw := range ptws {
+		ptw.incRef()
+	}
+	s.partitionsLock.Unlock()
+
+	// Schedule concurrent search across matching partitions.
+	for _, ptw := range ptws {
+		workCh <- ptw.pt
+	}
+
+	// Wait until workers finish their work
+	close(workCh)
+	wgWorkers.Wait()
+
+	// Decrement references to partitions
+	for _, ptw := range ptws {
+		ptw.decRef()
+	}
+
+	unique := make(map[TenantID]struct{}, len(tenantIDs))
+	for _, tid := range tenantIDs {
+		unique[tid] = struct{}{}
+	}
+
+	tenantIDs = make([]TenantID, 0, len(unique))
+	for tid := range unique {
+		tenantIDs = append(tenantIDs, tid)
+	}
+
+	return json.Marshal(tenantIDs)
+}
+
 func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
 	var results []ValueWithHits
 	var resultsLock sync.Mutex
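As a final reviewer aid: the public response body can be sanity-checked in isolation through the generated TenantsResponse template added earlier in this diff. The snippet below is a sketch, not an existing test in this change; it assumes the package is imported from within the repository module.

```go
// Sketch: render TenantsResponse for a few "accountID:projectID" strings.
// With the stripspace template, the expected output is compact JSON:
// {"status":"success","data":["0:0","1:1","123:567"]}
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaLogs/app/vlselect/logsql"
)

func main() {
	fmt.Println(logsql.TenantsResponse([]string{"0:0", "1:1", "123:567"}))
}
```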