Skip to content

Commit 68b9433

Browse files
committed
lib/logstorage: treat row_any, row_min and row_max results as labels instead of metrics when executing /select/logsql/stats_query and /select/logsql/stats_query_range
row_any, row_min and row_max return string values, so they cannot be used as numeric metrics by clients requesting /select/logsql/stats_query and /select/logsql/stats_query_range (for example, vmalert and the Grafana plugin for VictoriaLogs). It is better to treat them as labels. This allows obtaining a sample of some label (for example, a log message) at vmalert and using it in the created alert annotation / description, via the following query: `error | stats count() errors, row_any(_msg) msg_sample | filter errors:>0`. The row_*() functions return a JSON-encoded string. It can be unpacked into the needed fields via the unpack_json pipe if needed, while the JSON-encoded string can be removed from the output if needed with the delete pipe: `error | stats by (path) count() as errors, row_any(_msg) as msg_sample | filter errors:>0 | unpack_json from msg_sample fields (_msg) | delete msg_sample`. See #81
1 parent 33ebc89 commit 68b9433

File tree

4 files changed

+132
-126
lines changed

4 files changed

+132
-126
lines changed

app/vlselect/logsql/logsql.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -889,9 +889,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
889889
return
890890
}
891891

892-
// Obtain `by(...)` fields from the last `| stats` pipe in q.
893-
// Add `_time:step` to the `by(...)` list.
894-
byFields, err := ca.q.GetStatsByFieldsAddGroupingByTime(int64(step))
892+
labelFields, err := ca.q.GetStatsLabelsAddGroupingByTime(int64(step))
895893
if err != nil {
896894
httpserver.SendPrometheusError(w, r, err)
897895
return
@@ -913,7 +911,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
913911
// must be initialized to query timestamp for every processed log row.
914912
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8312
915913
ts := ca.q.GetTimestamp()
916-
labels := make([]logstorage.Field, 0, len(byFields))
914+
labels := make([]logstorage.Field, 0, len(labelFields))
917915
for j, c := range columns {
918916
if c.Name == "_time" {
919917
nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i])
@@ -922,7 +920,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
922920
continue
923921
}
924922
}
925-
if slices.Contains(byFields, c.Name) {
923+
if slices.Contains(labelFields, c.Name) {
926924
labels = append(labels, logstorage.Field{
927925
Name: clonedColumnNames[j],
928926
Value: strings.Clone(c.Values[i]),
@@ -932,7 +930,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
932930

933931
var dst []byte
934932
for j, c := range columns {
935-
if !slices.Contains(byFields, c.Name) {
933+
if !slices.Contains(labelFields, c.Name) {
936934
name := clonedColumnNames[j]
937935
dst = dst[:0]
938936
dst = append(dst, name...)
@@ -1017,8 +1015,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
10171015
return
10181016
}
10191017

1020-
// Obtain `by(...)` fields from the last `| stats` pipe in q.
1021-
byFields, err := ca.q.GetStatsByFields()
1018+
labelFields, err := ca.q.GetStatsLabels()
10221019
if err != nil {
10231020
httpserver.SendPrometheusError(w, r, err)
10241021
return
@@ -1036,9 +1033,9 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
10361033
clonedColumnNames[i] = strings.Clone(c.Name)
10371034
}
10381035
for i := 0; i < rowsCount; i++ {
1039-
labels := make([]logstorage.Field, 0, len(byFields))
1036+
labels := make([]logstorage.Field, 0, len(labelFields))
10401037
for j, c := range columns {
1041-
if slices.Contains(byFields, c.Name) {
1038+
if slices.Contains(labelFields, c.Name) {
10421039
labels = append(labels, logstorage.Field{
10431040
Name: clonedColumnNames[j],
10441041
Value: strings.Clone(c.Values[i]),
@@ -1047,7 +1044,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
10471044
}
10481045

10491046
for j, c := range columns {
1050-
if !slices.Contains(byFields, c.Name) {
1047+
if !slices.Contains(labelFields, c.Name) {
10511048
r := statsRow{
10521049
Name: clonedColumnNames[j],
10531050
Labels: labels,

docs/victorialogs/CHANGELOG.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ according to the following docs:
2121

2222
## tip
2323

24+
* FEATURE: [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api): automatically convert the results of [`row_any()`](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats), [`row_min()`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max()`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) stats functions to labels in the [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats) endpoints. This allows obtaining a sample log message in alerting rules with the following query: `... | stats count() as hits, row_any(_msg) as msg_sample`. See [#81](https://github.com/VictoriaMetrics/VictoriaLogs/issues/81).
2425
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): show VictoriaLogs version in the vmui's footer. See [#116](https://github.com/VictoriaMetrics/VictoriaLogs/issues/116).
2526

2627
## [v1.39.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.39.0)
@@ -346,10 +347,10 @@ Released at 2025-06-20
346347
* FEATURE: [`sum_len` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#sum_len-stats): allow calculating the sum of byte lengths for all the fields with common prefix via `sum_len(prefix*)` syntax.
347348
* FEATURE: [`count` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count-stats): allow calculating the number of logs with at least a single non-empty field across fields with common prefix via `count(prefix*)` syntax.
348349
* FEATURE: [`count_empty` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count_empty-stats): allow calculating the number of logs with empty fields with common prefix via `count_empty(prefix*)` syntax.
349-
* FEATURE: [`rate_sum` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#avg-stats): allow calculating the per-second rate over the sum of all the fields with common prefix via `rate_sum(prefix*)` syntax.
350-
* FEATURE: [`row_any` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#avg-stats): allow returning all the fields with common prefix via `row_any(prefix*)` syntax.
351-
* FEATURE: [`row_max` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#avg-stats): allow returning all the fields with common prefix via `row_max(max_field, prefix*)` syntax.
352-
* FEATURE: [`row_min` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#avg-stats): allow returning all the fields with common prefix via `row_min(min_field, prefix*)` syntax.
350+
* FEATURE: [`rate_sum` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#rate_sum-stats): allow calculating the per-second rate over the sum of all the fields with common prefix via `rate_sum(prefix*)` syntax.
351+
* FEATURE: [`row_any` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats): allow returning all the fields with common prefix via `row_any(prefix*)` syntax.
352+
* FEATURE: [`row_max` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats): allow returning all the fields with common prefix via `row_max(max_field, prefix*)` syntax.
353+
* FEATURE: [`row_min` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats): allow returning all the fields with common prefix via `row_min(min_field, prefix*)` syntax.
353354
* FEATURE: [`uniq_values` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#uniq_values-stats): allow fetching unique values for all the fields with common prefix via `uniq_values(prefix*)` syntax.
354355
* FEATURE: [`values` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#values-stats): allow fetching values for all the fields with common prefix via `values(prefix*)` syntax.
355356
* FEATURE: [`json_values` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#json_values-stats): allow fetching values for all the fields with common prefix via `json_values(prefix*)` syntax.

lib/logstorage/parser.go

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,15 +1033,17 @@ func mergeFiltersStreamInternal(fss []*filterStream) []*filterStream {
10331033
}
10341034
}
10351035

1036-
// GetStatsByFields returns `by (...)` fields from the last `stats` pipe at q.
1037-
func (q *Query) GetStatsByFields() ([]string, error) {
1038-
return q.GetStatsByFieldsAddGroupingByTime(0)
1036+
// GetStatsLabels returns stats labels from q for the /select/logsql/stats_query endpoint.
1037+
//
1038+
// The remaining fields are considered metrics.
1039+
func (q *Query) GetStatsLabels() ([]string, error) {
1040+
return q.GetStatsLabelsAddGroupingByTime(0)
10391041
}
10401042

1041-
// GetStatsByFieldsAddGroupingByTime returns `by (...)` fields from the last `stats` pipe at q.
1043+
// GetStatsLabelsAddGroupingByTime returns stats labels from q for the /select/logsql/stats_query and /select/logsql/stats_query_range endpoints.
10421044
//
10431045
// if step > 0, then _time:step is added to the last `stats by (...)` pipe at q.
1044-
func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error) {
1046+
func (q *Query) GetStatsLabelsAddGroupingByTime(step int64) ([]string, error) {
10451047
idx := getLastPipeStatsIdx(q.pipes)
10461048
if idx < 0 {
10471049
return nil, fmt.Errorf("missing `| stats ...` pipe in the query [%s]", q)
@@ -1073,20 +1075,37 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
10731075
// add 'partition by (_time)' to 'sort', 'first' and 'last' pipes.
10741076
q.addPartitionByTime(step)
10751077

1078+
labelFields := make([]string, 0, len(ps.byFields))
1079+
metricFields := make(map[string]struct{}, len(ps.funcs))
1080+
1081+
addToLabelFields := func(f string) {
1082+
if !slices.Contains(labelFields, f) {
1083+
labelFields = append(labelFields, f)
1084+
}
1085+
delete(metricFields, f)
1086+
}
1087+
1088+
addToMetricFields := func(f string) {
1089+
if idx := slices.Index(labelFields, f); idx >= 0 {
1090+
labelFields = append(labelFields[:idx], labelFields[idx+1:]...)
1091+
}
1092+
metricFields[f] = struct{}{}
1093+
}
1094+
10761095
// extract by(...) field names from ps
1077-
byFields := make([]string, len(ps.byFields))
1078-
for i, f := range ps.byFields {
1079-
byFields[i] = f.name
1096+
for _, f := range ps.byFields {
1097+
addToLabelFields(f.name)
10801098
}
10811099

10821100
// extract metric fields from stats pipe
1083-
metricFields := make(map[string]struct{}, len(ps.funcs))
10841101
for i := range ps.funcs {
10851102
f := &ps.funcs[i]
1086-
if slices.Contains(byFields, f.resultName) {
1087-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f.resultName, ps, q)
1103+
switch f.f.(type) {
1104+
case *statsRowAny, *statsRowMin, *statsRowMax:
1105+
addToLabelFields(f.resultName)
1106+
default:
1107+
addToMetricFields(f.resultName)
10881108
}
1089-
metricFields[f.resultName] = struct{}{}
10901109
}
10911110

10921111
// verify that all the pipes after the idx do not add new fields
@@ -1098,31 +1117,24 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
10981117
case *pipeFirst, *pipeLast, *pipeSort:
10991118
// These pipes do not change the set of fields.
11001119
case *pipeRunningStats:
1101-
// `| running_stats ...` pipe must contain the same byFields as the preceding `stats` pipe.
1102-
if !hasNeededFieldsExceptTime(t.byFields, byFields) {
1120+
// `| running_stats ...` pipe must contain the same labelFields as the preceding `stats` pipe.
1121+
if !hasNeededFieldsExceptTime(t.byFields, labelFields) {
11031122
return nil, fmt.Errorf("the %q must contain the same list of fields as `stats` pipe in the query [%s]", t, q)
11041123
}
1105-
// `| running_stats ...` pipe cannot override byFields.
11061124
for _, f := range t.funcs {
1107-
if slices.Contains(byFields, f.resultName) {
1108-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f.resultName, t, q)
1109-
}
1110-
metricFields[f.resultName] = struct{}{}
1125+
addToMetricFields(f.resultName)
11111126
}
11121127
case *pipeMath:
11131128
// Allow `| math ...` pipe, since it adds additional metrics to the given set of fields.
1114-
// Verify that the result fields at math pipe do not override byFields.
11151129
for _, me := range t.entries {
1116-
if slices.Contains(byFields, me.resultField) {
1117-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", me.resultField, t, q)
1118-
}
1119-
metricFields[me.resultField] = struct{}{}
1130+
addToMetricFields(me.resultField)
11201131
}
11211132
case *pipeFields:
1122-
// `| fields ...` pipe must contain all the by(...) fields, otherwise it breaks output.
1123-
for _, f := range byFields {
1124-
if !prefixfilter.MatchFilters(t.fieldFilters, f) {
1125-
return nil, fmt.Errorf("missing %q field at %q pipe in the query [%s]", f, p, q)
1133+
labelFieldsCopy := append([]string{}, labelFields...)
1134+
labelFields = make([]string, 0, len(labelFields))
1135+
for _, f := range labelFieldsCopy {
1136+
if prefixfilter.MatchFilters(t.fieldFilters, f) {
1137+
labelFields = append(labelFields, f)
11261138
}
11271139
}
11281140
for f := range maps.Clone(metricFields) {
@@ -1131,10 +1143,11 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
11311143
}
11321144
}
11331145
case *pipeDelete:
1134-
// Disallow deleting by(...) fields, since this breaks output.
1135-
for _, f := range byFields {
1136-
if prefixfilter.MatchFilters(t.fieldFilters, f) {
1137-
return nil, fmt.Errorf("the %q field cannot be deleted via %q in the query [%s]", f, p, q)
1146+
labelFieldsCopy := append([]string{}, labelFields...)
1147+
labelFields = make([]string, 0, len(labelFields))
1148+
for _, f := range labelFieldsCopy {
1149+
if !prefixfilter.MatchFilters(t.fieldFilters, f) {
1150+
labelFields = append(labelFields, f)
11381151
}
11391152
}
11401153
for f := range maps.Clone(metricFields) {
@@ -1148,25 +1161,20 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
11481161
fSrc := t.srcFieldFilters[i]
11491162
fDst := t.dstFieldFilters[i]
11501163

1151-
for _, f := range byFields {
1152-
if prefixfilter.MatchFilter(fDst, f) {
1153-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f, t, q)
1154-
}
1164+
for _, f := range labelFields {
11551165
if prefixfilter.MatchFilter(fSrc, f) {
11561166
dstFieldName := string(prefixfilter.AppendReplace(nil, fSrc, fDst, f))
1157-
if slices.Contains(byFields, dstFieldName) {
1158-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", dstFieldName, t, q)
1159-
}
1160-
byFields = append(byFields, dstFieldName)
1167+
addToLabelFields(dstFieldName)
11611168
}
11621169
}
1170+
11631171
for f := range maps.Clone(metricFields) {
11641172
if prefixfilter.MatchFilter(fDst, f) {
11651173
delete(metricFields, f)
11661174
}
11671175
if prefixfilter.MatchFilter(fSrc, f) {
11681176
dstFieldName := string(prefixfilter.AppendReplace(nil, fSrc, fDst, f))
1169-
metricFields[dstFieldName] = struct{}{}
1177+
addToMetricFields(dstFieldName)
11701178
}
11711179
}
11721180
}
@@ -1176,16 +1184,14 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
11761184
fSrc := t.srcFieldFilters[i]
11771185
fDst := t.dstFieldFilters[i]
11781186

1179-
for j, f := range byFields {
1180-
if prefixfilter.MatchFilter(fDst, f) {
1181-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f, t, q)
1182-
}
1187+
labelFieldsCopy := append([]string{}, labelFields...)
1188+
labelFields = make([]string, 0, len(labelFields))
1189+
for _, f := range labelFieldsCopy {
11831190
if prefixfilter.MatchFilter(fSrc, f) {
11841191
dstFieldName := string(prefixfilter.AppendReplace(nil, fSrc, fDst, f))
1185-
if slices.Contains(byFields, dstFieldName) {
1186-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", dstFieldName, t, q)
1187-
}
1188-
byFields[j] = dstFieldName
1192+
addToLabelFields(dstFieldName)
1193+
} else {
1194+
addToLabelFields(f)
11891195
}
11901196
}
11911197

@@ -1196,17 +1202,13 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
11961202
if prefixfilter.MatchFilter(fSrc, f) {
11971203
delete(metricFields, f)
11981204
dstFieldName := string(prefixfilter.AppendReplace(nil, fSrc, fDst, f))
1199-
metricFields[dstFieldName] = struct{}{}
1205+
addToMetricFields(dstFieldName)
12001206
}
12011207
}
12021208
}
12031209
case *pipeFormat:
12041210
// Assume that `| format ...` pipe generates an additional by(...) label
1205-
if slices.Contains(byFields, t.resultField) {
1206-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", t.resultField, t, q)
1207-
}
1208-
byFields = append(byFields, t.resultField)
1209-
delete(metricFields, t.resultField)
1211+
addToLabelFields(t.resultField)
12101212
case *pipeUnpackJSON:
12111213
// Assume that `| unpack_json ... fields (...)` pipe generates an additional by(...) labels from fields(...)
12121214
if len(t.fieldFilters) == 0 {
@@ -1216,11 +1218,7 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
12161218
if prefixfilter.IsWildcardFilter(f) {
12171219
return nil, fmt.Errorf("fields(...) at %q cannot contain wildcard filter; got %s; query [%s]", t, f, q)
12181220
}
1219-
if slices.Contains(byFields, f) {
1220-
return nil, fmt.Errorf("the %q field cannot be overridden at %q in the query [%s]", f, t, q)
1221-
}
1222-
byFields = append(byFields, f)
1223-
delete(metricFields, f)
1221+
addToLabelFields(f)
12241222
}
12251223
default:
12261224
return nil, fmt.Errorf("the %q pipe cannot be put after %q pipe in the query [%s]", p, ps, q)
@@ -1231,7 +1229,7 @@ func (q *Query) GetStatsByFieldsAddGroupingByTime(step int64) ([]string, error)
12311229
return nil, fmt.Errorf("missing metric fields in the results of query [%s]", q)
12321230
}
12331231

1234-
return byFields, nil
1232+
return labelFields, nil
12351233
}
12361234

12371235
func hasNeededFieldsExceptTime(fields, neededFields []string) bool {
@@ -1659,7 +1657,7 @@ func ParseStatsQuery(s string, timestamp int64) (*Query, error) {
16591657
if err != nil {
16601658
return nil, err
16611659
}
1662-
if _, err := q.GetStatsByFields(); err != nil {
1660+
if _, err := q.GetStatsLabels(); err != nil {
16631661
return nil, err
16641662
}
16651663
return q, nil

0 commit comments

Comments
 (0)