Skip to content

Commit 2b3b6d4

Browse files
Merge pull request #416 from VictoriaMetrics/get-vlogs-tenants
implement tenants endpoint for victorialogs
2 parents 8b2cdbd + a85c43b commit 2b3b6d4

File tree

10 files changed

+491
-1
lines changed

10 files changed

+491
-1
lines changed

app/vlselect/internalselect/internalselect.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package internalselect
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"net/http"
78
"strconv"
89
"sync"
@@ -14,6 +15,7 @@ import (
1415
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
1516
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
1617
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
18+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
1719
"github.com/VictoriaMetrics/metrics"
1820

1921
"github.com/VictoriaMetrics/VictoriaLogs/app/vlstorage"
@@ -49,6 +51,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
4951
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
5052
"/internal/select/streams": processStreamsRequest,
5153
"/internal/select/stream_ids": processStreamIDsRequest,
54+
"/internal/select/tenant_ids": processTenantIDsRequest,
5255
}
5356

5457
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
@@ -242,6 +245,30 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
242245
return writeValuesWithHits(w, streamIDs, cp.DisableCompression)
243246
}
244247

248+
func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
249+
start, okStart, err := getTimeNsec(r, "start")
250+
if err != nil {
251+
return fmt.Errorf("cannot parse start timestamp: %w", err)
252+
}
253+
end, okEnd, err := getTimeNsec(r, "end")
254+
if err != nil {
255+
return fmt.Errorf("cannot parse end timestamp: %w", err)
256+
}
257+
if !okStart {
258+
start = math.MinInt64
259+
}
260+
if !okEnd {
261+
end = math.MaxInt64
262+
}
263+
264+
tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
265+
if err != nil {
266+
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
267+
}
268+
269+
return writeTenantIDs(w, tenantIDs, false)
270+
}
271+
245272
type commonParams struct {
246273
TenantIDs []logstorage.TenantID
247274
Query *logstorage.Query
@@ -306,6 +333,17 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
306333
return nil
307334
}
308335

336+
func writeTenantIDs(w http.ResponseWriter, tenantIDs []byte, disableCompression bool) error {
337+
if !disableCompression {
338+
tenantIDs = zstd.CompressLevel(nil, tenantIDs, 1)
339+
}
340+
w.Header().Set("Content-Type", "application/json")
341+
if _, err := w.Write(tenantIDs); err != nil {
342+
return fmt.Errorf("cannot send response to the client: %w", err)
343+
}
344+
return nil
345+
}
346+
309347
func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
310348
s := r.FormValue(argName)
311349
n, err := strconv.ParseInt(s, 10, 64)
@@ -314,3 +352,16 @@ func getInt64FromRequest(r *http.Request, argName string) (int64, error) {
314352
}
315353
return n, nil
316354
}
355+
356+
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
357+
s := r.FormValue(argName)
358+
if s == "" {
359+
return 0, false, nil
360+
}
361+
currentTimestamp := time.Now().UnixNano()
362+
nsecs, err := timeutil.ParseTimeAt(s, currentTimestamp)
363+
if err != nil {
364+
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
365+
}
366+
return nsecs, true, nil
367+
}

app/vlselect/logsql/logsql.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package logsql
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"io"
78
"math"
@@ -972,6 +973,67 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
972973
}
973974
}
974975

976+
// ProcessAdminTenantsRequest processes /select/admin/tenants request.
977+
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
978+
start, okStart, err := getTimeNsec(r, "start")
979+
if err != nil {
980+
httpserver.Errorf(w, r, "%s", err)
981+
return
982+
}
983+
end, okEnd, err := getTimeNsec(r, "end")
984+
if err != nil {
985+
httpserver.Errorf(w, r, "%s", err)
986+
return
987+
}
988+
if !okStart {
989+
start = math.MinInt64
990+
}
991+
if !okEnd {
992+
end = math.MaxInt64
993+
}
994+
995+
if start > end {
996+
httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
997+
return
998+
}
999+
1000+
sw := &syncWriter{
1001+
w: w,
1002+
}
1003+
1004+
var bwShards atomicutil.Slice[bufferedWriter]
1005+
bwShards.Init = func(shard *bufferedWriter) {
1006+
shard.sw = sw
1007+
}
1008+
defer func() {
1009+
shards := bwShards.All()
1010+
for _, shard := range shards {
1011+
shard.FlushIgnoreErrors()
1012+
}
1013+
}()
1014+
1015+
w.Header().Set("Content-Type", "application/json")
1016+
1017+
tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
1018+
if err != nil {
1019+
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
1020+
return
1021+
}
1022+
1023+
var t []logstorage.TenantID
1024+
if err := json.Unmarshal(tenants, &t); err != nil {
1025+
httpserver.Errorf(w, r, "cannot unmarshal tenantIDs: %s", err)
1026+
return
1027+
}
1028+
1029+
resp := make([]string, 0, len(t))
1030+
for _, tenantID := range t {
1031+
resp = append(resp, fmt.Sprintf("%d:%d", tenantID.AccountID, tenantID.ProjectID))
1032+
}
1033+
1034+
WriteTenantsResponse(w, resp)
1035+
}
1036+
9751037
type syncWriter struct {
9761038
mu sync.Mutex
9771039
w io.Writer
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{% stripspace %}
2+
3+
TenantsResponse generates response for /admin/tenants .
4+
{% func TenantsResponse(tenants []string) %}
5+
{
6+
"status":"success",
7+
"data":[
8+
{% for i, tenant := range tenants %}
9+
{%q= tenant %}
10+
{% if i+1 < len(tenants) %},{% endif %}
11+
{% endfor %}
12+
]
13+
}
14+
{% endfunc %}
15+
{% endstripspace %}

app/vlselect/logsql/tenants_response.qtpl.go

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/vlselect/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
268268
logsql.ProcessStreamsRequest(ctx, w, r)
269269
logsqlStreamsDuration.UpdateDuration(startTime)
270270
return true
271+
case "/select/admin/tenants":
272+
logsqlAdminTenantsRequests.Inc()
273+
logsql.ProcessAdminTenantsRequest(ctx, w, r)
274+
return true
271275
default:
272276
return false
273277
}
@@ -322,4 +326,6 @@ var (
322326

323327
// no need to track duration for tail requests, as they usually take long time
324328
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
329+
330+
logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
325331
)

