Skip to content

Commit 7a080c4

Browse files
authored
Optimized filtering (#31)
* Add Aho Corasick index for pass/skip filters * Optimize performance * Update README, refresh default.pgo
1 parent 2aea9b5 commit 7a080c4

File tree

9 files changed

+220
-49
lines changed

9 files changed

+220
-49
lines changed

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ linters-settings:
2222
linters:
2323
enable-all: true
2424
disable:
25+
- testpackage
2526
- gocognit
2627
- gocyclo
2728
- funlen

cmd/catp/README.md

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,26 @@ catp [OPTIONS] PATH ...
4444
use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU) (default 1)
4545
-pass value
4646
filter matching, may contain multiple AND patterns separated by ^,
47-
if filter matches, line is passed to the output (unless filtered out by -skip)
48-
each -pass value is added with OR logic,
49-
for example, you can use "-pass bar^baz -pass foo" to only keep lines that have (bar AND baz) OR foo
47+
if filter matches, line is passed to the output (may be filtered out by preceding -skip)
48+
other -pass values are evaluated if preceding pass/skip did not match,
49+
for example, you can use "-pass bar^baz -pass foo -skip fo" to only keep lines that have (bar AND baz) OR foo, but not fox
50+
-pass-any
51+
finishes matching and gets the value even if previous -pass did not match,
52+
if previous -skip matched, the line would be skipped any way.
53+
-pass-csv value
54+
filter matching, loads pass params from CSV file,
55+
each line is treated as -pass, each column value is AND condition.
5056
-progress-json string
5157
write current progress to a file
5258
-rate-limit float
5359
output rate limit lines per second
5460
-skip value
5561
filter matching, may contain multiple AND patterns separated by ^,
56-
if filter matches, line is removed from the output (even if it passed -pass)
57-
each -skip value is added with OR logic,
62+
if filter matches, line is removed from the output (may be kept if it passed preceding -pass)
5863
for example, you can use "-skip quux^baz -skip fooO" to skip lines that have (quux AND baz) OR fooO
64+
-skip-csv value
65+
filter matching, loads skip params from CSV file,
66+
each line is treated as -skip, each column value is AND condition.
5967
-version
6068
print version and exit
6169
```
@@ -77,10 +85,10 @@ get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s,
7785
```
7886

7987
Run log filtering (lines containing `foo bar` or `baz`) on multiple files in background (with `screen`) and output to a
80-
new file.
88+
new compressed file.
8189

8290
```
83-
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
91+
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
8492
```
8593

8694
```
@@ -100,3 +108,14 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes
100108
# detaching from screen with ctrl+a+d
101109
```
102110

111+
Filter based on large list of needles. Values from allow and block lists are loaded into high-performance
112+
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
113+
114+
```
115+
catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst
116+
```
117+
118+
Each source line would follow the filtering pipeline:
119+
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into output
120+
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is skipped
121+
* if not, source line gets into output because of `-pass-any`

cmd/catp/catp/app.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
2929
r := &runner{}
3030

3131
flag.Var(flagFunc(func(v string) error {
32-
r.filters = append(r.filters, filter{pass: true, and: bytes.Split([]byte(v), []byte("^"))})
32+
r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...)
3333

3434
return nil
3535
}), "pass", "filter matching, may contain multiple AND patterns separated by ^,\n"+
@@ -44,7 +44,7 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
4444

4545
flag.BoolFunc("pass-any", "finishes matching and gets the value even if previous -pass did not match,\n"+
4646
"if previous -skip matched, the line would be skipped any way.", func(s string) error {
47-
r.filters = append(r.filters, filter{pass: true})
47+
r.filters.addPassAny()
4848

4949
return nil
5050
})
@@ -55,7 +55,7 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
5555
"each line is treated as -skip, each column value is AND condition.")
5656

5757
flag.Var(flagFunc(func(v string) error {
58-
r.filters = append(r.filters, filter{pass: false, and: bytes.Split([]byte(v), []byte("^"))})
58+
r.filters.addFilter(false, bytes.Split([]byte(v), []byte("^"))...)
5959

6060
return nil
6161
}), "skip", "filter matching, may contain multiple AND patterns separated by ^,\n"+
@@ -94,6 +94,8 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
9494
}
9595
flag.Parse()
9696

