diff --git a/app/vlselect/internalselect/internalselect.go b/app/vlselect/internalselect/internalselect.go index c26a3f2c94..b636d57de1 100644 --- a/app/vlselect/internalselect/internalselect.go +++ b/app/vlselect/internalselect/internalselect.go @@ -83,9 +83,11 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter "/internal/select/stream_field_values": processStreamFieldValuesRequest, "/internal/select/streams": processStreamsRequest, "/internal/select/stream_ids": processStreamIDsRequest, - "/internal/delete/run_task": processDeleteRunTask, - "/internal/delete/stop_task": processDeleteStopTask, - "/internal/delete/active_tasks": processDeleteActiveTasks, + "/internal/select/tenant_ids": processTenantIDsRequest, + + "/internal/delete/run_task": processDeleteRunTask, + "/internal/delete/stop_task": processDeleteStopTask, + "/internal/delete/active_tasks": processDeleteActiveTasks, } func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error { @@ -376,6 +378,35 @@ func processDeleteActiveTasks(ctx context.Context, w http.ResponseWriter, r *htt return nil } +func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + start, err := getInt64FromRequest(r, "start") + if err != nil { + return err + } + end, err := getInt64FromRequest(r, "end") + if err != nil { + return err + } + + tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end) + if err != nil { + return fmt.Errorf("cannot obtain tenant IDs: %w", err) + } + + // Marshal tenantIDs at first + data, err := json.Marshal(tenantIDs) + if err != nil { + return fmt.Errorf("cannot marshal tenantIDs: %w", err) + } + + // Send the marshaled tenantIDs to the client + w.Header().Set("Content-Type", "application/json") + if _, err := w.Write(data); err != nil { + return fmt.Errorf("cannot send response to the client: %w", err) + } + return nil +} + type commonParams struct { TenantIDs []logstorage.TenantID Query 
*logstorage.Query diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 92fe00bdd5..c065ece954 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -1128,6 +1128,65 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req writeResponseHeadersOnce() } +// ProcessTenantIDsRequest processes /select/tenant_ids request. +func ProcessTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + accountID := r.Header.Get("AccountID") + if accountID != "" { + // Security measure - prevent from requesting tenant_ids for requests with the already specified tenant. + // This allows enforcing the needed tenants at vmauth side, so they won't have access to /select/tenant_ids endpoint. + // See https://docs.victoriametrics.com/victoriametrics/vmauth/#modifying-http-headers + httpserver.Errorf(w, r, "The /select/tenant_ids endpoint cannot be requested with non-empty AccountID=%q header", accountID) + return + } + + start, okStart, err := getTimeNsec(r, "start") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + end, okEnd, err := getTimeNsec(r, "end") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + if !okStart { + start = math.MinInt64 + } + if !okEnd { + end = math.MaxInt64 + } else { + // Treat HTTP 'end' query arg as exclusive: [start, end) + // Convert to inclusive bound for internal filter by subtracting 1ns. 
+ if end != math.MinInt64 { + end-- + } + } + + if start > end { + httpserver.Errorf(w, r, "'start=%d' must be smaller than 'end=%d'", start, end) + return + } + + tenants, err := vlstorage.GetTenantIDs(ctx, start, end) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err) + return + } + + data, err := json.Marshal(tenants) + if err != nil { + httpserver.Errorf(w, r, "cannot marshal tenantIDs to JSON: %s", err) + return + } + + w.Header().Set("Content-Type", "application/json") + + if _, err := w.Write(data); err != nil { + httpserver.Errorf(w, r, "cannot send response to the client: %s", err) + return + } +} + type syncWriter struct { mu sync.Mutex w io.Writer diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 17a7862593..63680bc50e 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -321,6 +321,11 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re logsql.ProcessStreamsRequest(ctx, w, r) logsqlStreamsDuration.UpdateDuration(startTime) return true + case "/select/tenant_ids": + tenantIDsRequests.Inc() + logsql.ProcessTenantIDsRequest(ctx, w, r) + tenantIDsDuration.UpdateDuration(startTime) + return true default: return false } @@ -448,6 +453,9 @@ var ( logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`) + tenantIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/tenant_ids"}`) + tenantIDsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/tenant_ids"}`) + // no need to track duration for tail requests, as they usually take long time logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`) diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 54ce9f98bc..ee254ef6c9 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -568,6 
+568,14 @@ func DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTask, error) { return netstorageSelect.DeleteActiveTasks(ctx) } +// GetTenantIDs returns tenantIDs from the storage by the given start and end. +func GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) { + if localStorage != nil { + return localStorage.GetTenantIDs(ctx, start, end) + } + return netstorageSelect.GetTenantIDs(ctx, start, end) +} + func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { var ss logstorage.StorageStats strg.UpdateStats(&ss) diff --git a/app/vlstorage/netselect/netselect.go b/app/vlstorage/netselect/netselect.go index 8568f74d44..78cd590c0e 100644 --- a/app/vlstorage/netselect/netselect.go +++ b/app/vlstorage/netselect/netselect.go @@ -246,6 +246,23 @@ func (sn *storageNode) getStreamIDs(qctx *logstorage.QueryContext, limit uint64) return sn.getValuesWithHits(qctx, "/internal/select/stream_ids", args) } +func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) { + args := url.Values{} + args.Set("start", fmt.Sprintf("%d", start)) + args.Set("end", fmt.Sprintf("%d", end)) + + path := "/internal/select/tenant_ids" + data, reqURL, err := sn.getPlainResponseBodyForPathAndArgs(ctx, path, args) + if err != nil { + return nil, err + } + var tenantIDs []logstorage.TenantID + if err := json.Unmarshal(data, &tenantIDs); err != nil { + return nil, fmt.Errorf("cannot unmarshal tenantIDs received from %q; data=%q: %w", reqURL, data, err) + } + return tenantIDs, nil +} + func (sn *storageNode) getCommonArgs(version string, qctx *logstorage.QueryContext) url.Values { // ATTENTION: the *ProtocolVersion consts must be incremented every time the set of common args changes or its format changes. @@ -571,6 +588,61 @@ func (s *Storage) DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTa return tasks, nil } +// GetTenantIDs returns tenantIDs for the given start and end. 
+func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) { + return s.getTenantIDs(ctx, start, end) +} + +func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) { + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + + results := make([][]logstorage.TenantID, len(s.sns)) + errs := make([]error, len(s.sns)) + + // Return an error to the caller when at least a single storage node is unavailable, + // since this may result in incomplete list of the returned tenantIDs, which may mislead the caller. + allowPartialResponse := false + + var wg sync.WaitGroup + for i := range s.sns { + wg.Add(1) + go func(nodeIdx int) { + defer wg.Done() + + sn := s.sns[nodeIdx] + tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end) + results[nodeIdx] = tenantIDs + errs[nodeIdx] = sn.handleError(ctxWithCancel, cancel, err, allowPartialResponse) + + if err != nil { + // Cancel the remaining parallel requests + cancel() + } + }(i) + } + wg.Wait() + + if err := getFirstError(errs, allowPartialResponse); err != nil { + return nil, err + } + + // Deduplicate tenantIDs + m := make(map[logstorage.TenantID]struct{}) + for _, tenantIDs := range results { + for _, tenantID := range tenantIDs { + m[tenantID] = struct{}{} + } + } + + tenantIDs := make([]logstorage.TenantID, 0, len(m)) + for tenantID := range m { + tenantIDs = append(tenantIDs, tenantID) + } + + return tenantIDs, nil +} + func (s *Storage) getValuesWithHits(qctx *logstorage.QueryContext, limit uint64, resetHitsOnLimitExceeded bool, callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) { diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index 03792f6822..c4fe6c938d 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -23,6 +23,7 @@ according to [these 
docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: add an ability to delete stored logs. See [these docs](https://docs.victoriametrics.com/victorialogs/#how-to-delete-logs) and [#43](https://github.com/VictoriaMetrics/VictoriaLogs/issues/43). Thanks to @func25 for the initial idea and implementation at [#4](https://github.com/VictoriaMetrics/VictoriaLogs/pull/4). * FEATURE: add an ability to hide the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) on a per-query basis. This may be useful for restricting access to log fields with sensitive information, by specifying the list of the fields to hide via `hidden_fields_filters` query arg. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#hidden-fields) for details. See [#668](https://github.com/VictoriaMetrics/VictoriaLogs/issues/668). * FEATURE: [querying](https://docs.victoriametrics.com/victorialogs/querying/): add slow query logging via `-search.logSlowQueryDuration`; increments `vl_slow_queries_total`. See [#750](https://github.com/VictoriaMetrics/VictoriaLogs/issues/750). +* FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api): add support for `/select/tenant_ids` endpoint, which returns [tenant IDs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for the existing logs on the given `[start .. end)` time range. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-tenants) and [#158 at victorialogs-datasource](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/158). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): simplify "Download logs" dialog - show only file name field and description with time range and tenant. See [#789](https://github.com/VictoriaMetrics/VictoriaLogs/issues/789). 
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): update text for skipping warning modal and increase large load limit to `1000`. See [#808](https://github.com/VictoriaMetrics/VictoriaLogs/issues/808). diff --git a/docs/victorialogs/querying/README.md b/docs/victorialogs/querying/README.md index 53a9bd88b2..2ad8672467 100644 --- a/docs/victorialogs/querying/README.md +++ b/docs/victorialogs/querying/README.md @@ -31,6 +31,7 @@ VictoriaLogs provides the following HTTP endpoints: - [`/select/logsql/stream_field_values`](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values. - [`/select/logsql/field_names`](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names. - [`/select/logsql/field_values`](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values. +- [`/select/tenant_ids`](https://docs.victoriametrics.com/victorialogs/querying/#querying-tenants) for querying [tenants](https://docs.victoriametrics.com/victorialogs/#multitenancy) across the stored data. See also: @@ -844,6 +845,26 @@ See also: - [Querying streams](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) - [HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) +### Querying tenants + +VictoriaLogs provides `/select/tenant_ids?start=&end=` endpoint, which returns [tenant ids](https://docs.victoriametrics.com/victorialogs/#multitenancy) +for the ingested logs on the given `[start ... end)` time range. 
+ +This endpoint must be called with an empty `AccountID` request header for security reasons - this prevents unauthorized calls to this endpoint +from clients who have access to the specified `AccountID` tenant. This can be enforced on the `vmauth` side +according to [these docs](https://docs.victoriametrics.com/victoriametrics/vmauth/#modifying-http-headers). + +Example response returned by the `/select/tenant_ids` endpoint: + +```json +[ + { + "account_id": 0, + "project_id": 0 + } +] +``` + ### Querying field names VictoriaLogs provides `/select/logsql/field_names?query=&start=&end=` HTTP endpoint, which returns field names