Skip to content

Commit 768960b

Browse files
committed
Add outbox usage guide
Cookbook-style README at lib/outbox/README.md covering connection providers (single + Caqti pool with the error-smuggling trick), publishing inside a UoW, the three consumer flavours (Iter / dispatch / run), a real HTTP webhook example via cohttp-eio, graceful shutdown, retry semantics, and multi-worker partitioning. Replaces the implicit assumption that readers will figure out a Caqti pool wrapper on their own; the test suite already needed one for concurrency > 1.
1 parent b084bed commit 768960b

2 files changed

Lines changed: 359 additions & 1 deletion

File tree

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ Domain-Driven Design in a functional style.
1414
- **Outbox** (`ascetic_ddd.outbox`): transactional outbox pattern for
1515
reliable message publishing — Postgres-backed, ordered via `xid8`,
1616
with consumer groups, URI-based partitioning and an effect-handler
17-
iterator.
17+
iterator. See [`lib/outbox/README.md`](./lib/outbox/README.md) for
18+
usage.
1819
- **Saga** (`ascetic_ddd.saga`): routing-slip saga pattern for
1920
long-running workflows with compensation.
2021
- **Specification** (`ascetic_ddd.spec`): specification-pattern DSL with

lib/outbox/README.md

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
# Transactional Outbox
2+
3+
Reliable message publishing for systems that need to update a database
4+
**and** publish to external systems atomically.
5+
6+
The pattern: business state changes and outgoing messages are written
7+
to the same Postgres transaction. A separate dispatcher reads committed
8+
messages and forwards them to the broker / HTTP endpoint / whatever.
9+
Either both happen or neither does — no orphan messages, no orphan state.
10+
11+
For the schema and design rationale (xid8 ordering, visibility rules,
12+
consumer groups, URI-based partitioning) see [`init.sql`](./init.sql).
13+
14+
---
15+
16+
## Quick start
17+
18+
```ocaml
19+
open Ascetic_outbox
20+
21+
let () =
22+
Eio_main.run @@ fun env ->
23+
Eio.Switch.run @@ fun sw ->
24+
25+
(* 1. Open a connection (or a pool — see below). *)
26+
let stdenv = (env :> Caqti_eio.stdenv) in
27+
let uri = Uri.of_string "postgresql://user:pass@localhost/app" in
28+
let conn =
29+
match Caqti_eio_unix.connect ~sw ~stdenv uri with
30+
| Ok c -> c
31+
| Error e -> failwith (Format.asprintf "%a" Caqti_error.pp e)
32+
in
33+
34+
(* 2. Build the outbox. *)
35+
let provider = Connection_provider.of_connection conn in
36+
let outbox = Outbox.create ~provider () in
37+
38+
(* 3. Make sure the schema exists (idempotent). *)
39+
let uow = Ascetic_unit_of_work.Caqti_unit_of_work.of_connection conn in
40+
match Outbox.setup outbox uow with
41+
| Error e -> failwith e
42+
| Ok () -> ()
43+
```
44+
45+
`Outbox.publish` adds a message inside the caller's unit of work;
46+
`Outbox.dispatch` / `run` / `Iter` read committed messages back out.
47+
48+
---
49+
50+
## Connection providers
51+
52+
The dispatcher needs to acquire its own connections (separately from
53+
the publisher's UoW), so the outbox holds a `Connection_provider.t`
54+
rather than a single connection. Two common shapes:
55+
56+
### Single connection (development, tests)
57+
58+
```ocaml
59+
let provider = Connection_provider.of_connection conn
60+
```
61+
62+
Fine when there is exactly one fiber driving the dispatcher. **Will
63+
fail at runtime** with `Invalid concurrent usage of PostgreSQL
64+
connection detected` if you set `concurrency > 1` — Caqti rejects
65+
shared use of a single connection from multiple fibers.
66+
67+
### Caqti pool (production, `concurrency > 1`)
68+
69+
`Caqti_eio.Pool.use` constrains its callback's error type to
70+
`[> Caqti_error.t]`, but we want to surface our own `(_, string) result`.
71+
The cleanest way is to smuggle the error through an exception:
72+
73+
```ocaml
74+
let provider_of_pool pool : Connection_provider.t =
75+
(module struct
76+
exception Outbox_error of string
77+
78+
let with_connection f =
79+
try
80+
Caqti_eio.Pool.use
81+
(fun conn ->
82+
match f conn with
83+
| Ok v -> Ok v
84+
| Error msg -> raise (Outbox_error msg))
85+
pool
86+
|> Result.map_error (fun e ->
87+
Format.asprintf "%a" Caqti_error.pp e)
88+
with Outbox_error msg -> Error msg
89+
end)
90+
91+
(* Wire it: *)
92+
let pool =
93+
match Caqti_eio_unix.connect_pool ~sw ~stdenv uri with
94+
| Ok p -> p
95+
| Error e -> failwith (Format.asprintf "%a" Caqti_error.pp e)
96+
in
97+
let outbox = Outbox.create ~provider:(provider_of_pool pool) ()
98+
```
99+
100+
Pool size defaults to a small number; raise it via
101+
`Caqti_pool_config` when you need `concurrency > 1`. Plan for at least
102+
`concurrency + 1` connections — each dispatcher fiber holds one for
103+
its batch, and `ensure_consumer_group` briefly takes another.
104+
105+
---
106+
107+
## Publishing in a unit of work
108+
109+
The whole point of the pattern is atomicity with business state, so
110+
`publish` must run inside the same transaction as everything else.
111+
Sketch with a hand-rolled UoW:
112+
113+
```ocaml
114+
module Uow = Ascetic_unit_of_work.Caqti_unit_of_work
115+
116+
let create_order outbox conn order =
117+
let module C = (val conn : Caqti_eio.CONNECTION) in
118+
let uow = Uow.of_connection conn in
119+
match C.start () with
120+
| Error e -> Error (Format.asprintf "%a" Caqti_error.pp e)
121+
| Ok () ->
122+
let result =
123+
let open Result in
124+
let* () = save_order_row conn order in (* your repository *)
125+
Outbox.publish outbox uow
126+
(Outbox_message.make
127+
~uri:"webhook://order-created"
128+
~payload:(`Assoc [ ("order_id", `String order.id) ])
129+
~metadata:(`Assoc [ ("event_id", `String (Uuidm.to_string (Uuidm.v `V4))) ])
130+
())
131+
in
132+
match result with
133+
| Error e -> Uow.rollback uow; Error e
134+
| Ok () -> Uow.commit uow
135+
```
136+
137+
The message is invisible to dispatchers until commit — readers filter
138+
on `transaction_id < pg_snapshot_xmin(pg_current_snapshot())`.
139+
140+
---
141+
142+
## Consuming: three flavours
143+
144+
### `Iter.iter` (streaming, recommended)
145+
146+
Simplest for "process each message and move on":
147+
148+
```ocaml
149+
let subscriber (msg : Outbox_message.t) =
150+
Logs.info (fun m -> m "got %s" msg.uri);
151+
(* do work; raise / Error → message redelivered *)
152+
in
153+
Outbox.Iter.iter
154+
~clock:(Eio.Stdenv.mono_clock env)
155+
~consumer_group:"my-service"
156+
outbox
157+
subscriber
158+
```
159+
160+
Backed by OCaml 5 effect handlers; each batch is fetched in one
161+
transaction and acks happen per message. Keep the subscriber fast — the
162+
batch transaction stays open for the whole batch.
163+
164+
### `Outbox.dispatch` (manual single-batch)
165+
166+
Useful for cron-style dispatchers or test code:
167+
168+
```ocaml
169+
match Outbox.dispatch outbox subscriber with
170+
| Ok true -> (* processed at least one message *)
171+
| Ok false -> (* nothing pending *)
172+
| Error e -> Logs.err (fun m -> m "%s" e)
173+
```
174+
175+
### `Outbox.run` (callback loop with concurrency)
176+
177+
Long-running daemon with optional fan-out:
178+
179+
```ocaml
180+
let stop = ref false in
181+
Outbox.run
182+
~clock:(Eio.Stdenv.mono_clock env)
183+
~consumer_group:"my-service"
184+
~concurrency:3 (* requires a pool provider! *)
185+
~poll_interval:0.5
186+
~stop:(fun () -> !stop)
187+
outbox
188+
subscriber
189+
```
190+
191+
When `concurrency > 1`, work is partitioned by `hashtext(uri) %% N`,
192+
so messages for the same URI always land on the same worker.
193+
194+
---
195+
196+
## Real example: HTTP webhook publisher
197+
198+
This is the dispatcher process. It reads outbox messages and POSTs
199+
them to the URL embedded in the message URI.
200+
201+
```ocaml
202+
open Ascetic_outbox
203+
204+
let post_webhook ~sw ~client (msg : Outbox_message.t) =
205+
(* uri is e.g. "webhook://hooks.example.com/orders" *)
206+
let path = String.sub msg.uri 10 (String.length msg.uri - 10) in
207+
let target = Uri.of_string (Printf.sprintf "https://%s" path) in
208+
let body =
209+
Cohttp_eio.Body.of_string (Yojson.Safe.to_string msg.payload)
210+
in
211+
let headers =
212+
Cohttp.Header.of_list
213+
[
214+
("Content-Type", "application/json");
215+
( "X-Event-Id",
216+
match msg.metadata with
217+
| `Assoc fs -> (
218+
match List.assoc_opt "event_id" fs with
219+
| Some (`String s) -> s
220+
| _ -> "")
221+
| _ -> "" );
222+
]
223+
in
224+
let resp, body =
225+
Cohttp_eio.Client.post client ~sw ~headers ~body target
226+
in
227+
let _ = Eio.Buf_read.(of_flow ~max_size:1_000_000 body |> take_all) in
228+
let code = Cohttp.Code.code_of_status (Cohttp.Response.status resp) in
229+
if Cohttp.Code.is_success code then Ok ()
230+
else Error (Printf.sprintf "HTTP %d for %s" code (Uri.to_string target))
231+
232+
let () =
233+
Eio_main.run @@ fun env ->
234+
Eio.Switch.run @@ fun sw ->
235+
236+
let stdenv = (env :> Caqti_eio.stdenv) in
237+
let db_uri = Uri.of_string (Sys.getenv "DATABASE_URL") in
238+
let pool =
239+
Caqti_eio_unix.connect_pool ~sw ~stdenv db_uri
240+
|> Result.fold ~ok:Fun.id ~error:(fun e ->
241+
failwith (Format.asprintf "%a" Caqti_error.pp e))
242+
in
243+
let outbox =
244+
Outbox.create ~provider:(provider_of_pool pool) ()
245+
in
246+
let client = Cohttp_eio.Client.make ~https:None (Eio.Stdenv.net env) in
247+
248+
Outbox.Iter.iter
249+
~clock:(Eio.Stdenv.mono_clock env)
250+
~consumer_group:"webhook-publisher"
251+
~uri:"webhook://" (* only handle webhook:// URIs *)
252+
outbox
253+
(fun msg ->
254+
match post_webhook ~sw ~client msg with
255+
| Ok () -> ()
256+
| Error e ->
257+
Logs.warn (fun m -> m "delivery failed: %s" e);
258+
(* exception propagation here would close the iterator;
259+
swallowing means the message is acked despite failure.
260+
For at-least-once delivery, raise instead. *)
261+
())
262+
```
263+
264+
Same shape works for any broker:
265+
266+
- **Kafka**: replace the HTTP client with your Kafka client; route by
267+
URI prefix (`kafka://orders` → topic `orders`).
268+
- **AMQP**: parse `amqp://exchange/routing-key` from the URI.
269+
- **NATS**: subject = `String.sub msg.uri 7 ...` from `nats://subject`.
270+
271+
The dispatcher is broker-agnostic — only the subscriber callback knows
272+
which broker is involved.
273+
274+
---
275+
276+
## Graceful shutdown
277+
278+
`run` and `Iter.iter` accept a `?stop : unit -> bool` predicate. Wire
279+
it to a signal handler:
280+
281+
```ocaml
282+
let stop_flag = ref false in
283+
let handler _ = stop_flag := true in
284+
Sys.set_signal Sys.sigint (Sys.Signal_handle handler);
285+
Sys.set_signal Sys.sigterm (Sys.Signal_handle handler);
286+
287+
Outbox.run
288+
~clock:(Eio.Stdenv.mono_clock env)
289+
~stop:(fun () -> !stop_flag)
290+
outbox
291+
subscriber
292+
```
293+
294+
`stop` is checked between batches; the in-flight batch always finishes.
295+
For hard cancellation use `Iter.close` (rolls back the open
296+
transaction).
297+
298+
---
299+
300+
## Retry semantics
301+
302+
Returning `Error _` from a subscriber rolls back the dispatcher
303+
transaction — the messages of that batch stay in the outbox and will
304+
be redelivered on the next `dispatch`. There is no per-message dead
305+
letter queue: handle non-recoverable errors inside the subscriber
306+
(e.g. log + return `Ok` to ack and skip).
307+
308+
```ocaml
309+
let subscriber msg =
310+
match try_publish msg with
311+
| Ok () -> Ok ()
312+
| Error e when is_transient e ->
313+
(* roll back the batch — try again later *)
314+
Error (Printf.sprintf "transient: %s" e)
315+
| Error e ->
316+
(* poison message: log and ack so we don't loop forever *)
317+
Logs.err (fun m -> m "drop %s: %s" msg.uri e);
318+
Ok ()
319+
```
320+
321+
---
322+
323+
## Multi-worker partitioning
324+
325+
When `concurrency > 1`, the framework runs that many fibers, each
326+
processing a distinct partition:
327+
328+
| `process_id` | `concurrency` | `worker_id` (per fiber) |
329+
|:-:|:-:|:-:|
330+
| 0 | 3 | 0, 1, 2 |
331+
| 1 | 3 | 3, 4, 5 |
332+
333+
Messages are routed by `hashtext(uri) %% N = worker_id`, so all
334+
messages for a given URI land on the same worker — order is preserved
335+
within a URI even under fan-out.
336+
337+
For multi-process deployment (e.g. Kubernetes replicas), set
338+
`process_id` and `num_processes` to the replica index and replica
339+
count. Each replica then fans out to `concurrency` fibers internally.
340+
341+
**Always pair `concurrency > 1` with a pool-based provider** — Caqti
342+
fails loudly if multiple fibers share one connection.
343+
344+
---
345+
346+
## Operational notes
347+
348+
- **Backpressure**: `batch_size` (default 100) bounds how many messages
349+
a single dispatcher transaction holds. Larger batch = less SQL
350+
overhead, longer transaction window — see `init.sql` for the
351+
trade-offs around xmin horizon.
352+
- **Cleanup**: processed messages stay in the outbox table. Run a
353+
periodic job to delete rows below `MIN(last_processed_transaction_id)`
354+
across all consumer groups. SQL example in `init.sql`.
355+
- **Idempotency**: the `metadata->>'event_id'` UNIQUE INDEX prevents
356+
accidental duplicate publishes within the producer. Consumers must
357+
still be idempotent — at-least-once delivery is part of the contract.

0 commit comments

Comments
 (0)