Skip to content

Commit a18ab8e

Browse files
feat(output)!: unify compression behind a Codec abstraction (zstd/gzip/none)
The three sinks that compress (file, http, s3) each constructed their own zstd::Encoder, hardcoded `.zst`, and hardcoded `Content-Encoding: zstd`. That blocked gzip/plaintext output and meant a user changing the codec would also have to fix the file extension and the wire header by hand. This change pulls compression behind one `Codec` enum that owns format, level, file extension, and wire encoding name. The three per-sink `compression_level: Option<i32>` fields are replaced by one shared `compression: { format, level }` block; the s3 sink's `{prefix_hash}-{seq}` key template and the file sink's hardcoded `{prefix}.zst` are unified around a `path_template` / `key_template` that supports `{ext}` (codec-derived) plus the existing placeholders. Templates that don't carry `{prefix}` or `{prefix_hash}` (and `{seq}` when `batch_max_mb` is set on s3) are rejected at startup; a runtime guard catches residual collisions — fatal for the file sink, warn for s3. The s3 batching docs are rewritten to describe the actual lifecycle — per-prefix encoder, threshold-driven mid-run flush, end-of-run flush — and the buffering/concurrency caveats that make `batch_max_mb` behave unintuitively. Breaking: bare `compression_level` in YAML and `--s3-output-compression-level` on the CLI are removed; use the new `compression` block / `--compression-format` + `--compression-level` instead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c0906ca commit a18ab8e

13 files changed

Lines changed: 1632 additions & 212 deletions

File tree

CLAUDE.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# bucket-scrapper
22

3-
S3 bucket content searcher. Downloads compressed objects from S3, stream-decompresses, filters lines by regex, and routes matches to one of four pluggable output sinks: local zstd files, an HTTP API (NDJSON), an S3 bucket, or `/dev/null` (void).
3+
S3 bucket content searcher. Downloads compressed objects from S3, stream-decompresses, filters lines by regex, and routes matches to one of four pluggable output sinks: local files, an HTTP API (NDJSON), an S3 bucket, or `/dev/null` (void). The file/http/s3 sinks share a single `Codec` abstraction (zstd / gzip / none) so the on-disk extension, the wire `Content-Encoding`, and the encoder bytes stay in lockstep.
44

55
## Architecture
66

