Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions app/vlselect/internalselect/internalselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
"/internal/select/stream_field_values": processStreamFieldValuesRequest,
"/internal/select/streams": processStreamsRequest,
"/internal/select/stream_ids": processStreamIDsRequest,
"/internal/delete/run_task": processDeleteRunTask,
"/internal/delete/stop_task": processDeleteStopTask,
"/internal/delete/active_tasks": processDeleteActiveTasks,
"/internal/select/tenant_ids": processTenantIDsRequest,

"/internal/delete/run_task": processDeleteRunTask,
"/internal/delete/stop_task": processDeleteStopTask,
"/internal/delete/active_tasks": processDeleteActiveTasks,
}

func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
Expand Down Expand Up @@ -376,6 +378,35 @@ func processDeleteActiveTasks(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}

// processTenantIDsRequest handles the /internal/select/tenant_ids request.
//
// It reads the "start" and "end" request args via getInt64FromRequest, obtains the tenant IDs
// for this range from vlstorage and writes them to w as a JSON document.
// The returned error is non-nil if an arg cannot be read, the tenant IDs cannot be obtained
// or marshaled, or the response cannot be written to w.
func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
	// Read the time range bounds in the fixed order: "start" first, then "end".
	var timeRange [2]int64
	for i, argName := range []string{"start", "end"} {
		v, err := getInt64FromRequest(r, argName)
		if err != nil {
			return err
		}
		timeRange[i] = v
	}

	tenantIDs, err := vlstorage.GetTenantIDs(ctx, timeRange[0], timeRange[1])
	if err != nil {
		return fmt.Errorf("cannot obtain tenant IDs: %w", err)
	}

	// Build the complete response body before writing anything to w.
	payload, err := json.Marshal(tenantIDs)
	if err != nil {
		return fmt.Errorf("cannot marshal tenantIDs: %w", err)
	}

	w.Header().Set("Content-Type", "application/json")
	_, err = w.Write(payload)
	if err != nil {
		return fmt.Errorf("cannot send response to the client: %w", err)
	}
	return nil
}

type commonParams struct {
TenantIDs []logstorage.TenantID
Query *logstorage.Query
Expand Down
59 changes: 59 additions & 0 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,65 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
writeResponseHeadersOnce()
}

// ProcessTenantIDsRequest processes /select/tenant_ids request.
//
// The `start` and `end` query args select the `[start, end)` time range. If getTimeNsec reports
// that `start` or `end` isn't set, then the corresponding side of the time range is left unbounded.
// On success the tenant IDs obtained from vlstorage are written to w as JSON.
// All the errors are reported to the client via httpserver.Errorf.
func ProcessTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	accountID := r.Header.Get("AccountID")
	if accountID != "" {
		// Security measure - reject requests, which already have the tenant specified.
		// This allows enforcing the needed tenants at vmauth side, so they won't have access to /select/tenant_ids endpoint.
		// See https://docs.victoriametrics.com/victoriametrics/vmauth/#modifying-http-headers
		httpserver.Errorf(w, r, "The /select/tenant_ids endpoint cannot be requested with non-empty AccountID=%q header", accountID)
		return
	}

	start, okStart, err := getTimeNsec(r, "start")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	end, okEnd, err := getTimeNsec(r, "end")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	if !okStart {
		start = math.MinInt64
	}

	// The HTTP 'end' query arg is exclusive: [start, end).
	// endInclusive is the inclusive upper bound passed to the storage. It is obtained by subtracting 1ns from end.
	// The subtraction is skipped for math.MinInt64 in order to avoid integer overflow.
	endInclusive := int64(math.MaxInt64)
	if okEnd {
		endInclusive = end
		if end != math.MinInt64 {
			endInclusive = end - 1
		}
	}

	if start > endInclusive {
		// Report the 'end' value as it was passed by the client instead of the adjusted inclusive bound.
		// Otherwise the error message contains a timestamp the client has never sent.
		httpserver.Errorf(w, r, "'start=%d' must be smaller than 'end=%d'", start, end)
		return
	}

	tenants, err := vlstorage.GetTenantIDs(ctx, start, endInclusive)
	if err != nil {
		httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
		return
	}

	// Marshal the response before writing anything to w.
	data, err := json.Marshal(tenants)
	if err != nil {
		httpserver.Errorf(w, r, "cannot marshal tenantIDs to JSON: %s", err)
		return
	}

	w.Header().Set("Content-Type", "application/json")

	if _, err := w.Write(data); err != nil {
		httpserver.Errorf(w, r, "cannot send response to the client: %s", err)
		return
	}
}

