diff --git a/lib/logstorage/filter_array_contains.go b/lib/logstorage/filter_array_contains.go new file mode 100644 index 0000000000..00c1d0ab93 --- /dev/null +++ b/lib/logstorage/filter_array_contains.go @@ -0,0 +1,174 @@ +package logstorage + +import ( + "fmt" + "strings" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" + "github.com/valyala/fastjson" +) + +// filterArrayContains matches if the JSON array in the given field contains the given value. +// +// Example LogsQL: `tags:array_contains("prod")` +type filterArrayContains struct { + fieldName string + value string +} + +func (fa *filterArrayContains) String() string { + return fmt.Sprintf("%sarray_contains(%s)", quoteFieldNameIfNeeded(fa.fieldName), quoteTokenIfNeeded(fa.value)) +} + +func (fa *filterArrayContains) updateNeededFields(pf *prefixfilter.Filter) { + pf.AddAllowFilter(fa.fieldName) +} + +func (fa *filterArrayContains) matchRow(fields []Field) bool { + v := getFieldValueByName(fields, fa.fieldName) + return matchArrayContains(v, fa.value) +} + +func (fa *filterArrayContains) applyToBlockResult(br *blockResult, bm *bitmap) { + c := br.getColumnByName(fa.fieldName) + if c.isConst { + v := c.valuesEncoded[0] + if !matchArrayContains(v, fa.value) { + bm.resetBits() + } + return + } + if c.isTime { + bm.resetBits() + return + } + + switch c.valueType { + case valueTypeString: + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + v := values[idx] + return matchArrayContains(v, fa.value) + }) + case valueTypeDict: + bb := bbPool.Get() + for _, v := range c.dictValues { + c := byte(0) + if matchArrayContains(v, fa.value) { + c = 1 + } + bb.B = append(bb.B, c) + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + n := valuesEncoded[idx][0] + return bb.B[n] == 1 + }) + bbPool.Put(bb) + default: + bm.resetBits() + } +} + +func (fa *filterArrayContains) applyToBlockSearch(bs *blockSearch, bm *bitmap) { + fieldName := fa.fieldName + value := fa.value + + v := bs.getConstColumnValue(fieldName) + if v != "" { + if !matchArrayContains(v, value) { + bm.resetBits() + } + return + } + + // Verify whether filter matches other columns + ch := bs.getColumnHeader(fieldName) + if ch == nil { + // Fast path - there are no matching columns. + bm.resetBits() + return + } + + switch ch.valueType { + case valueTypeString: + matchStringByArrayContains(bs, ch, bm, value) + case valueTypeDict: + matchValuesDictByArrayContains(bs, ch, bm, value) + default: + bm.resetBits() + } +} + +func matchValuesDictByArrayContains(bs *blockSearch, ch *columnHeader, bm *bitmap, value string) { + bb := bbPool.Get() + for _, v := range ch.valuesDict.values { + c := byte(0) + if matchArrayContains(v, value) { + c = 1 + } + bb.B = append(bb.B, c) + } + matchEncodedValuesDict(bs, ch, bm, bb.B) + bbPool.Put(bb) +} + +func matchStringByArrayContains(bs *blockSearch, ch *columnHeader, bm *bitmap, value string) { + visitValues(bs, ch, bm, func(v string) bool { + return matchArrayContains(v, value) + }) +} + +func matchArrayContains(s, value string) bool { + if s == "" { + return false + } + // Fast check: if the value is not present as a substring, it definitely won't be in the array. + if !strings.Contains(s, value) { + return false + } + + // Fast check 2: must start with [ + if s[0] != '[' { + return false + } + + // Use shared fastjson.ParserPool in order to avoid per-call parser allocations. + p := jspp.Get() + defer jspp.Put(p) + v, err := p.Parse(s) + if err != nil { + return false + } + + // Check if it is an array + a, err := v.Array() + if err != nil { + return false + } + + for _, elem := range a { + // We only support checking against string representation of values in the array. + var sElem string + switch elem.Type() { + case fastjson.TypeString: + sElem = string(elem.GetStringBytes()) + case fastjson.TypeNumber: + sElem = elem.String() + case fastjson.TypeTrue: + sElem = "true" + case fastjson.TypeFalse: + sElem = "false" + case fastjson.TypeNull: + sElem = "null" + default: + continue + } + + if sElem == value { + return true + } + } + + return false +} diff --git a/lib/logstorage/filter_array_contains_test.go b/lib/logstorage/filter_array_contains_test.go new file mode 100644 index 0000000000..468d8025bd --- /dev/null +++ b/lib/logstorage/filter_array_contains_test.go @@ -0,0 +1,162 @@ +package logstorage + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" +) + +func TestMatchArrayContains(t *testing.T) { + t.Parallel() + + f := func(s, value string, resultExpected bool) { + t.Helper() + result := matchArrayContains(s, value) + if result != resultExpected { + t.Fatalf("unexpected result for s=%q, value=%q; got %v; want %v", s, value, result, resultExpected) + } + } + + // Not an array + f("", "foo", false) + f("foo", "foo", false) + f("{}", "foo", false) + + // Array doesn't contain value + f("[]", "foo", false) + f(`["bar"]`, "foo", false) + f(`["bar","baz"]`, "foo", false) + f(`[1,2]`, "3", false) + + // Array contains value + f(`["foo"]`, "foo", true) + f(`["bar","foo"]`, "foo", true) + f(`["foo","bar"]`, "foo", true) + f(`["a","foo","b"]`, "foo", true) + + // Mixed types + f(`[123]`, "123", true) + f(`[true]`, "true", true) + f(`["123"]`, "123", true) + + // Tricky cases + f(`["foo bar"]`, "foo", false) // partial match + f(`["foobar"]`, "foo", false) // partial match + f(`["foo"]`, "fo", false) // partial match + + // Nested structures (ignored by current implementation) + f(`[{"a":"b"}]`, `{"a":"b"}`, false) // nested object ignored + f(`[["a"]]`, `["a"]`, false) // nested array ignored + f(`[["a"], "b"]`, "b", true) // mixed with simple value +} + +func TestFilterArrayContains(t *testing.T) { + t.Parallel() + + t.Run("const-column", func(t *testing.T) { + columns := []column{ + { + name: "foo", + values: []string{ + `["a","b"]`, + `["a","b"]`, + `["a","b"]`, + }, + }, + } + + // match + fa := &filterArrayContains{ + fieldName: "foo", + value: "a", + } + testFilterMatchForColumns(t, columns, fa, "foo", []int{0, 1, 2}) + + fa = &filterArrayContains{ + fieldName: "foo", + value: "b", + } + testFilterMatchForColumns(t, columns, fa, "foo", []int{0, 1, 2}) + + // mismatch + fa = &filterArrayContains{ + fieldName: "foo", + value: "c", + } + testFilterMatchForColumns(t, columns, fa, "foo", nil) + + fa = &filterArrayContains{ + fieldName: "non-existing-column", + value: "a", + } + testFilterMatchForColumns(t, columns, fa, "foo", nil) + }) + + t.Run("dict", func(t *testing.T) { + columns := []column{ + { + name: "foo", + values: []string{ + "", + `["a"]`, + `["b"]`, + `["a","b"]`, + `"a"`, // not an array + `[1,2]`, + }, + }, + } + + // match + fa := &filterArrayContains{ + fieldName: "foo", + value: "a", + } + testFilterMatchForColumns(t, columns, fa, "foo", []int{1, 3}) + + fa = &filterArrayContains{ + fieldName: "foo", + value: "b", + } + testFilterMatchForColumns(t, columns, fa, "foo", []int{2, 3}) + + // mismatch + fa = &filterArrayContains{ + fieldName: "foo", + value: "c", + } + testFilterMatchForColumns(t, columns, fa, "foo", nil) + }) + + t.Run("strings", func(t *testing.T) { + columns := []column{ + { + name: "foo", + values: []string{ + `["apple", "banana"]`, + `["orange"]`, + `not array`, + `["apple"]`, + `[]`, + }, + }, + } + + // match + fa := &filterArrayContains{ + fieldName: "foo", + value: "apple", + } + testFilterMatchForColumns(t, columns, fa, "foo", []int{0, 3}) + + // mismatch + fa = &filterArrayContains{ + fieldName: "foo", + value: "pear", + } + testFilterMatchForColumns(t, columns, fa, "foo", nil) + }) + + // Remove the remaining data files for the test + fs.MustRemoveDir(t.Name()) +} diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 902df5f14b..8633154ed7 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -2007,6 +2007,8 @@ func parseFilterGeneric(lex *lexer, fieldName string) (filter, error) { return parseFilterContainsAll(lex, fieldName) case lex.isKeyword("contains_any"): return parseFilterContainsAny(lex, fieldName) + case lex.isKeyword("array_contains"): + return parseFilterArrayContains(lex, fieldName) case lex.isKeyword("contains_common_case"): return parseFilterContainsCommonCase(lex, fieldName) case lex.isKeyword("eq_field"): @@ -2311,6 +2313,16 @@ func parseFilterContainsAny(lex *lexer, fieldName string) (filter, error) { return parseInValues(lex, fieldName, fi, &fi.values) } +func parseFilterArrayContains(lex *lexer, fieldName string) (filter, error) { + return parseFuncArg(lex, fieldName, func(arg string) (filter, error) { + fa := &filterArrayContains{ + fieldName: getCanonicalColumnName(fieldName), + value: arg, + } + return fa, nil + }) +} + func parseFilterIn(lex *lexer, fieldName string) (filter, error) { fi := &filterIn{ fieldName: getCanonicalColumnName(fieldName), @@ -3803,6 +3815,7 @@ var reservedKeywords = func() map[string]struct{} { // functions "contains_all", "contains_any", + "array_contains", "contains_common_case", "eq_field", "equals_common_case", diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 291a1cba6a..ca83ea905a 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -923,6 +923,30 @@ func TestParseFilterContainsAny(t *testing.T) { f(`a:contains_any(* | fields bar)`, `a`, nil) } +func TestParseFilterArrayContains(t *testing.T) { + f := func(s, fieldNameExpected, valueExpected string) { + t.Helper() + q, err := ParseQuery(s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + fa, ok := q.f.(*filterArrayContains) + if !ok { + t.Fatalf("unexpected filter type; got %T; want *filterArrayContains; filter: %s", q.f, q.f) + } + if fa.fieldName != fieldNameExpected { + t.Fatalf("unexpected fieldName; got %q; want %q", fa.fieldName, fieldNameExpected) + } + if fa.value != valueExpected { + t.Fatalf("unexpected value; got %q; want %q", fa.value, valueExpected) + } + } + + f(`array_contains(foo)`, `_msg`, "foo") + f(`tags:array_contains("prod")`, `tags`, "prod") + f(`array_contains("foo bar,baz")`, `_msg`, "foo bar,baz") +} + func TestParseFilterIPv4Range(t *testing.T) { f := func(s, fieldNameExpected string, minValueExpected, maxValueExpected uint32) { t.Helper() @@ -1612,6 +1636,11 @@ func TestParseQuery_Success(t *testing.T) { f(`contains_all(bar:contains_all(1,2,3) | uniq (x)) | stats count() rows`, `contains_all(bar:contains_all(1,2,3) | uniq by (x)) | stats count(*) as rows`) f(`contains_all((1) | fields z) | stats count() rows`, `contains_all(1 | fields z) | stats count(*) as rows`) + // array_contains filter + f(`array_contains(foo)`, `array_contains(foo)`) + f(`array_contains('foo bar,baz')`, `array_contains("foo bar,baz")`) + f(`tags:array_contains(foo-bar/baz)`, `tags:array_contains("foo-bar/baz")`) + // ipv4_range filter f(`ipv4_range(1.2.3.4, "5.6.7.8")`, `ipv4_range(1.2.3.4, 5.6.7.8)`) f(`foo:ipv4_range(1.2.3.4, "5.6.7.8" , )`, `foo:ipv4_range(1.2.3.4, 5.6.7.8)`)