Skip to content

Commit 670b2ac

Browse files
authored
Add save matches to separate files, start/end line numbers (#33)
* Add start/end line * Add save matches to separate files
1 parent c4fb54c commit 670b2ac

File tree

8 files changed

+205
-112
lines changed

8 files changed

+205
-112
lines changed

.golangci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ run:
55
linters:
66
default: all
77
disable:
8+
- funlen
9+
- gocyclo
10+
- gocognit
11+
- err113
812
- embeddedstructfieldcheck
913
- testpackage
1014
- noinlineerr

cmd/catp/README.md

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ catp [OPTIONS] PATH ...
2929
write first 10 seconds of CPU profile to file
3030
-dbg-mem-prof string
3131
write heap profile to file after 10 seconds
32+
-end-line int
33+
stop printing lines at this line (exclusive),
34+
default is 0 (no limit), each input file is counted separately
3235
-l count lines
3336
-no-progress
3437
disable progress printing
@@ -57,20 +60,25 @@ catp [OPTIONS] PATH ...
5760
write current progress to a file
5861
-rate-limit float
5962
output rate limit lines per second
63+
-save-matches value
64+
save matches of previous filter group to file
6065
-skip value
6166
filter matching, may contain multiple AND patterns separated by ^,
6267
if filter matches, line is removed from the output (may be kept if it passed preceding -pass)
6368
for example, you can use "-skip quux^baz -skip fooO" to skip lines that have (quux AND baz) OR fooO
6469
-skip-csv value
6570
filter matching, loads skip params from CSV file,
6671
each line is treated as -skip, each column value is AND condition.
72+
-start-line int
73+
start printing lines from this line (inclusive),
74+
default is 0 (first line), each input file is counted separately
6775
-version
6876
print version and exit
6977
```
7078

7179
## Examples
7280

73-
Feed a file into `jq` field extractor with progress printing.
81+
### Feed a file into `jq` field extractor with progress printing
7482

7583
```
7684
catp get-key.log | jq .context.callback.Data.Nonce > get-key.jq
@@ -84,11 +92,13 @@ get-key.log: 96.8% bytes read, 967819 lines processed, 8064.9 l/s, 41.8 MB/s, el
8492
get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s, elapsed 2m3.98s, remaining 0s
8593
```
8694

95+
### Parallel scan of multiple files
96+
8797
Run log filtering (lines containing `foo bar` or `baz`) on multiple files in background (with `screen`) and output to a
8898
new compressed file.
8999

90100
```
91-
screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
101+
screen -dmS foo12 ./catp -parallel 20 -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12*
92102
```
93103

94104
```
@@ -108,14 +118,31 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes
108118
# detaching from screen with ctrl+a+d
109119
```
110120

111-
Filter based on large list of needles. Values from allow and block lists are loaded into high-performance
112-
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
121+
### Filter based on large list of needles
122+
123+
Values from allow and block lists are loaded into high-performance
124+
[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes.
113125

114126
```
115127
catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst
116128
```
117129

118130
Each source line would follow the filtering pipeline:
119-
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into output
120-
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is skipped
131+
132+
* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into
133+
output
134+
* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is
135+
skipped
121136
* if not, source line gets into output because of `-pass-any`
137+
138+
### Split matches into separate files
139+
140+
```
141+
catp -pass foo -save-matches foo.log.zst -pass bar^baz -save-matches 2.gz -pass qux -pass quux -output other.log input.log
142+
```
143+
144+
Pipeline:
145+
* each line from `input.log` is being read
146+
* lines that contain `foo` are stored to `foo.log.zst`
147+
* lines that contain `bar` and `baz` (but not `foo` that was already matched) are stored to `2.gz`
148+
* lines that contain `qux` or `quux` are stored to `other.log`

cmd/catp/catp/app.go

Lines changed: 44 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"flag"
99
"fmt"
10-
"io"
1110
"log"
1211
"os"
1312
"os/signal"
@@ -21,13 +20,21 @@ import (
2120

2221
"github.com/bool64/dev/version"
2322
"github.com/bool64/progress"
24-
gzip "github.com/klauspost/pgzip"
2523
)
2624

2725
// Main is the entry point for catp CLI tool.
2826
func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
2927
r := &runner{}
3028

29+
var closers []func() error
30+
defer func() {
31+
for _, closer := range closers {
32+
if err := closer(); err != nil {
33+
log.Printf("failed to close: %s\n", err.Error())
34+
}
35+
}
36+
}()
37+
3138
flag.Var(flagFunc(func(v string) error {
3239
r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...)
3340

@@ -62,6 +69,17 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
6269
"if filter matches, line is removed from the output (may be kept if it passed preceding -pass)\n"+
6370
"for example, you can use \"-skip quux^baz -skip fooO\" to skip lines that have (quux AND baz) OR fooO")
6471

72+
flag.Var(flagFunc(func(v string) error {
73+
w, closer, err := makeWriter(v)
74+
if err != nil {
75+
return err
76+
}
77+
78+
closers = append(closers, closer)
79+
80+
return r.filters.saveTo(w)
81+
}), "save-matches", "save matches of previous filter group to file")
82+
6583
flag.IntVar(&r.parallel, "parallel", 1, "number of parallel readers if multiple files are provided\n"+
6684
"lines from different files will go to output simultaneously (out of order of files, but in order of lines in each file)\n"+
6785
"use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU)")
@@ -79,8 +97,13 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
7997
"files will be written to out dir with original base names\n"+
8098
"disables output flag")
8199

100+
flag.IntVar(&r.startLine, "start-line", 0, "start printing lines from this line (inclusive),\n"+
101+
"default is 0 (first line), each input file is counted separately")
102+
flag.IntVar(&r.endLine, "end-line", 0, "stop printing lines at this line (exclusive),\n"+
103+
"default is 0 (no limit), each input file is counted separately")
104+
82105
flag.Usage = func() {
83-
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+",",
106+
fmt.Println("catp", version.Module("github.com/bool64/progress").Version+r.options.VersionLabel+",",
84107
version.Info().GoVersion, strings.Join(versionExtra, " "))
85108
fmt.Println()
86109
fmt.Println("catp prints contents of files to STDOUT or dir/file output, \n" +
@@ -94,20 +117,6 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
94117
}
95118
flag.Parse()
96119

97-
r.filters.buildIndex()
98-
99-
if *ver {
100-
fmt.Println(version.Module("github.com/bool64/progress").Version)
101-
102-
return nil
103-
}
104-
105-
if flag.NArg() == 0 {
106-
flag.Usage()
107-
108-
return nil
109-
}
110-
111120
if *cpuProfile != "" {
112121
startProfiling(*cpuProfile, *memProfile)
113122

@@ -122,6 +131,20 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
122131
}
123132
}
124133

134+
r.filters.buildIndex()
135+
136+
if *ver {
137+
fmt.Println(version.Module("github.com/bool64/progress").Version + r.options.VersionLabel)
138+
139+
return nil
140+
}
141+
142+
if flag.NArg() == 0 {
143+
flag.Usage()
144+
145+
return nil
146+
}
147+
125148
var files []string
126149

127150
args := flag.Args()
@@ -158,61 +181,21 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g
158181
sort.Strings(files)
159182

160183
if *output != "" && r.outDir == "" {
161-
fn := *output
162-
163-
out, err := os.Create(fn) //nolint:gosec
184+
w, closer, err := makeWriter(*output)
164185
if err != nil {
165-
return fmt.Errorf("failed to create output file %s: %w", fn, err)
186+
return err
166187
}
167188

168-
r.output = out
169-
compCloser := io.Closer(io.NopCloser(nil))
170-
171-
switch {
172-
case strings.HasSuffix(fn, ".gz"):
173-
gw := gzip.NewWriter(r.output)
174-
compCloser = gw
175-
176-
r.output = gw
177-
case strings.HasSuffix(fn, ".zst"):
178-
zw, err := zstdWriter(r.output)
179-
if err != nil {
180-
return fmt.Errorf("zstd new writer: %w", err)
181-
}
182-
183-
compCloser = zw
184-
185-
r.output = zw
186-
}
187-
188-
w := bufio.NewWriterSize(r.output, 64*1024)
189189
r.output = w
190190

191-
defer func() {
192-
if err := w.Flush(); err != nil {
193-
log.Fatalf("failed to flush STDOUT buffer: %s", err)
194-
}
195-
196-
if err := compCloser.Close(); err != nil {
197-
log.Fatalf("failed to close compressor: %s", err)
198-
}
199-
200-
if err := out.Close(); err != nil {
201-
log.Fatalf("failed to close output file %s: %s", *output, err)
202-
}
203-
}()
191+
closers = append(closers, closer)
204192
} else {
205193
if isStdin {
206194
r.output = os.Stdout
207195
} else {
208196
w := bufio.NewWriterSize(os.Stdout, 64*1024)
209197
r.output = w
210-
211-
defer func() {
212-
if err := w.Flush(); err != nil {
213-
log.Fatalf("failed to flush STDOUT buffer: %s", err)
214-
}
215-
}()
198+
closers = append(closers, w.Flush)
216199
}
217200
}
218201

0 commit comments

Comments
 (0)