Skip to content

Commit 9f19982

Browse files
feat(adapter): ✨ add pluggable Redis Streams event sink (#223)
## Summary Introduce a pluggable event sink architecture that allows operators to choose Redis Streams, Lode, or both as the destination for granular runtime events. Events flow through a `FanoutEventSink` that dispatches to configured sinks concurrently, with per-sink delivery guarantees (`mandatory` or `best_effort`). Artifact chunks always route to Lode regardless of event sink configuration. ## Highlights - **`EventSink` interface** — narrow event-only abstraction (`WriteEvents` + `Close`), separate from the full `policy.Sink` which also handles artifact chunks - **`FanoutEventSink`** — concurrent dispatch to N sinks with per-sink delivery mode; mandatory sink failures propagate, best-effort failures are logged - **`CompositeSink`** — routes events→FanoutEventSink, chunks→Lode, satisfying `policy.Sink` so all three policies (strict, buffered, streaming) work unchanged - **`redisstream` package** — Redis Streams implementation using pipelined `XADD` with `MAXLEN~` approximate trimming, exponential backoff retries, and configurable TTL - **`run_completed` unification** — when Redis sink is enabled, a terminal `run_completed` event is also published to the stream, making it a single event source for consumers - **Backward compatible** — no `events` config → Lode-only behavior, identical to today ## Test plan - [x] `policy/` tests: FanoutEventSink (single, multi, mandatory propagation, best-effort swallow, close, panic on empty) - [x] `policy/` tests: CompositeSink (event routing, chunk routing, error propagation, close, nil panics) - [x] `adapter/redisstream/` tests: WriteEvents with miniredis verification, empty batch, custom stream key, retry exhaustion, context cancellation, config validation, defaults, close - [x] CLI parity test passes with 9 new flag entries in `CLI_PARITY.json` - [x] Full `go test ./...` — all 17 packages pass - [ ] Manual: `quarry run --event-sink redis --event-sink-redis-url redis://localhost:6379` publishes events to stream - [ ] Manual: `quarry run --event-sink lode --event-sink redis ...` writes to both sinks concurrently - [ ] Manual: verify `run_completed` appears in Redis stream after run finishes Closes #222 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c523f8c commit 9f19982

17 files changed

Lines changed: 2080 additions & 39 deletions

docs/ARCH_INDEX.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ Go module root. Contains runtime, CLI, and core types.
175175

176176
- `adapter/` — event-bus adapter boundary for publishing run completion notifications
177177
- `adapter/redis/` — Redis pub/sub adapter with exponential backoff retry
178+
- `adapter/redisstream/` — Redis Streams event sink for real-time event publishing during runs
178179
- `adapter/webhook/` — HTTP POST adapter with retryable and non-retriable error handling
179180

180181
### quarry/iox/

docs/CLI_PARITY.json

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,69 @@
230230
"description": "Pub/sub channel name for Redis adapter (default: quarry:run_completed)",
231231
"dependsOn": ["adapter=redis"]
232232
},
233+
"event-sink": {
234+
"type": "string_slice",
235+
"required": false,
236+
"description": "Event sink type (repeatable: lode, redis). Default: lode only",
237+
"validation": "Each value must be one of: lode, redis"
238+
},
239+
"event-sink-redis-url": {
240+
"type": "string",
241+
"required": false,
242+
"description": "Redis URL for event sink (required when --event-sink includes redis)",
243+
"dependsOn": ["event-sink=redis"]
244+
},
245+
"event-sink-redis-stream-key": {
246+
"type": "string",
247+
"required": false,
248+
"default": "quarry:events",
249+
"description": "Redis Stream key for events",
250+
"dependsOn": ["event-sink=redis"]
251+
},
252+
"event-sink-redis-max-len": {
253+
"type": "int64",
254+
"required": false,
255+
"default": 100000,
256+
"description": "Approximate MAXLEN for Redis Stream trimming",
257+
"dependsOn": ["event-sink=redis"]
258+
},
259+
"event-sink-redis-ttl": {
260+
"type": "duration",
261+
"required": false,
262+
"default": "24h0m0s",
263+
"description": "Redis Stream key expiry",
264+
"dependsOn": ["event-sink=redis"]
265+
},
266+
"event-sink-redis-timeout": {
267+
"type": "duration",
268+
"required": false,
269+
"default": "2s",
270+
"description": "Per-write timeout for Redis event sink",
271+
"dependsOn": ["event-sink=redis"]
272+
},
273+
"event-sink-redis-retries": {
274+
"type": "int",
275+
"required": false,
276+
"default": 2,
277+
"description": "Retry attempts for Redis event sink",
278+
"dependsOn": ["event-sink=redis"]
279+
},
280+
"event-sink-redis-delivery": {
281+
"type": "string",
282+
"required": false,
283+
"default": "mandatory",
284+
"description": "Delivery mode for Redis event sink (mandatory, best_effort)",
285+
"validation": "Must be one of: mandatory, best_effort",
286+
"dependsOn": ["event-sink=redis"]
287+
},
288+
"event-sink-lode-delivery": {
289+
"type": "string",
290+
"required": false,
291+
"default": "mandatory",
292+
"description": "Delivery mode for Lode event sink (mandatory, best_effort)",
293+
"validation": "Must be one of: mandatory, best_effort",
294+
"dependsOn": ["event-sink=lode"]
295+
},
233296
"depth": {
234297
"type": "int",
235298
"required": false,

docs/contracts/CONTRACT_CLI.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,35 @@ See CONTRACT_INTEGRATION.md for semantics.
224224
Adapter invocation is best-effort. Failures are logged to stderr.
225225
The run exit code is determined by execution outcome, never by adapter status.
226226

227+
### Event Sink Flags (v0.13.0+)
228+
229+
`quarry run` supports optional event sink configuration for real-time
230+
event delivery during a run. See CONTRACT_INTEGRATION.md §Event Sink Model
231+
for semantics.
232+
233+
| Flag | Type | Default | Description |
234+
|------|------|---------|-------------|
235+
| `--event-sink` | string (repeatable) | | Event sink type (`lode`, `redis`) |
236+
| `--event-sink-lode-delivery` | string | `mandatory` | Delivery mode for Lode event sink |
237+
| `--event-sink-redis-url` | string | | Redis URL (required when `--event-sink redis`) |
238+
| `--event-sink-redis-stream-key` | string | `quarry:events` | Redis Streams key |
239+
| `--event-sink-redis-max-len` | int | `100000` | Max stream length (-1 disables) |
240+
| `--event-sink-redis-ttl` | duration | `24h` | Stream key TTL (-1 disables) |
241+
| `--event-sink-redis-timeout` | duration | `2s` | Per-operation timeout |
242+
| `--event-sink-redis-retries` | int | `2` | Retry attempts |
243+
| `--event-sink-redis-delivery` | string | `mandatory` | Delivery mode for Redis event sink |
244+
245+
Semantics:
246+
- `--event-sink` is repeatable. Each value adds a sink to the fan-out.
247+
- When no `--event-sink` flags are set, events go to Lode only (backward
248+
compatible).
249+
- Duplicate sink types are rejected.
250+
- `--event-sink-redis-*` flags require `--event-sink redis`.
251+
- Delivery modes: `mandatory` (failure may fail the run) or `best_effort`
252+
(failure logged, run continues).
253+
- Config file: `events.sinks[]` array in YAML.
254+
- Fan-out child runs inherit the parent's event sink configuration.
255+
227256
### Fan-Out Flags (v0.6.0+)
228257

229258
`quarry run` supports optional derived work execution via enqueue events.

docs/contracts/CONTRACT_INTEGRATION.md

Lines changed: 135 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ This is a contract document. Implementations must conform.
99

1010
## Scope
1111

12-
- Defines the distinction between notification adapters and orchestration integrations.
12+
- Defines the distinction between notification adapters, event sinks, and orchestration integrations.
1313
- Defines the adapter boundary and ownership.
14+
- Defines the event sink boundary, delivery semantics, and failure behavior.
1415
- Defines CLI/config selection rules.
1516
- Defines delivery semantics and required observability.
1617

@@ -24,12 +25,19 @@ Non-goals:
2425

2526
## Integration Paradigms
2627

27-
Quarry supports two distinct integration paradigms:
28+
Quarry supports three distinct integration paradigms:
2829

29-
| Paradigm | Relationship to Runtime | Direction | Examples |
30-
|----------|------------------------|-----------|----------|
31-
| Notification Adapter | Invoked **by** runtime after a run | Runtime → External | Webhook, Redis, NATS, SNS |
32-
| Orchestration Integration | **Wraps** the runtime as embedded activity | External → Runtime | Temporal |
30+
| Paradigm | Relationship to Runtime | Timing | Direction | Examples |
31+
|----------|------------------------|--------|-----------|----------|
32+
| Event Sink | Receives events **during** a run | In-run | Runtime → External | Redis Streams, Lode |
33+
| Notification Adapter | Invoked **by** runtime after a run | Post-run | Runtime → External | Webhook, Redis Pub/Sub, NATS, SNS |
34+
| Orchestration Integration | **Wraps** the runtime as embedded activity | External | External → Runtime | Temporal |
35+
36+
**Event sinks** receive events in real time as the executor produces them.
37+
They implement the `EventSink` interface, are selected via `--event-sink`
38+
flags, and support per-sink delivery semantics (`mandatory` or
39+
`best_effort`). The runtime owns their lifecycle. Event sinks are NOT
40+
adapters and are NOT selected via `--adapter` flags.
3341

3442
**Notification adapters** are in-process modules that fire after a run
3543
completes. They implement the `Adapter` interface, are selected via
@@ -68,20 +76,101 @@ are separate binaries that depend on the runtime as a library.
6876
6977
---
7078

79+
## Event Sink Model (v0.13.0+)
80+
81+
Event sinks receive events **during** the run through the ingestion policy
82+
path. They are structurally distinct from notification adapters:
83+
84+
- Adapters fire once, after a run completes and data is persisted.
85+
- Event sinks receive every event as it is ingested, in real time.
86+
87+
Event sinks are in-repo modules that implement the `EventSink` interface
88+
(`WriteEvents`, `Close`). The runtime owns their lifecycle and selection.
89+
90+
### Scope Boundary
91+
92+
- Event sinks handle **events only**. Artifact chunks always go to Lode
93+
regardless of event sink configuration.
94+
- Event sinks are not adapters. They are not selected via `--adapter` flags.
95+
- Event sinks are selected via `--event-sink` and related flags.
96+
- When no event sinks are configured, events are written to Lode only
97+
(identical to pre-v0.13.0 behavior).
98+
99+
### Delivery Semantics
100+
101+
Each event sink declares a **delivery mode**:
102+
103+
| Mode | Behavior |
104+
|------|----------|
105+
| `mandatory` | Write failure propagates through the policy and may fail the run. |
106+
| `best_effort` | Write failure is logged to stderr and swallowed. The run continues. |
107+
108+
Default delivery mode is `mandatory`.
109+
110+
The runtime dispatches events to all configured sinks concurrently via a
111+
fan-out dispatcher. Mandatory failures from any sink are collected and
112+
returned as a combined error. Best-effort failures are observable but do
113+
not contribute to the error result.
114+
115+
### Failure Behavior
116+
117+
- A `mandatory` sink failure propagates through the ingestion policy. Under
118+
`StrictPolicy`, this fails the run with `policy_failure`.
119+
- A `best_effort` sink failure is logged to stderr with the sink label and
120+
error. The run continues unaffected.
121+
- Close errors from any sink are collected; close always attempts all sinks.
122+
123+
### Fan-Out Inheritance
124+
125+
Child runs (via `--depth > 0`) inherit the parent run's event sink
126+
configuration. All configured sinks are propagated to child runs.
127+
128+
### Available Event Sinks
129+
130+
| Sink | Package | Description |
131+
|------|---------|-------------|
132+
| Lode | `quarry/lode` | Default event persistence (always available) |
133+
| Redis Streams | `quarry/adapter/redisstream` | Real-time event streaming via XADD |
134+
135+
### Redis Streams Sink
136+
137+
The Redis Streams sink publishes events to a shared Redis Stream key using
138+
pipelined XADD commands.
139+
140+
Configuration defaults:
141+
- Stream key: `quarry:events`
142+
- Max length: `100000` (approximate trimming via `MAXLEN~`)
143+
- TTL: `24h` (applied once on first successful write via EXPIRE)
144+
- Timeout: `2s` per operation
145+
- Retries: `2` with exponential backoff (500ms × 2^attempt)
146+
147+
Sentinel values:
148+
- `max_len: -1` disables stream trimming.
149+
- `ttl: -1` disables key expiry.
150+
- `max_len: 0` and `ttl: 0` apply defaults.
151+
152+
Each stream entry contains flat fields: `run_id`, `event_type`, `seq`,
153+
`timestamp`, `source`, `category`, and `payload` (JSON-encoded).
154+
155+
---
156+
71157
## Selection and Configuration
72158

73159
### Selection
74-
- Adapter selection is runtime-owned and CLI/config-driven.
75-
- Per-run selection via `quarry run --adapter <type>` flags is the baseline.
160+
- Adapter and event sink selection is runtime-owned and CLI/config-driven.
161+
- Per-run selection via `quarry run --adapter <type>` or `--event-sink <type>`
162+
flags is the baseline.
76163
- Global defaults via config are optional and additive.
77-
- No silent fallback to a different adapter is permitted.
164+
- No silent fallback to a different adapter or event sink is permitted.
78165
- If `--adapter` is not set, no notification is sent.
166+
- If `--event-sink` is not set, events are written to Lode only.
79167

80168
### Configuration
81-
- Adapters must accept configuration only from CLI/config inputs.
169+
- Adapters and event sinks must accept configuration only from CLI/config inputs.
82170
- Sensitive fields must be redacted from logs and output.
171+
- Duplicate sink types are rejected (at most one sink of each type).
83172

84-
### CLI Flags
173+
### Adapter CLI Flags
85174

86175
| Flag | Description |
87176
|------|-------------|
@@ -92,11 +181,41 @@ are separate binaries that depend on the runtime as a library.
92181
| `--adapter-timeout <duration>` | Notification timeout (default `10s`) |
93182
| `--adapter-retries <n>` | Retry attempts (default `3`) |
94183

184+
### Event Sink CLI Flags (v0.13.0+)
185+
186+
| Flag | Description |
187+
|------|-------------|
188+
| `--event-sink <type>` | Event sink type (`lode`, `redis`); repeatable |
189+
| `--event-sink-lode-delivery <mode>` | Delivery mode for Lode sink (default `mandatory`) |
190+
| `--event-sink-redis-url <url>` | Redis URL (required when `--event-sink redis` is set) |
191+
| `--event-sink-redis-stream-key <key>` | Stream key (default `quarry:events`) |
192+
| `--event-sink-redis-max-len <n>` | Max stream length; -1 disables (default `100000`) |
193+
| `--event-sink-redis-ttl <duration>` | Key TTL; -1 disables (default `24h`) |
194+
| `--event-sink-redis-timeout <duration>` | Per-operation timeout (default `2s`) |
195+
| `--event-sink-redis-retries <n>` | Retry attempts (default `2`) |
196+
| `--event-sink-redis-delivery <mode>` | Delivery mode for Redis sink (default `mandatory`) |
197+
198+
Config file equivalent (`events.sinks[]` array in YAML):
199+
```yaml
200+
events:
201+
sinks:
202+
- type: lode
203+
delivery: mandatory
204+
- type: redis
205+
delivery: best_effort
206+
url: redis://localhost:6379
207+
stream_key: quarry:events
208+
max_len: 100000
209+
ttl: 24h
210+
timeout: 2s
211+
retries: 2
212+
```
213+
95214
---
96215
97-
## Delivery Semantics
216+
## Adapter Delivery Semantics
98217
99-
- Delivery is **best-effort with retries**. The adapter retries on
218+
- Adapter delivery is **best-effort with retries**. The adapter retries on
100219
transient failures (5xx, network errors) but may ultimately fail.
101220
A failed publish is logged to stderr; the run outcome is unaffected.
102221
- On success, delivery may be duplicated (retries after ambiguous
@@ -106,6 +225,9 @@ Adapters must not:
106225
- alter the event payload,
107226
- silently drop events without observable failure.
108227

228+
Event sink delivery semantics are defined per-sink; see
229+
[Event Sink Model](#event-sink-model-v0130).
230+
109231
---
110232

111233
## Strategy Surface (v0.3.0+)

0 commit comments

Comments
 (0)