@@ -13,21 +13,23 @@ S3 GetObject stream (semaphore-bounded, range-based resume on retries)
1313
→ line_ch (flume bounded)
1414
→ filter_worker pool (spawn_blocking, regex via grep-matcher)
1515
→ Arc<dyn OutputSink>::ingest(prefix, line)
16-
├─ FileOutputSink → SharedFileWriter (per-prefix zstd files)
16+
├─ FileOutputSink → SharedFileWriter (per-prefix Codec-encoded files)
1717
├─ HttpOutputSink → compressor pool → uploader pool → HTTP POST (AIMD)
18-
├─ S3OutputSink → per-prefix zstd batches → uploader pool → PutObject
18+
├─ S3OutputSink → per-prefix Codec-encoded batches → uploader pool → PutObject
1919
└─ VoidOutputSink → atomic counters only (benchmarking)
2020
```
2121

2222
Key modules:
2323
- `src/pipeline/orchestrator.rs` — pipeline orchestrator: download → decompress → filter → sink
2424
- `src/pipeline/output.rs``OutputSink` trait + `OutputStats`
25+
- `src/pipeline/codec.rs` — output codec (zstd / gzip / none) + `CompressionConfig` + `CodecEncoder<W>`
26+
- `src/pipeline/path_template.rs``{prefix}` / `{prefix_hash}` / `{seq}` / `{run_id}` / `{ext}` template renderer + `CollisionTracker`, shared by file and s3 sinks
2527
- `src/pipeline/http_writer.rs` / `http_sink.rs` — HTTP output internals + sink adapter
2628
- `src/pipeline/streaming_writer.rs` / `file_sink.rs` — file output internals + sink adapter
27-
- `src/pipeline/s3_writer.rs` — S3 output sink with per-prefix zstd batching
29+
- `src/pipeline/s3_writer.rs` — S3 output sink with per-prefix batching
2830
- `src/pipeline/void_writer.rs` — no-op sink with atomic counters
2931
- `src/pipeline/observer.rs` — observer primitives: `PipelineObserver`, `ChannelObserver`, `DownloadObserver`
30-
- `src/config/output.rs``OutputConfig` tagged enum + `${ENV}` interpolation
32+
- `src/config/output.rs``OutputConfig` tagged enum + `${ENV}` interpolation + template/codec validation
3133
- `src/config/resolve.rs` — selects config-driven vs CLI-driven mode (mixing is a hard error)
3234
- `src/matcher.rs``LineMatcher`: stateless regex wrapper around `grep-matcher`
3335
- `src/progress.rs` — periodic structured-log progress reports with bottleneck detection

README.md

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,14 @@ Pick exactly one sink, either via the config `outputs:` block or via CLI flags.
104104

105105
### File sink
106106

107-
Per-prefix zstd-compressed files under `dir`:
107+
Per-prefix files under `dir`. Default codec is zstd:3, default filename is `{prefix}.{ext}`:
108108

109109
```yaml
110110
outputs:
111111
- type: file
112112
dir: ./scrapper-output
113-
# compression_level: 3
113+
# path_template: "{prefix}.{ext}"
114+
# compression: { format: zstd, level: 3 }
114115
```
115116

116117
Or via CLI:
@@ -121,7 +122,7 @@ bucket-scrapper -s 2024-01-15T10:00:00Z --output file --output-dir ./scrapper-ou
121122

122123
### HTTP sink
123124

124-
NDJSON-zstd POSTs to an HTTP endpoint, with adaptive (AIMD) throttling and 429 back-off:
125+
NDJSON POSTs to an HTTP endpoint with adaptive (AIMD) throttling and 429 back-off. `Content-Encoding` is set automatically from the codec (`zstd` / `gzip`) or omitted entirely when `compression.format = none`:
125126

126127
```yaml
127128
outputs:
@@ -130,6 +131,7 @@ outputs:
130131
bearer_auth: ${HTTP_BEARER_AUTH} # ${ENV} interpolation supported
131132
timeout_secs: 30
132133
batch_max_mb: 2
134+
# compression: { format: zstd, level: 3 }
133135
```
134136

135137
Or via CLI (URL and token can also come from `HTTP_URL` / `HTTP_BEARER_AUTH`):
@@ -143,22 +145,51 @@ bucket-scrapper -s 2024-01-15T10:00:00Z \
143145

144146
### S3 sink
145147

146-
Per-prefix zstd objects written to a destination S3 bucket (works with non-AWS backends Garage, MinIOvia `endpoint_url`). By default each source prefix collapses to exactly one output object (N:1, same shape as the file sink). Set `batch_max_mb` to opt into size-based rollover within a prefix:
148+
Per-prefix objects written to a destination S3 bucket. Works with non-AWS backends (Garage, MinIO) via `endpoint_url`.
147149

148150
```yaml
149151
outputs:
150152
- type: s3
151153
bucket: my-results-bucket
152-
key_template: "results/{prefix}/{run_id}-{seq}.ndjson.zst"
153-
# batch_max_mb: 16 # optional; omit for one object per source prefix
154+
key_template: "results/{prefix}/{run_id}-{seq}.ndjson.{ext}"
155+
# batch_max_mb: 16 # opt into batched uploads — see below
156+
# compression: { format: zstd, level: 3 }
154157
```
155158

156-
Output mapping summary (file and S3 sinks):
159+
#### Batching model
160+
161+
The s3 sink uploads **batches**. One batch is one `PutObject` call. Every batch carries lines from a single source prefix; there is no cross-prefix mixing or consolidation. The configurable axis is *how many batches per source prefix*.
162+
163+
**Per-prefix encoder lifecycle.** When the first matched line for a source prefix arrives, the sink lazily creates an in-memory encoder for that prefix (zstd / gzip / identity, per `compression.format`). Subsequent matched lines for the same prefix are written into that encoder. A prefix that produces no matches has no encoder and emits no object. Different prefixes have independent encoders and never block each other.
164+
165+
**Default mode (`batch_max_mb` unset).** Each prefix's encoder is finalized exactly once, at end-of-run. The finalized buffer is enqueued for upload. Result: one output object per source prefix, with `{seq}` always rendering to `00000`. This is the same N:1 shape as the file sink.
166+
167+
**Batched mode (`batch_max_mb` set).** After every line ingested for a prefix, the sink reads the **compressed bytes already emitted into that prefix's encoder output buffer** and compares it to `batch_max_mb`. When the buffer crosses the threshold:
168+
169+
1. The encoder is finalized in place, producing a finished compressed frame.
170+
2. That frame is rendered into a destination key (`{seq}` substituted, then `{seq}` is incremented for the next batch in this prefix) and pushed onto the bounded upload queue.
171+
3. A fresh encoder is constructed for the same prefix; subsequent lines start filling it.
172+
173+
End-of-run runs an unconditional flush over every prefix that still has a non-empty encoder, producing a final batch (the trailing partial). Each prefix's `{seq}` therefore yields a contiguous `00000`, `00001`, … sequence whose count depends only on how many threshold crossings happened plus one closing flush.
174+
175+
**Why upload size can exceed `batch_max_mb`.** The threshold check sees only the compressed bytes the encoder has already flushed to its output buffer — it doesn't include lines still buffered inside the codec, nor does it preemptively split a line that pushes the buffer over the line. In practice each batch lands a little above the threshold rather than at it.
176+
177+
**Why a small `batch_max_mb` may not produce extra batches.** zstd and gzip both buffer internally and only emit compressed bytes when they have enough data to encode efficiently. A trickle of small, highly-compressible lines can keep the encoder's *output* buffer at zero for a long time even though many lines have been ingested — so no threshold crossing fires, and end-of-run produces a single batch. To exercise batched mode you need either enough input volume per prefix to force several internal block flushes, or `compression.format: none` (where the output buffer grows monotonically with input size).
178+
179+
**Concurrency and ordering.** Batches go onto a bounded queue drained by N uploader tasks (`upload_tasks`, defaults to ~`cpu/4`). Uploaders run concurrently, so batches within a prefix can land out of order on S3. `{seq}` reflects the order in which the sink *finalized* the batch, not the order S3 finished receiving it. End-of-run waits for the queue to drain before reporting completion.
180+
181+
**Failure model.** Recoverable upload errors (transient 5xx, throttling) surface as `error!` logs but don't stop the pipeline; the run continues with the next batch. Non-recoverable errors (malformed request, auth failure) flip the sink's fatal flag, the download coordinator stops feeding new work, and the run aborts. Lost batches are counted into `OutputStats.lines_dropped`. Multipart uploads are not yet implemented — every batch is a single `PutObject`, so a batch must fit AWS's 5 GB single-PUT cap (the `multipart_threshold_mb` / `multipart_part_mb` config fields are accepted but currently informational, with a startup warning if you set them away from defaults).
182+
183+
**`{seq}` is required when `batch_max_mb` is set.** Without it, every batch within a prefix would render to the same key and silently overwrite the previous one. Startup rejects such configs.
184+
185+
**`{seq}` and reruns.** `{seq}` is an in-memory counter — it resets to 0 on every process start. Two runs that hit the same prefix and template produce overlapping `{seq}` values; rely on `{run_id}` (unique per process invocation) to disambiguate them in the destination key.
186+
187+
#### Output mapping summary
157188

158189
| Sink | Default | With `batch_max_mb` set |
159190
|------|---------|-------------------------|
160-
| `file` | one `.zst` file per source prefix (always) | n/a — file sink has no rollover |
161-
| `s3` | one object per source prefix | one or more objects per prefix, rolling over at the threshold |
191+
| `file` | one file per source prefix (always) | n/a — file sink has no batched uploads |
192+
| `s3` | one object per source prefix, `{seq}=00000` | one or more objects per prefix; new batch finalized whenever the encoder's compressed output buffer crosses `batch_max_mb`, plus one trailing batch at end-of-run |
162193

163194
Cross-prefix consolidation (e.g. one daily file across all hours) is not currently supported.
164195

@@ -171,6 +202,42 @@ outputs:
171202
- type: void
172203
```
173204