97+
r.filters.buildIndex()
98+
9799
if *ver {
98100
fmt.Println(version.Module("github.com/bool64/progress").Version)
99101

cmd/catp/catp/catp.go

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package catp
22

33
import (
44
"bufio"
5-
"bytes"
65
"context"
76
"encoding/csv"
87
"encoding/json"
@@ -45,7 +44,7 @@ type runner struct {
4544
currentBytesUncompressed int64
4645
currentLines int64
4746

48-
filters []filter
47+
filters filters
4948

5049
currentFile *progress.CountingReader
5150
currentTotal int64
@@ -66,13 +65,7 @@ type runner struct {
6665
hasCompression bool
6766
}
6867

69-
type (
70-
filter struct {
71-
pass bool // Skip is false.
72-
and [][]byte
73-
}
74-
flagFunc func(v string) error
75-
)
68+
type flagFunc func(v string) error
7669

7770
func (f flagFunc) String() string { return "" }
7871
func (f flagFunc) Set(value string) error { return f(value) }
@@ -172,7 +165,7 @@ func (r *runner) st(s progress.Status) string {
172165
atomic.StoreInt64(&r.lastBytesUncompressed, currentBytesUncompressed)
173166
}
174167

175-
if len(r.filters) > 0 || r.options.PrepareLine != nil {
168+
if r.filters.isSet() || r.options.PrepareLine != nil {
176169
m := atomic.LoadInt64(&r.matches)
177170
pr.Matches = &m
178171
res += fmt.Sprintf(", matches %d", m)
@@ -251,7 +244,7 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) {
251244

252245
line := s.Bytes()
253246

254-
if !r.shouldWrite(line) {
247+
if !r.filters.shouldWrite(line) {
255248
continue
256249
}
257250

@@ -297,32 +290,6 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) {
297290
}
298291
}
299292

300-
func (r *runner) shouldWrite(line []byte) bool {
301-
shouldWrite := true
302-
303-
for _, f := range r.filters {
304-
if f.pass {
305-
shouldWrite = false
306-
}
307-
308-
andMatched := true
309-
310-
for _, andFilter := range f.and {
311-
if !bytes.Contains(line, andFilter) {
312-
andMatched = false
313-
314-
break
315-
}
316-
}
317-
318-
if andMatched {
319-
return f.pass
320-
}
321-
}
322-
323-
return shouldWrite
324-
}
325-
326293
func (r *runner) cat(filename string) (err error) { //nolint:gocyclo
327294
var rd io.Reader
328295

@@ -432,7 +399,7 @@ func (r *runner) cat(filename string) (err error) { //nolint:gocyclo
432399
r.limiter = rate.NewLimiter(rate.Limit(r.rateLimit), 100)
433400
}
434401

435-
if len(r.filters) > 0 || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 {
402+
if r.filters.isSet() || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 {
436403
r.scanFile(filename, rd, out)
437404
} else {
438405
r.readFile(rd, out)
@@ -524,7 +491,7 @@ func (r *runner) loadCSVFilter(fn string, pass bool) error {
524491
and = append(and, []byte(v))
525492
}
526493

527-
r.filters = append(r.filters, filter{pass: pass, and: and})
494+
r.filters.addFilter(pass, and...)
528495
}
529496

530497
return nil

cmd/catp/catp/filter.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package catp
2+
3+
import (
4+
"bytes"
5+
6+
"github.com/cloudflare/ahocorasick"
7+
)
8+
9+
type (
10+
filterAnd [][]byte
11+
filterGroup struct {
12+
pass bool
13+
ors []filterAnd
14+
15+
// Prefilter checks for match of the first element of any ors item.
16+
// This first element is removed from and.
17+
pre *ahocorasick.Matcher
18+
}
19+
filters struct {
20+
g []*filterGroup
21+
}
22+
)
23+
24+
func (f *filters) buildIndex() {
25+
for _, g := range f.g {
26+
g.buildIndex()
27+
}
28+
}
29+
30+
func (f *filters) isSet() bool {
31+
return len(f.g) > 0
32+
}
33+
34+
func (f *filters) addFilterString(pass bool, and ...string) {
35+
andb := make([][]byte, 0, len(and))
36+
37+
for _, item := range and {
38+
andb = append(andb, []byte(item))
39+
}
40+
41+
f.addFilter(pass, andb...)
42+
}
43+
44+
func (f *filters) addPassAny() {
45+
f.g = append(f.g, &filterGroup{pass: true})
46+
}
47+
48+
func (f *filters) addFilter(pass bool, and ...[]byte) {
49+
if len(and) == 0 {
50+
return
51+
}
52+
53+
var g *filterGroup
54+
55+
// Get current group if exists and has same pass, append new current group with new pass otherwise.
56+
if len(f.g) != 0 {
57+
g = f.g[len(f.g)-1]
58+
59+
if g.pass != pass {
60+
g = &filterGroup{pass: pass}
61+
f.g = append(f.g, g)
62+
}
63+
} else {
64+
// Create and append the very first group.
65+
g = &filterGroup{pass: pass}
66+
f.g = append(f.g, g)
67+
}
68+
69+
g.ors = append(g.ors, and)
70+
}
71+
72+
func (f *filters) shouldWrite(line []byte) bool {
73+
shouldWrite := true
74+
75+
for _, g := range f.g {
76+
if g.pass {
77+
shouldWrite = false
78+
}
79+
80+
matched := g.match(line)
81+
82+
if matched {
83+
return g.pass
84+
}
85+
}
86+
87+
return shouldWrite
88+
}
89+
90+
func (g *filterGroup) match(line []byte) bool {
91+
if g.pre != nil {
92+
ids := g.pre.Match(line)
93+
if len(ids) == 0 {
94+
return false
95+
}
96+
97+
for _, id := range ids {
98+
or := g.ors[id]
99+
100+
andMatched := true
101+
102+
for _, and := range or {
103+
if !bytes.Contains(line, and) {
104+
andMatched = false
105+
106+
break
107+
}
108+
}
109+
110+
if andMatched {
111+
return true
112+
}
113+
}
114+
115+
return false
116+
}
117+
118+
for _, or := range g.ors {
119+
andMatched := true
120+
121+
for _, and := range or {
122+
if !bytes.Contains(line, and) {
123+
andMatched = false
124+
125+
break
126+
}
127+
}
128+
129+
if andMatched {
130+
return true
131+
}
132+
}
133+
134+
return false
135+
}
136+
137+
func (g *filterGroup) buildIndex() {
138+
if g.pre != nil {
139+
return
140+
}
141+
142+
if len(g.ors) < 5 {
143+
return
144+
}
145+
146+
indexItems := make([][]byte, 0, len(g.ors))
147+
for i, or := range g.ors {
148+
indexItems = append(indexItems, or[0])
149+
g.ors[i] = or[1:]
150+
}
151+
152+
g.pre = ahocorasick.NewMatcher(indexItems)
153+
}

cmd/catp/catp/filter_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package catp
2+
3+
import (
4+
"bytes"
5+
"os"
6+
"testing"
7+
)
8+
9+
func TestFilter_Match(t *testing.T) {
10+
f := filters{}
11+
12+
f.addFilterString(false, "dbg")
13+
f.addFilterString(true, "linux", "64")
14+
f.addFilterString(true, "windows")
15+
16+
input, err := os.ReadFile("./testdata/release-assets.yml")
17+
if err != nil {
18+
t.Fatal(err)
19+
}
20+
21+
for _, line := range bytes.Split(input, []byte("\n")) {
22+
if f.shouldWrite(line) {
23+
println(string(line))
24+
}
25+
}
26+
}

cmd/catp/default.pgo

-3.83 KB
Binary file not shown.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23.0
55
require (
66
github.com/DataDog/zstd v1.5.7
77
github.com/bool64/dev v0.2.40
8+
github.com/cloudflare/ahocorasick v0.0.0-20240916140611-054963ec9396
89
github.com/klauspost/compress v1.18.0
910
github.com/klauspost/pgzip v1.2.6
1011
golang.org/x/time v0.12.0

0 commit comments

Comments
 (0)