app/vlstorage/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,14 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
350350
return netstorageSelect.GetStreamIDs(ctx, tenantIDs, q, limit)
351351
}
352352

353+
// GetTenantIDs returns tenantIDs from the storage by the given start and end.
354+
func GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
355+
if localStorage != nil {
356+
return localStorage.GetTenantIDs(ctx, start, end)
357+
}
358+
return netstorageSelect.GetTenantIDs(ctx, start, end)
359+
}
360+
353361
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
354362
var ss logstorage.StorageStats
355363
strg.UpdateStats(&ss)

app/vlstorage/netselect/netselect.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package netselect
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"io"
@@ -225,6 +226,13 @@ func (sn *storageNode) getStreamIDs(ctx context.Context, tenantIDs []logstorage.
225226
return sn.getValuesWithHits(ctx, "/internal/select/stream_ids", args)
226227
}
227228

229+
func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
230+
args := url.Values{}
231+
args.Set("start", fmt.Sprintf("%d", start))
232+
args.Set("end", fmt.Sprintf("%d", end))
233+
return sn.executeRequestAt(ctx, "/internal/select/tenant_ids", args)
234+
}
235+
228236
func (sn *storageNode) getCommonArgs(version string, tenantIDs []logstorage.TenantID, q *logstorage.Query) url.Values {
229237
args := url.Values{}
230238
args.Set("version", version)
@@ -407,6 +415,59 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []logstorage.Tenan
407415
})
408416
}
409417

418+
// GetTenantIDs returns tenantIDs for the given start and end.
419+
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
420+
return s.getTenantIDs(ctx, start, end)
421+
}
422+
423+
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]byte, error) {
424+
ctxWithCancel, cancel := context.WithCancel(ctx)
425+
defer cancel()
426+
427+
results := make([][]byte, len(s.sns))
428+
errs := make([]error, len(s.sns))
429+
430+
var wg sync.WaitGroup
431+
for i := range s.sns {
432+
wg.Add(1)
433+
go func(nodeIdx int) {
434+
defer wg.Done()
435+
436+
sn := s.sns[nodeIdx]
437+
tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
438+
results[nodeIdx] = tenantIDs
439+
errs[nodeIdx] = err
440+
if err != nil {
441+
// Cancel the remaining parallel requests
442+
cancel()
443+
}
444+
}(i)
445+
}
446+
wg.Wait()
447+
if err := getFirstNonCancelError(errs); err != nil {
448+
return nil, err
449+
}
450+
451+
unique := make(map[logstorage.TenantID]struct{}, len(results))
452+
for i := range results {
453+
var tenats []logstorage.TenantID
454+
if err := json.Unmarshal(results[i], &tenats); err != nil {
455+
return nil, fmt.Errorf("cannot unmarshal tenantIDs from storage node %d: %w", i, err)
456+
}
457+
for _, tenat := range tenats {
458+
unique[tenat] = struct{}{}
459+
}
460+
}
461+
462+
// Deduplicate tenantIDs
463+
tenantIDs := make([]logstorage.TenantID, 0, len(unique))
464+
for key := range unique {
465+
tenantIDs = append(tenantIDs, key)
466+
}
467+
468+
return json.Marshal(tenantIDs)
469+
}
470+
410471
func (s *Storage) getValuesWithHits(ctx context.Context, limit uint64, resetHitsOnLimitExceeded bool,
411472
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {
412473

0 commit comments

Comments
 (0)