205+
### Codecs
206+
207+
The `file`, `http`, and `s3` sinks share a single compression block:
208+
209+
```yaml
210+
compression:
211+
format: zstd | gzip | none # default: zstd
212+
level: <int> # codec-specific; omit for the codec default
213+
```
214+
215+
| Format | Levels | Default | File extension | Wire `Content-Encoding` |
216+
|--------|---------|---------|----------------|-------------------------|
217+
| `zstd` | 1–22 | 3 | `.zst` | `zstd` |
218+
| `gzip` | 0–9 | 6 | `.gz` | `gzip` |
219+
| `none` | n/a | n/a | (none) | (header omitted) |
220+
221+
Setting `level` when `format: none` is rejected at startup. So is an out-of-range level for the chosen format.
222+
223+
### Path templates and collisions
224+
225+
Both `file` (`path_template`) and `s3` (`key_template`) render the per-prefix output destination from a string with these placeholders:
226+
227+
| Placeholder | Meaning |
228+
|------------------|----------------------------------------------------------------------------------|
229+
| `{prefix}` | Source S3 prefix verbatim (e.g. `logs/dt=20240315/hour=09`) |
230+
| `{prefix_hash}` | 8-char hex hash of the prefix — useful when the raw prefix has awkward characters |
231+
| `{run_id}` | 8-char hex unique to this process invocation |
232+
| `{seq}` | Per-prefix zero-padded sequence number — **s3 only**, incremented on each batch finalize when `batch_max_mb` is set |
233+
| `{ext}` | Codec extension (`zst`, `gz`, or empty); the leading `.` is dropped when empty |
234+
235+
The template **must contain `{prefix}` or `{prefix_hash}`** — without one, every source prefix renders to the same destination. For the file sink this is fatal (two encoders writing to one file would corrupt the output) and is rejected at startup. For the s3 sink it's also rejected at startup, with a runtime warn-and-overwrite as defence in depth against `{prefix_hash}` collisions.
236+
237+
When `batch_max_mb` is set on the s3 sink, the template must additionally contain `{seq}`; otherwise every batch within a prefix would render to the same key and overwrite the previous — also a startup error.
238+
239+
Anti-pattern (rejected): `path_template: "results.{ext}"` — same path for every prefix.
240+
174241
## AWS Authentication
175242

