Skip to content

Commit a845dc3

Browse files
Merge branch 'master' into vmalert-proxy
2 parents 9fd897a + dbba346 commit a845dc3

File tree

5 files changed

+68
-9
lines changed

5 files changed

+68
-9
lines changed

docs/victorialogs/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
2121
* FEATURE: proxy VMAlert requests at `/select/vmalert` path, when `-vmalert.proxyURL` flag is set. See [#8272](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8272).
2222
* FEATURE: add `-search.disableTenantResponseHeaders` flag to disable showing tenant information on VMUI for case, when AccountID and ProjectID headers are set by some proxy.
2323

24+
* BUGFIX: [`/select/logsql/query` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs): properly optimize the query execution when the `limit` query arg is set, while the `offset` query arg isn't set. It wasn't addressed properly in [v1.33.0](#v1330). See [#620](https://github.com/VictoriaMetrics/VictoriaLogs/issues/620).
25+
* BUGFIX: [`/select/logsql/hits` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats): take into account results from the [`union` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#union-pipe) while calculating log hits. See [#641](https://github.com/VictoriaMetrics/VictoriaLogs/issues/641).
26+
* BUGFIX: [`/select/logsql/hits` endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats): take into account results from the [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe) while calculating log hits.
27+
2428
## [v1.33.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.33.0)
2529

2630
Released at 2025-09-10

lib/logstorage/net_query_runner_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,9 @@ func TestSplitQueryToRemoteAndLocal(t *testing.T) {
8484
f(`foo | unpack_syslog`, `foo | unpack_syslog`, ``)
8585
f(`foo | unpack_words`, `foo | unpack_words`, ``)
8686
f(`foo | unroll by (x)`, `foo | unroll by (x)`, ``)
87+
88+
// Special cases with 'offset 0'
89+
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/620#issuecomment-3276624504
90+
f(`foo | offset 0`, `foo`, ``)
91+
f(`foo | offset 0 | limit 10`, `foo | limit 10`, `limit 10`)
8792
}

lib/logstorage/parser.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ func (q *Query) AddFacetsPipe(limit, maxValuesPerField, maxValueLen int, keepCon
511511
// AddCountByTimePipe adds '| stats by (_time:step offset off, field1, ..., fieldN) count() hits' to the end of q.
512512
func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
513513
// Drop pipes from q, which modify or delete _time field, since they make impossible to calculate stats grouped by _time.
514-
q.dropTimeModificationPipes()
514+
q.dropPipesUnsafeForHits()
515515

516516
{
517517
// add 'stats by (_time:step offset off, fields) count() hits'
@@ -541,10 +541,11 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
541541
}
542542
}
543543

544-
// dropTimeModificationPipes drops pipes from q, which modify
545-
func (q *Query) dropTimeModificationPipes() {
544+
// dropPipesUnsafeForHits drops trailing pipes from q, which are unsafe
545+
// for calculating hits grouped by _time.
546+
func (q *Query) dropPipesUnsafeForHits() {
546547
for i, p := range q.pipes {
547-
if !p.canReturnLastNResults() {
548+
if !isPipeSafeForHits(p) {
548549
// Drop the rest of the pipes, including the current pipe,
549550
// since it modified or deletes the _time field.
550551
q.pipes = q.pipes[:i]
@@ -553,6 +554,25 @@ func (q *Query) dropTimeModificationPipes() {
553554
}
554555
}
555556

557+
func isPipeSafeForHits(p pipe) bool {
558+
if p.canReturnLastNResults() {
559+
return true
560+
}
561+
562+
switch t := p.(type) {
563+
case *pipeUnion:
564+
// Allow union pipes, but drop pipes unsafe for hits inside them.
565+
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/641
566+
t.q.dropPipesUnsafeForHits()
567+
return true
568+
case *pipeJoin:
569+
// Allow join pipes, since they do not drop _time field.
570+
return true
571+
default:
572+
return false
573+
}
574+
}
575+
556576
// Clone returns a copy of q at the given timestamp.
557577
func (q *Query) Clone(timestamp int64) *Query {
558578
qStr := q.String()
@@ -790,7 +810,7 @@ func (q *Query) AddPipeOffsetLimit(offset, limit uint64) {
790810
q.mustAppendPipe(limitStr)
791811

792812
// optimize the query, so the `offset` and `limit` pipes could be joined with the preceding `sort` pipe.
793-
q.pipes = optimizeSortOffsetLimitPipes(q.pipes)
813+
q.pipes = optimizeOffsetLimitPipes(q.pipes)
794814
}
795815

796816
func (q *Query) mustAppendPipe(s string) {
@@ -807,7 +827,7 @@ func (q *Query) optimize() {
807827
}
808828

809829
func (q *Query) optimizeNoSubqueries() {
810-
q.pipes = optimizeSortOffsetLimitPipes(q.pipes)
830+
q.pipes = optimizeOffsetLimitPipes(q.pipes)
811831
q.pipes = optimizeUniqLimitPipes(q.pipes)
812832
q.pipes = optimizeFilterPipes(q.pipes)
813833

@@ -1323,15 +1343,28 @@ func removeStarFilters(f filter) filter {
13231343
return f
13241344
}
13251345

1326-
func optimizeSortOffsetLimitPipes(pipes []pipe) []pipe {
1346+
func optimizeOffsetLimitPipes(pipes []pipe) []pipe {
13271347
for {
13281348
pipesLen := len(pipes)
13291349
pipes = optimizeSortOffsetPipes(pipes)
13301350
pipes = optimizeSortLimitPipes(pipes)
13311351
if len(pipes) == pipesLen {
1332-
return pipes
1352+
break
13331353
}
13341354
}
1355+
1356+
// Remove `offset 0` pipes.
1357+
i := 0
1358+
for i < len(pipes) {
1359+
po, ok := pipes[i].(*pipeOffset)
1360+
if !ok || po.offset != 0 {
1361+
i++
1362+
continue
1363+
}
1364+
pipes = append(pipes[:i], pipes[i+1:]...)
1365+
}
1366+
1367+
return pipes
13351368
}
13361369

13371370
func optimizeSortOffsetPipes(pipes []pipe) []pipe {

lib/logstorage/parser_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func TestQuery_AddTimeFilter_StepPropagation(t *testing.T) {
355355
})
356356
}
357357

358-
func TestParseQuery_OptimizeSortOffsetLimitPipes(t *testing.T) {
358+
func TestParseQuery_OptimizeOffsetLimitPipes(t *testing.T) {
359359
f := func(s, resultExpected string) {
360360
t.Helper()
361361

@@ -385,6 +385,11 @@ func TestParseQuery_OptimizeSortOffsetLimitPipes(t *testing.T) {
385385
f(`* | sort by (x) | limit 30 | limit 20 | offset 4`, `* | sort by (x) offset 4 limit 16`)
386386
f(`* | sort by (x) | limit 30 | limit 20 | offset 4 | offset 5`, `* | sort by (x) offset 9 limit 11`)
387387
f(`* | sort by (x) | limit 30 | limit 20 | offset 4 | offset 5 | fields x`, `* | sort by (x) offset 9 limit 11 | fields x`)
388+
389+
// Verify the case without 'sort' pipe and with 'offset 0' pipes.
390+
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/620#issuecomment-3276624504
391+
f(`* | offset 0`, `*`)
392+
f(`* | offset 0 | limit 10`, `* | limit 10`)
388393
}
389394

390395
func TestParseQuery_OptimizeStarFilters(t *testing.T) {
@@ -3932,6 +3937,14 @@ func TestQuery_AddCountByTimePipe(t *testing.T) {
39323937
// pipes, which do not change _time field
39333938
f("* | extract 'abc<de>fg' | filter de:='qwer'", nsecsPerMinute, 0, nil, `* | extract "abc<de>fg" | filter de:=qwer | stats by (_time:1m) count(*) as hits | sort by (_time)`)
39343939

3940+
// union pipe is allowed. See https://github.com/VictoriaMetrics/VictoriaLogs/issues/641
3941+
f(`foo | union (bar)`, nsecsPerMinute, 0, nil, `foo | union (bar) | stats by (_time:1m) count(*) as hits | sort by (_time)`)
3942+
f(`foo | union (bar) | stats count()`, nsecsPerMinute, 0, nil, `foo | union (bar) | stats by (_time:1m) count(*) as hits | sort by (_time)`)
3943+
f(`foo | union (bar | stats count())`, nsecsPerMinute, 0, nil, `foo | union (bar) | stats by (_time:1m) count(*) as hits | sort by (_time)`)
3944+
3945+
// join pipe is allowed
3946+
f(`foo | join by (x) (y)`, nsecsPerMinute, 0, nil, `foo | join by (x) (y) | stats by (_time:1m) count(*) as hits | sort by (_time)`)
3947+
39353948
// pipes, which change _time field
39363949
f("* | extract 'abc<de>fg' | filter de:='qwer' | stats count()", nsecsPerMinute, 0, nil, `* | extract "abc<de>fg" | filter de:=qwer | stats by (_time:1m) count(*) as hits | sort by (_time)`)
39373950
f("* | extract 'abc<de>fg' | sort by (x)", nsecsPerMinute, 0, nil, `* | extract "abc<de>fg" | stats by (_time:1m) count(*) as hits | sort by (_time)`)

lib/logstorage/pipe_offset.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ func (po *pipeOffset) String() string {
1919
}
2020

2121
func (po *pipeOffset) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
22+
if po.offset == 0 {
23+
// Special case - `offset 0` is safe to push to the remote side.
24+
return po, nil
25+
}
2226
return nil, []pipe{po}
2327
}
2428

0 commit comments

Comments
 (0)