Commit 7513a6b

fix(s3-output)!: stream uploads via TM PartStream, bound peak RAM

The S3 sink used to accumulate every per-prefix batch in a single `CodecEncoder<Vec<u8>>` for the lifetime of the run (default mode) or until `batch_max_mb` crossed. On long runs this OOM'd: with N prefixes and M MB matched per prefix, peak RAM = N × M before the first byte was uploaded.

Replace the buffer with a streaming pipeline:

    filter worker
      → CodecEncoder<ChannelWriter>
      → mpsc(Bytes, cap=2)
      → EncoderPartStream
      → tm::io::InputStream::from_part_stream
      → TM multipart upload

`ChannelWriter` is a sync `io::Write` that ships each codec block as a `Bytes` chunk through a tokio mpsc. `EncoderPartStream` impls TM's `PartStream`, aggregating up to `multipart_part_mb` before yielding one `PartData` per poll; the trailing partial is marked `is_last(true)` on channel EOF. Peak resident per active prefix drops to ~`channel_cap × block_size + part_size` (sub-10 MB at default settings) regardless of total batch size.

PrefixBatch now holds `upload: Option<ActiveUpload>`: opened lazily on the first matched line (no empty MPUs), closed on `bytes_sent >= batch_max_mb` or end-of-run. Each open spawns a driver task that joins the TM handle and folds per-batch counts into global counters (or `lines_dropped` + `fatal` on failure). `finish()` closes any trailing uploads on a blocking thread (codec frame trailer writes hit `ChannelWriter::write` → `blocking_send`, which panics from a runtime worker) and awaits all driver handles.

Trade-offs documented in the module header:

- Always-MPU. `from_part_stream` is MPU-only; sub-multipart_threshold batches now use 3 S3 API calls instead of 1. Functional only.
- No full-batch retry. Once compressed bytes leave the encoder they're gone; TM's per-part retries are the only retry. The sink already treated failed uploads as dropped batches, so this is a non-regression.
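
The bounded-channel idea behind the streaming pipeline can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code from this commit: `std::sync::mpsc::sync_channel` stands in for the tokio mpsc, `Vec<u8>` stands in for `Bytes`, and `Part`, `PartAggregator`, and `stream_parts` are hypothetical names that only mimic what the message says about `EncoderPartStream` (they do not implement TM's `PartStream` trait).

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Sync `io::Write` that ships every write as one owned chunk through a
/// bounded channel (stand-in for the commit's tokio mpsc of `Bytes`).
struct ChannelWriter {
    tx: SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // `send` blocks while the channel is full, which is the backpressure
        // that keeps the producer from running ahead of the uploader.
        self.tx
            .send(buf.to_vec())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "reader dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical part type: payload plus a flag for the trailing partial.
struct Part {
    data: Vec<u8>,
    is_last: bool,
}

/// Hypothetical aggregator: collects chunks until `part_size` bytes are
/// pending, then yields exactly one part per call.
struct PartAggregator {
    rx: Receiver<Vec<u8>>,
    pending: Vec<u8>,
    part_size: usize,
    done: bool,
}

impl PartAggregator {
    fn next_part(&mut self) -> Option<Part> {
        if self.done {
            return None;
        }
        while self.pending.len() < self.part_size {
            match self.rx.recv() {
                Ok(chunk) => self.pending.extend_from_slice(&chunk),
                Err(_) => {
                    // All senders dropped: channel EOF. In this sketch only a
                    // non-empty trailing partial is emitted and marked last.
                    self.done = true;
                    if self.pending.is_empty() {
                        return None;
                    }
                    let data = std::mem::take(&mut self.pending);
                    return Some(Part { data, is_last: true });
                }
            }
        }
        // Keep the overflow beyond `part_size` for the next part.
        let rest = self.pending.split_off(self.part_size);
        let data = std::mem::replace(&mut self.pending, rest);
        Some(Part { data, is_last: false })
    }
}

/// Drives a producer thread and returns `(part_len, is_last)` per part.
fn stream_parts(blocks: usize, block_size: usize, part_size: usize) -> Vec<(usize, bool)> {
    let (tx, rx) = sync_channel::<Vec<u8>>(2); // cap = 2, as in the diagram
    let producer = thread::spawn(move || {
        let mut writer = ChannelWriter { tx };
        for _ in 0..blocks {
            writer.write_all(&vec![0u8; block_size]).unwrap();
        }
        // `writer` is dropped here, which closes the channel (EOF).
    });
    let mut agg = PartAggregator { rx, pending: Vec::new(), part_size, done: false };
    let mut parts = Vec::new();
    while let Some(part) = agg.next_part() {
        parts.push((part.data.len(), part.is_last));
    }
    producer.join().unwrap();
    parts
}
```

With ten 3-byte blocks and an 8-byte part size, `stream_parts(10, 3, 8)` yields three full 8-byte parts followed by a 6-byte part flagged as last. At no point does the consumer hold more than one part plus a partial block, and the channel holds at most two blocks, which is the shape of the `channel_cap × block_size + part_size` bound quoted above.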

Removed dead pool plumbing as part of the rewrite:

- S3OutputConfig.upload_tasks (and the YAML knob)
- OutputCli.s3_upload_tasks
- --s3-output-upload-tasks CLI flag
- UploadJob struct, upload_tx flume queue, uploader_task

HTTP sink's `upload_tasks` is unaffected; that path still uses a worker pool.

Instrumentation: `Inner.peak_inflight_bytes` (sink-global high-water of bytes resident in mpsc + reader pending across all active uploads) is now surfaced in `OutputStats.extras` so the e2e suite can assert it. `report_completion` emits `extras` as a JSON string instead of Debug repr so consumers (and tests) can parse it.

Test coverage (test-first):

- s3_output_does_not_buffer_whole_run_in_memory: bulky fixture (200K matches × 2 prefixes ≈ 56 MB plaintext), default mode, asserts peak_inflight_bytes ≤ 12 MB. Pre-fix: 61 MB resident → fails. Post-fix: passes well under cap.
- Existing s3_output_end_to_end_{no_batching, batched, gzip, plaintext, multipart, json_array} all still pass against the streaming path.
- New unit tests on ChannelWriter (inflight/peak/bytes_sent accounting, BrokenPipe on reader drop).

BREAKING: --s3-output-upload-tasks CLI flag and S3OutputConfig.upload_tasks removed (the per-prefix streaming model has no worker pool to size).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
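
One way a sink-global high-water mark like `peak_inflight_bytes` could be tracked is with a pair of atomics. This is a minimal sketch under assumptions: `InflightGauge` and its methods are hypothetical names, and the commit's actual accounting inside `Inner` and `ChannelWriter` may be structured differently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical gauge: bytes currently in flight plus the highest value seen.
#[derive(Default)]
struct InflightGauge {
    current: AtomicUsize,
    peak: AtomicUsize,
}

impl InflightGauge {
    /// Record `n` bytes entering the channel and raise the peak if needed.
    fn add(&self, n: usize) {
        // `fetch_add` returns the previous value, so add `n` to get the new one.
        let now = self.current.fetch_add(n, Ordering::Relaxed) + n;
        self.peak.fetch_max(now, Ordering::Relaxed);
    }

    /// Record `n` bytes leaving (handed to the uploader or dropped).
    fn sub(&self, n: usize) {
        self.current.fetch_sub(n, Ordering::Relaxed);
    }

    fn current(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }
}
```

Sharing one such gauge across all active uploads (for example behind an `Arc`) gives a single number that a test can compare against a cap, in the spirit of the `peak_inflight_bytes ≤ 12 MB` assertion described above.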
1 parent 7b3f9cf commit 7513a6b

7 files changed

Lines changed: 840 additions & 289 deletions

src/config/output.rs

Lines changed: 0 additions & 6 deletions
@@ -165,8 +165,6 @@ pub struct S3OutputConfig {
     /// integer for an explicit cap.
     #[serde(default)]
     pub multipart_concurrency: Option<usize>,
-    #[serde(default)]
-    pub upload_tasks: Option<usize>,
     /// Object body framing. Default `json_lines` writes NDJSON. Set
     /// `kind: json_array` to upload a single JSON array per batch. The
     /// object extension is still driven by the codec — override
@@ -482,7 +480,6 @@ mystery: 42
             multipart_threshold_mb: 5,
             multipart_part_mb: 5,
             multipart_concurrency: None,
-            upload_tasks: None,
             format: OutputFormat::default(),
         });
         validate_output(&cfg).unwrap();
@@ -526,7 +523,6 @@ mystery: 42
             multipart_threshold_mb: 5,
             multipart_part_mb: 5,
             multipart_concurrency: None,
-            upload_tasks: None,
             format: OutputFormat::default(),
         });
         let err = validate_output(&cfg).unwrap_err();
@@ -545,7 +541,6 @@ mystery: 42
             multipart_threshold_mb: 5,
             multipart_part_mb: 5,
             multipart_concurrency: None,
-            upload_tasks: None,
             format: OutputFormat::default(),
         });
         let err = validate_output(&cfg).unwrap_err();
@@ -632,7 +627,6 @@ dir: /tmp/out
             multipart_threshold_mb: threshold_mb,
             multipart_part_mb: part_mb,
             multipart_concurrency: concurrency,
-            upload_tasks: None,
             format: OutputFormat::default(),
         })
     }

src/config/resolve.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ pub struct OutputCli {
7878
pub s3_multipart_threshold_mb: Option<u64>,
7979
pub s3_multipart_part_mb: Option<u64>,
8080
pub s3_multipart_concurrency: Option<usize>,
81-
pub s3_upload_tasks: Option<usize>,
8281

8382
// shared
8483
pub compression_format: Option<CodecFormat>,
@@ -150,7 +149,6 @@ impl OutputCli {
150149
s3_multipart_concurrency,
151150
"--s3-output-multipart-concurrency"
152151
);
153-
check!(s3_upload_tasks, "--s3-output-upload-tasks");
154152
check!(compression_format, "--compression-format");
155153
check!(compression_level, "--compression-level");
156154
check!(file_path_template, "--output-path-template");
@@ -271,7 +269,6 @@ fn build_from_cli(cli: &OutputCli) -> Result<OutputConfig> {
271269
multipart_threshold_mb: cli.s3_multipart_threshold_mb.unwrap_or(5),
272270
multipart_part_mb: cli.s3_multipart_part_mb.unwrap_or(5),
273271
multipart_concurrency: cli.s3_multipart_concurrency,
274-
upload_tasks: cli.s3_upload_tasks,
275272
format: cli_format(cli),
276273
})
277274
}
@@ -337,7 +334,6 @@ fn reject_inactive_flags(kind: OutputKind, cli: &OutputCli) -> Result<()> {
337334
s3_multipart_concurrency,
338335
"--s3-output-multipart-concurrency"
339336
);
340-
reject_unless!(is_s3, s3_upload_tasks, "--s3-output-upload-tasks");
341337
reject_unless!(is_file, file_path_template, "--output-path-template");
342338

343339
if !bad.is_empty() {

src/main.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,6 @@ struct Cli {
264264
/// pass a positive integer for an explicit cap.
265265
#[arg(long)]
266266
s3_output_multipart_concurrency: Option<usize>,
267-
268-
/// Number of concurrent uploader tasks for the s3 output.
269-
#[arg(long)]
270-
s3_output_upload_tasks: Option<usize>,
271267
}
272268

273269
/// CLI mirror of [`CodecFormat`] — kept separate so clap's `ValueEnum`
@@ -321,7 +317,6 @@ impl Cli {
321317
s3_multipart_threshold_mb: self.s3_output_multipart_threshold_mb,
322318
s3_multipart_part_mb: self.s3_output_multipart_part_mb,
323319
s3_multipart_concurrency: self.s3_output_multipart_concurrency,
324-
s3_upload_tasks: self.s3_output_upload_tasks,
325320
compression_format: self.compression_format.map(Into::into),
326321
compression_level: self.compression_level,
327322
file_path_template: self.output_path_template.clone(),
@@ -814,7 +809,11 @@ fn report_completion(
814809
compression_ratio = compression_ratio,
815810
plaintext_mbps = if elapsed_s > 0.0 { plaintext_mb / elapsed_s } else { 0.0 },
816811
pattern = cli.line_pattern_regex.as_deref().unwrap_or("(all lines)"),
817-
extras = ?stats.extras,
812+
// Render `extras` as a JSON object string so downstream log
813+
// consumers (and the e2e tests) can parse it back; the previous
814+
// `?` Debug format produced an ad-hoc Rust-style repr that
815+
// wasn't machine-readable.
816+
extras = %serde_json::to_string(&stats.extras).unwrap_or_else(|_| "{}".to_string()),
818817
"Search completed"
819818
);
820819
}

src/pipeline/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod observer;
77
pub mod orchestrator;
88
pub mod output;
99
pub mod path_template;
10+
pub mod s3_streaming;
1011
pub mod s3_writer;
1112
pub mod streaming_writer;
1213
pub mod void_writer;