176243
Standard AWS SDK credential chain: environment variables, `~/.aws/credentials`, IAM role, or `aws sso login`. Custom CA bundles via `AWS_CA_BUNDLE`.
@@ -213,7 +280,10 @@ Usage: bucket-scrapper [OPTIONS] --start <START>
213280
| `--http-batch-max-mb` | 2 | Max batch size (MB) |
214281
| `--http-timeout` | 30 | Request timeout (seconds) |
215282
| `--s3-output-bucket` | | Destination bucket (s3 output) |
216-
| `--s3-output-key-template` | | Key template; supports `{prefix}`, `{prefix_hash}`, `{seq}`, `{run_id}` |
283+
| `--s3-output-key-template` | | Key template; supports `{prefix}`, `{prefix_hash}`, `{seq}`, `{run_id}`, `{ext}` |
284+
| `--output-path-template` | `{prefix}.{ext}` | File-sink path template; same placeholders minus `{seq}` |
285+
| `--compression-format` | `zstd` | `zstd`, `gzip`, or `none` |
286+
| `--compression-level` | codec default | zstd 1–22 / gzip 0–9; unset for `none` |
217287
218288
### AIMD throttle
219289
@@ -234,7 +304,6 @@ Usage: bucket-scrapper [OPTIONS] --start <START>
234304
| `--max-retries` | 10 | Download retry attempts |
235305
| `--retry-delay` | 2 | Initial retry delay (seconds) |
236306
| `--progress-interval` | 3 | Progress report interval (seconds) |
237-
| `--compression-level` | 3 | Zstd level (1-22) |
238307
| `--memory-limit-gb` | 0 | Memory limit via setrlimit (0 = none) |
239308
| `--client-max-age` | 60 | S3 client max age (minutes) |
240309
| `--http-line-channel-size` | 1000 | Line channel before compressors |

sample-config.yaml

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,29 @@ region: eu-west-3
7272
# secrets stay out of the YAML.
7373

7474
outputs:
75-
# ── File output: per-prefix zstd files under `dir` ───────────────────────
75+
# ── File output: per-prefix files under `dir` ────────────────────────────
76+
#
77+
# `compression`: { format: zstd|gzip|none, level: <int> }. Omit the block
78+
# to default to zstd:3. `level` ranges: zstd 1–22, gzip 0–9. Must be
79+
# unset when format=none.
80+
#
81+
# `path_template` is the per-prefix filename (joined with `dir`).
82+
# Placeholders: `{prefix}`, `{prefix_hash}`, `{run_id}`, `{ext}` (codec
83+
# extension — `zst`/`gz`/empty). Default `{prefix}.{ext}` matches the
84+
# historic layout. The template MUST contain `{prefix}` or
85+
# `{prefix_hash}`; otherwise distinct source prefixes collide and the
86+
# second one fails with a fatal error (two encoders cannot share a file).
7687
- type: file
7788
dir: ./scrapper-output
78-
# compression_level: 3 # optional, 1–22
89+
# path_template: "{prefix}.{ext}"
90+
# compression:
91+
# format: zstd
92+
# level: 3
7993