type syncWriter struct {
mu sync.Mutex
w io.Writer
Expand Down
8 changes: 8 additions & 0 deletions app/vlselect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsql.ProcessStreamsRequest(ctx, w, r)
logsqlStreamsDuration.UpdateDuration(startTime)
return true
case "/select/tenant_ids":
tenantIDsRequests.Inc()
logsql.ProcessTenantIDsRequest(ctx, w, r)
tenantIDsDuration.UpdateDuration(startTime)
return true
default:
return false
}
Expand Down Expand Up @@ -448,6 +453,9 @@ var (
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
logsqlStreamsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/logsql/streams"}`)

tenantIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/tenant_ids"}`)
tenantIDsDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/select/tenant_ids"}`)

// no need to track duration for tail requests, as they usually take long time
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)

Expand Down
8 changes: 8 additions & 0 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ func DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTask, error) {
return netstorageSelect.DeleteActiveTasks(ctx)
}

// GetTenantIDs returns tenantIDs from the storage by the given start and end.
//
// The request is served by localStorage when it is set. Otherwise it is passed to netstorageSelect.
func GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
	if localStorage == nil {
		return netstorageSelect.GetTenantIDs(ctx, start, end)
	}
	return localStorage.GetTenantIDs(ctx, start, end)
}

func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
Expand Down
72 changes: 72 additions & 0 deletions app/vlstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,23 @@ func (sn *storageNode) getStreamIDs(qctx *logstorage.QueryContext, limit uint64)
return sn.getValuesWithHits(qctx, "/internal/select/stream_ids", args)
}

// getTenantIDs requests tenant IDs for the given start and end from sn
// via the /internal/select/tenant_ids path and decodes the JSON response.
func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
	const path = "/internal/select/tenant_ids"

	// start and end are sent as decimal integers.
	args := url.Values{
		"start": []string{fmt.Sprintf("%d", start)},
		"end":   []string{fmt.Sprintf("%d", end)},
	}

	body, reqURL, err := sn.getPlainResponseBodyForPathAndArgs(ctx, path, args)
	if err != nil {
		return nil, err
	}

	var tenantIDs []logstorage.TenantID
	err = json.Unmarshal(body, &tenantIDs)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal tenantIDs received from %q; data=%q: %w", reqURL, body, err)
	}
	return tenantIDs, nil
}

func (sn *storageNode) getCommonArgs(version string, qctx *logstorage.QueryContext) url.Values {
// ATTENTION: the *ProtocolVersion consts must be incremented every time the set of common args changes or its format changes.

Expand Down Expand Up @@ -571,6 +588,61 @@ func (s *Storage) DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTa
return tasks, nil
}

// GetTenantIDs returns tenantIDs for the given start and end.
//
// It is an exported wrapper around s.getTenantIDs.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
	tenantIDs, err := s.getTenantIDs(ctx, start, end)
	return tenantIDs, err
}

