Commit 2e2c02b

merge eviction hook into remove_sealed, static backend selection
1 parent 4521a37 commit 2e2c02b

1 file changed

Lines changed: 37 additions & 12 deletions

dial9-tokio-telemetry/design/in-memory-pipeline.md

@@ -8,7 +8,7 @@ Let dial9 run with no filesystem dependency. Users with plenty of RAM, or runnin
88

99
The existing lifecycle is disk-mediated end to end. Flush thread writes encoded batches into `trace.N.bin.active`, writer renames to `trace.N.bin` on rotation, worker polls the directory every second, reads the file back into memory and runs the processor pipeline.
1010

11-
A single `Fs` trait covers the writer + worker boundary. `DiskFs` wraps `std::fs` for the rotating disk path. `MemFs` keeps active bytes in a per-path `Vec<u8>` map and routes sealed bytes through an internal channel to the worker. The trait covers the full segment lifecycle: write-side (`create`, `seal`, `remove_*`) and read-side (`take_files`, `wait_for_more`, `writer_done`, `mark_writer_done`, `record_writer_eviction`).
11+
A single `Fs` trait covers the writer + worker boundary. `DiskFs` wraps `std::fs` for the rotating disk path. `MemFs` keeps active bytes in a per-path `Vec<u8>` map and routes sealed bytes through an internal channel to the worker. The trait covers the full segment lifecycle: write-side (`create`, `seal`, `remove_*`) and read-side (`take_files`, `wait_for_more`, `writer_done`, `mark_writer_done`).
1212

1313
**Core principle:** the disk path stays the default and unchanged. Memory mode is one constructor swap on the existing writer builder.
1414

@@ -76,11 +76,16 @@ pub trait Fs: Send + Sync + 'static {
7676
/// stripping the `.active` suffix. Memory: drain the active buffer
7777
/// and push to the internal channel.
7878
fn seal(&self, active: &Path, index: u32) -> io::Result<SegmentRef>;
79-
/// Eviction-side removal. Disk: unlinks the sealed file plus any
80-
/// extension-renamed siblings (e.g. `.gz` from `WriteBackProcessor`)
81-
/// and drops the claim entry. Memory: no-op (sealed bytes already
82-
/// left `MemFs` via `seal`).
83-
fn remove_sealed(&self, seg: &SegmentRef) -> io::Result<()>;
79+
/// Remove a sealed segment. `reason` separates writer backpressure
80+
/// eviction (counts toward `dropped_segments`) from worker terminal
81+
/// cleanup after a non-retryable failure (does not, since that is a
82+
/// processing failure tracked via `SegmentProcessMetrics`). Disk:
83+
/// unlinks the sealed file plus extension-renamed siblings (e.g.
84+
/// `.gz` from `WriteBackProcessor`), drops the claim entry, and
85+
/// bumps `dropped_segments` iff `reason == Eviction`. Memory: no-op
86+
/// (bytes left `MemFs` via `seal`; memory eviction is counted in
87+
/// the channel regardless of `reason`).
88+
fn remove_sealed(&self, seg: &SegmentRef, reason: RemoveReason) -> io::Result<()>;
8489
fn remove_active(&self, path: &Path) -> io::Result<()>;
8590

8691
// --- Read side ---
@@ -96,9 +101,16 @@ pub trait Fs: Send + Sync + 'static {
96101
fn writer_done(&self) -> bool { false }
97102
/// Writer finalize hook. Memory: sets writer_done + notify.
98103
fn mark_writer_done(&self) {}
99-
/// `RotatingWriter::evict_oldest` hook. Disk: bumps the
100-
/// dropped-segments counter surfaced via `TakenFiles`.
101-
fn record_writer_eviction(&self) {}
104+
}
105+
106+
pub enum RemoveReason {
107+
/// Writer shed this segment for backpressure (`evict_oldest`).
108+
/// Counts toward `dropped_segments`.
109+
Eviction,
110+
/// Worker removed this after a terminal pipeline state
111+
/// (non-retryable failure, retry-budget exhaustion, panic).
112+
/// Not a backpressure drop.
113+
Terminal,
102114
}
103115
```
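
The counting rule attached to `RemoveReason` can be sketched in isolation. This is a minimal illustration, not the actual `DiskFs` code: `DropCounter` and `on_remove` are hypothetical names, and the real backend also unlinks files and drops the claim entry.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Mirrors the `RemoveReason` enum from the design above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RemoveReason {
    /// Writer shed the segment for backpressure.
    Eviction,
    /// Worker removed the segment after a terminal pipeline state.
    Terminal,
}

/// Hypothetical stand-in for the disk backend's drop bookkeeping.
#[derive(Default)]
pub struct DropCounter {
    dropped_segments: AtomicU64,
}

impl DropCounter {
    /// Count the removal only when it is a backpressure eviction.
    pub fn on_remove(&self, reason: RemoveReason) {
        if reason == RemoveReason::Eviction {
            self.dropped_segments.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Current value of the counter.
    pub fn dropped_segments(&self) -> u64 {
        self.dropped_segments.load(Ordering::Relaxed)
    }
}
```

Under this sketch, a `Terminal` removal leaves the counter untouched, which keeps processing failures out of the backpressure metric.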
104116

@@ -141,15 +153,28 @@ Holds `Arc<dyn Fs>`. Disk constructors wire `DiskFs::from_base_path(&trace_path)
141153

142154
Eviction lives where it is natural:
143155

144-
- **Disk:** writer holds `closed_files: VecDeque<(SegmentRef, u64)>`. After every rotation, `evict_oldest` pops the front and calls `Fs::remove_sealed` until the byte budget is satisfied. Identical to today.
156+
- **Disk:** writer holds `closed_files: VecDeque<(SegmentRef, u64)>`. After every rotation, `evict_oldest` pops the front and calls `Fs::remove_sealed(seg, RemoveReason::Eviction)` until the byte budget is satisfied. The `Eviction` reason bumps `dropped_segments`. Worker terminal cleanup calls `remove_sealed(seg, RemoveReason::Terminal)`, which unlinks without counting (a processing failure, not backpressure).
145157
- **Memory:** the channel enforces the byte budget. Push adds the segment, then drops oldest while `queued_bytes` is over `max_total_size`. The ring also has a slot cap (about `max_total_size / 4 KB` plus a bit of headroom) as a safety net for unusually small segments. `closed_files` stays empty.
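
The memory-side policy (push, then drop oldest while over budget) can be modelled with a plain `VecDeque`. This is a single-threaded sketch under assumptions: `BudgetedQueue` and its methods are hypothetical, and the slot cap is passed in directly rather than derived from `max_total_size`. The real channel may be built quite differently.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded model of the channel's eviction policy.
pub struct BudgetedQueue {
    segments: VecDeque<Vec<u8>>,
    queued_bytes: usize,
    max_total_size: usize,
    max_slots: usize,
    dropped_segments: u64,
}

impl BudgetedQueue {
    pub fn new(max_total_size: usize, max_slots: usize) -> Self {
        Self {
            segments: VecDeque::new(),
            queued_bytes: 0,
            max_total_size,
            max_slots,
            dropped_segments: 0,
        }
    }

    /// Add the segment first, then drop from the front while the queue
    /// is over the byte budget or over the slot cap.
    pub fn push(&mut self, segment: Vec<u8>) {
        self.queued_bytes += segment.len();
        self.segments.push_back(segment);
        while self.queued_bytes > self.max_total_size
            || self.segments.len() > self.max_slots
        {
            match self.segments.pop_front() {
                Some(old) => {
                    self.queued_bytes -= old.len();
                    self.dropped_segments += 1;
                }
                None => break,
            }
        }
    }

    /// Hand the oldest surviving segment to the consumer.
    pub fn pop(&mut self) -> Option<Vec<u8>> {
        let seg = self.segments.pop_front()?;
        self.queued_bytes -= seg.len();
        Some(seg)
    }

    pub fn queued_bytes(&self) -> usize { self.queued_bytes }
    pub fn len(&self) -> usize { self.segments.len() }
    pub fn dropped_segments(&self) -> u64 { self.dropped_segments }
}
```

One consequence of pushing before evicting: in this model a single segment larger than `max_total_size` evicts everything, itself included.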
146158

159+
### Backend selection
160+
161+
The writer dispatches to its backend statically through the Mode typestate from section 4. `WriterMode` gains an associated backend type:
162+
163+
```rust
164+
pub trait WriterMode { type Fs: Fs; }
165+
impl WriterMode for Disk { type Fs = DiskFs; }
166+
impl WriterMode for Memory { type Fs = MemFs; }
167+
```
168+
169+
`RotatingWriter<Mode>` holds `Arc<Mode::Fs>`. The worker loop is a generic function confined to the worker module, monomorphized once per backend and never named in a public signature. The recorder builder knows the backend from the typestate and spawns the matching worker directly.
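
As a rough illustration of how the typestate resolves the backend at compile time, the sketch below reduces `Fs` to a single hypothetical method (`backend_name`) and uses empty marker and backend structs. Only the `WriterMode` shape is taken from the snippet above; everything else is assumed for the example.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real `Fs` trait; `backend_name` is a
/// hypothetical method used only to make the dispatch observable.
pub trait Fs: Send + Sync + 'static {
    fn backend_name(&self) -> &'static str;
}

pub struct DiskFs;
pub struct MemFs;

impl Fs for DiskFs {
    fn backend_name(&self) -> &'static str { "disk" }
}
impl Fs for MemFs {
    fn backend_name(&self) -> &'static str { "memory" }
}

/// Typestate markers.
pub struct Disk;
pub struct Memory;

pub trait WriterMode { type Fs: Fs; }
impl WriterMode for Disk { type Fs = DiskFs; }
impl WriterMode for Memory { type Fs = MemFs; }

/// The writer names only `Mode`; the concrete backend type follows
/// from the associated type, so calls on `fs` are statically dispatched.
pub struct RotatingWriter<Mode: WriterMode> {
    fs: Arc<Mode::Fs>,
}

impl<Mode: WriterMode> RotatingWriter<Mode> {
    pub fn new(fs: Arc<Mode::Fs>) -> Self { Self { fs } }
    pub fn backend_name(&self) -> &'static str { self.fs.backend_name() }
}

/// Generic worker entry point, monomorphized once per backend.
pub fn worker_loop<F: Fs>(fs: Arc<F>) -> &'static str {
    fs.backend_name()
}
```

Callers write `RotatingWriter::<Memory>` or `RotatingWriter::<Disk>` and never spell out `MemFs` or `DiskFs` in a signature.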
170+
147171
### Rejected alternatives
148172

149173
- **`MemoryWriter` as a sibling `TraceWriter` impl.** Duplicates encoder/rotation/metadata/drain-timer logic. Tests still need `TempDir`. Single `RotatingWriter` over `Arc<dyn Fs>` is simpler.
150174
- **Split write-side and read-side traits (`Fs` + `SegmentSource`).** Disk read-side is stateful (claim-set dedup) so the state would have to be shared across both traits anyway. Single trait keeps the lifecycle in one place.
151175
- **Eager payload load in `take_files`.** Reads every unclaimed file into RAM on each scan. First drain after boot or recovery scales with backlog size. Lazy `TakenSegment::load` bounds peak in-flight memory to one segment.
152-
- **`Fs::Writer` as an associated type.** `dyn Fs` not object-safe with associated types and `Arc<dyn Fs>` is needed at the recorder/worker boundary. Cost of `Box<dyn Write + Send>`: one `Box::new(File)` per rotation, one vtable call per ~8 KB BufWriter drain. Dominated by the syscall.
176+
- **`Fs::Writer` as an associated type.** `dyn Fs` not object-safe with associated types.
177+
- **`Arc<dyn Fs>` dynamic dispatch everywhere.** A vtable hop per call. Free on `DiskFs` where the syscall dominates, but pure overhead on the `MemFs` hot path. Static dispatch through the Mode typestate (see Backend selection) avoids it without leaking a type parameter.
153178
- **Sync `mpsc<()>(1)` for wakeup.** Already shuttle-shimmed, but blocking `recv()` would stall the current-thread worker, and `spawn_blocking` per wait churns the thread pool.
154179
- **`Mutex<VecDeque<MemSealedSegment>>` for queue.** Adds a sync mutex on a path other crate queues keep lock-free.
155180
- **`crossbeam_queue::SegQueue`.** Unbounded, no eviction primitive.
@@ -364,7 +389,7 @@ Memory mode keeps bytes in process heap that disk kept on disk. Regressions are
364389

365390
- `QueuedCount` / `QueuedBytes`: segments visible to the backend but not returned this cycle. Reserved for bounded-take semantics, 0 in steady state today.
366391
- `InFlightCount` / `InFlightBytes`: segments claimed but not yet released by last-stage cleanup or `remove_sealed`. Rising values mean the pipeline is not shedding work fast enough.
367-
- `DroppedSegments`: backend-side evictions. Disk: `RotatingWriter::evict_oldest` (via `Fs::record_writer_eviction`). Memory: channel byte-budget plus slot-cap.
392+
- `DroppedSegments`: backend-side evictions. Disk: `remove_sealed(_, Eviction)` from `evict_oldest`. Memory: channel byte-budget plus slot-cap.
368393
- `SegmentsDispatched`: segments handed into the pipeline this cycle.
369394

370395
Fires every cycle, drained-empty included, so a stuck pipeline shows climbing `InFlightBytes` with `SegmentsDispatched == 0`. `ChannelReceiver` keeps direct accessors for at-cadence sampling.
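
One way a consumer of these per-cycle metrics might flag the stuck-pipeline signature is sketched below. `CycleSample` and `looks_stuck` are hypothetical names, and "climbing" is read here as strictly increasing across consecutive cycles, which is one interpretation rather than a rule from the design.

```rust
/// Hypothetical per-cycle snapshot of two of the metrics above.
#[derive(Clone, Copy, Debug)]
pub struct CycleSample {
    pub in_flight_bytes: u64,
    pub segments_dispatched: u64,
}

/// True when, across the whole window, in-flight bytes rise from one
/// cycle to the next and the later cycle dispatched nothing.
pub fn looks_stuck(window: &[CycleSample]) -> bool {
    window.len() >= 2
        && window.windows(2).all(|w| {
            w[1].in_flight_bytes > w[0].in_flight_bytes
                && w[1].segments_dispatched == 0
        })
}
```

Because the event fires on drained-empty cycles too, a window of consecutive samples has no gaps, so a check like this does not need to guess at missing cycles.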