80-
# ── HTTP output: NDJSON-zstd POSTs with AIMD throttle ────────────────────
94+
# ── HTTP output: NDJSON POSTs with AIMD throttle ─────────────────────────
95+
#
96+
# `Content-Encoding` follows `compression.format` automatically (zstd /
97+
# gzip), or is omitted entirely when format=none.
8198
# - type: http
8299
# url: https://logs.example.com/api/v1/logs
83100
# bearer_auth: ${HTTP_BEARER_AUTH}
@@ -87,25 +104,68 @@ outputs:
87104
# upload_tasks: null # null = 4 × compressor_tasks
88105
# upload_channel_size: 4
89106
# line_channel_size: 1000
90-
# compression_level: 3
107+
# compression:
108+
# format: zstd
109+
# level: 3
91110
# max_retries: 3
92111
# max_upload_rate_mbps: 0 # 0 = unlimited
93112
# aimd:
94113
# decrease_factor: 0.15
95114
# increase_mbps: 1.0
96115
# max_submission_time_s: 4.0 # 0 = AIMD disabled
97116

98-
# ── S3 output: per-prefix zstd objects ───────────────────────────────────
99-
# By default each source prefix collapses to exactly one output object
100-
# (N:1, same shape as the file sink). Set `batch_max_mb` to opt into
101-
# size-based rollover within a prefix.
117+
# ── S3 output: per-prefix objects ────────────────────────────────────────
118+
#
119+
# Batching model. The s3 sink uploads "batches"; one batch is one
120+
# PutObject call. Every batch carries lines from a single source
121+
# prefix — no cross-prefix mixing. The configurable axis is how many
122+
# batches per source prefix.
123+
#
124+
# Default mode (no `batch_max_mb`). One encoder per source prefix,
125+
# finalized once at end-of-run → exactly one PutObject per prefix.
126+
# `{seq}` is always 00000.
127+
#
128+
# Batched mode (`batch_max_mb` set). After every ingested line the
129+
# sink reads the per-prefix encoder's compressed output buffer; when
130+
# it crosses the threshold, that encoder is finalized, the resulting
131+
# frame is rendered into a key (with `{seq}` substituted) and pushed
132+
# onto the upload queue, and a fresh encoder is started for the same
133+
# prefix with `{seq}` += 1. End-of-run runs one final flush per
134+
# prefix to capture the trailing partial. So a prefix that crosses
135+
# the threshold N times produces N+1 batches (00000..N).
136+
#
137+
# Caveats:
138+
# - The threshold is checked against compressed bytes, not plaintext,
139+
# so actual upload size sits a little above `batch_max_mb`.
140+
# - zstd/gzip buffer internally and only flush blocks periodically.
141+
# A small `batch_max_mb` paired with highly-compressible input may
142+
# never trigger a mid-run flush — every line gets buffered inside
143+
# the codec and end-of-run produces a single batch. Use
144+
# `compression.format: none` if you want size-driven batching with
145+
# predictable thresholds.
146+
# - `multipart_threshold_mb` / `multipart_part_mb` are accepted but
147+
# not yet implemented; every batch is a single PutObject (5 GB cap).
148+
#
149+
# When `batch_max_mb` is set, `key_template` MUST contain `{seq}`,
150+
# otherwise every batch within a prefix would render to the same key
151+
# and silently overwrite the previous one (rejected at startup).
152+
#
153+
# `key_template` placeholders: `{prefix}`, `{prefix_hash}`, `{seq}`,
154+
# `{run_id}`, `{ext}` (codec extension). MUST contain `{prefix}` or
155+
# `{prefix_hash}` — otherwise distinct source prefixes write to the same
156+
# destination key. Static rejection at startup; a runtime warn fires
157+
# as defence in depth (e.g. if two prefixes happen to share a
158+
# `{prefix_hash}`). `{run_id}` is unique per process invocation; use
159+
# it to disambiguate reruns, since `{seq}` resets to 0 each time.
102160
# - type: s3
103161
# bucket: my-results-bucket
104162
# region: eu-west-3 # optional
105163
# endpoint_url: null # optional
106-
# key_template: "results/{prefix}/{run_id}-{seq}.ndjson.zst"
164+
# key_template: "results/{prefix}/{run_id}-{seq}.ndjson.{ext}"
107165
# # batch_max_mb: 16 # optional; omit for one object per prefix
108-
# compression_level: 3
166+
# compression:
167+
# format: zstd
168+
# level: 3
109169
# multipart_threshold_mb: 64 # informational; multipart not yet implemented
110170
# multipart_part_mb: 16
111171
# upload_tasks: null # null = auto

0 commit comments

Comments
 (0)