// getTenantIDs queries all the storage nodes at s.sns in parallel for tenant IDs on the given start and end
// and returns the union of their responses without duplicates.
//
// An error is returned if getFirstError reports an error for any of the nodes, since partial responses are disabled.
// On success the returned slice is non-nil. Tenant IDs are ordered by their first appearance:
// by storage node index at first and then by the position in the node response.
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
	ctxWithCancel, cancel := context.WithCancel(ctx)
	defer cancel()

	// Every goroutine writes only to its own slot at results and errs, so no locking is needed.
	results := make([][]logstorage.TenantID, len(s.sns))
	errs := make([]error, len(s.sns))

	// Return an error to the caller when at least a single storage node is unavailable,
	// since this may result in incomplete list of the returned tenantIDs, which may mislead the caller.
	allowPartialResponse := false

	var wg sync.WaitGroup
	for i := range s.sns {
		wg.Add(1)
		go func(nodeIdx int) {
			defer wg.Done()

			sn := s.sns[nodeIdx]
			tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
			results[nodeIdx] = tenantIDs
			errs[nodeIdx] = sn.handleError(ctxWithCancel, cancel, err, allowPartialResponse)

			if err != nil {
				// Cancel the remaining parallel requests
				cancel()
			}
		}(i)
	}
	wg.Wait()

	if err := getFirstError(errs, allowPartialResponse); err != nil {
		return nil, err
	}

	// Deduplicate tenantIDs while preserving the order of their first appearance.
	// Collecting the result by iterating over a map would return tenantIDs in a different random order
	// on every call, since Go doesn't specify map iteration order.
	seen := make(map[logstorage.TenantID]struct{})
	tenantIDs := make([]logstorage.TenantID, 0)
	for _, nodeTenantIDs := range results {
		for _, tenantID := range nodeTenantIDs {
			if _, ok := seen[tenantID]; ok {
				continue
			}
			seen[tenantID] = struct{}{}
			tenantIDs = append(tenantIDs, tenantID)
		}
	}

	return tenantIDs, nil
}

func (s *Storage) getValuesWithHits(qctx *logstorage.QueryContext, limit uint64, resetHitsOnLimitExceeded bool,
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {

Expand Down
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add an ability to delete stored logs. See [these docs](https://docs.victoriametrics.com/victorialogs/#how-to-delete-logs) and [#43](https://github.com/VictoriaMetrics/VictoriaLogs/issues/43). Thanks to @func25 for the initial idea and implementation at [#4](https://github.com/VictoriaMetrics/VictoriaLogs/pull/4).
* FEATURE: add an ability to hide the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) on a per-query basis. This may be useful for restricting access to log fields with sensitive information, by specifying the list of the fields to hide via `hidden_fields_filters` query arg. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#hidden-fields) for details. See [#668](https://github.com/VictoriaMetrics/VictoriaLogs/issues/668).
* FEATURE: [querying](https://docs.victoriametrics.com/victorialogs/querying/): add slow query logging via `-search.logSlowQueryDuration`; increments `vl_slow_queries_total`. See [#750](https://github.com/VictoriaMetrics/VictoriaLogs/issues/750).
* FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api): add support for `/select/tenant_ids` endpoint, which returns [tenant IDs](https://docs.victoriametrics.com/victorialogs/#multitenancy) for the existing logs on the given `[start .. end)` time range. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-tenants) and [#158 at victorialogs-datasource](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/158).
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): simplify "Download logs" dialog - show only file name field and description with time range and tenant. See [#789](https://github.com/VictoriaMetrics/VictoriaLogs/issues/789).
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): update text for skipping warning modal and increase large load limit to `1000`. See [#808](https://github.com/VictoriaMetrics/VictoriaLogs/issues/808).

Expand Down
21 changes: 21 additions & 0 deletions docs/victorialogs/querying/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ VictoriaLogs provides the following HTTP endpoints:
- [`/select/logsql/stream_field_values`](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values.
- [`/select/logsql/field_names`](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names.
- [`/select/logsql/field_values`](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values.
- [`/select/tenant_ids`](https://docs.victoriametrics.com/victorialogs/querying/#querying-tenants) for querying [tenants](https://docs.victoriametrics.com/victorialogs/#multitenancy) across the stored data.

See also:

Expand Down Expand Up @@ -844,6 +845,26 @@ See also:
- [Querying streams](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams)
- [HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#http-api)

### Querying tenants

VictoriaLogs provides `/select/tenant_ids?start=<start>&end=<end>` endpoint, which returns [tenant ids](https://docs.victoriametrics.com/victorialogs/#multitenancy)
for the ingested logs on the given `[start ... end)` time range.

This endpoint must be called with an empty `AccountID` request header for security reasons - this prevents clients, which are restricted
to the specified `AccountID` tenant, from calling this endpoint. This can be enforced at `vmauth` side
according to [these docs](https://docs.victoriametrics.com/victoriametrics/vmauth/#modifying-http-headers).

Example response returned by the `/select/tenant_ids` endpoint:

```json
[
{
"account_id": 0,
"project_id": 0
}
]
```

### Querying field names

VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names
Expand Down
Loading