Commit f5762a4

Claude authored and lukekim committed
docs: Document MongoDB Change Stream config and resumability
Expand the MongoDB connector docs to cover the Change Stream feature shipped in spiceai/spiceai#10813:

- Document new dataset params: change_stream_batch_max_size, change_stream_batch_max_duration, change_stream_max_await_time, change_stream_batch_size, mongodb_resume_token_invalid_behavior.
- Document prerequisites: primary_key=_id and on_conflict={_id: upsert} are required for delete/update routing.
- Document resume token behavior (spice_sys_mongodb sidecar, at-least-once semantics, file vs in-memory restart behavior).
- Document event mapping (insert, update, replace, delete, drop/rename/invalidate truncate path).
- Fix the example to use correct mongodb_* param names instead of the invalid `host:` / `db:` short forms.
1 parent b5b8788 commit f5762a4

1 file changed

Lines changed: 62 additions & 9 deletions

website/docs/components/data-connectors/mongodb.md

@@ -93,6 +93,11 @@ The MongoDB data connector can be configured by providing the following `params`
9393
| `mongodb_num_docs_to_infer_schema` | Optional. Number of documents to use to infer the schema. Defaults to 400. |
9494
| `mongodb_pool_min` | The minimum number of connections to keep open in the pool, lazily created when requested. Default: `1` |
9595
| `mongodb_pool_max` | The maximum number of connections to allow in the pool. Default: `5` |
96+
| `mongodb_resume_token_invalid_behavior` | Optional. Used with `refresh_mode: changes`. Behavior when a persisted Change Stream resume token is rejected by the server (e.g. it is past the oplog window). `error` (default) surfaces a clear error; `rebootstrap` drops the persisted token and re-snapshots the collection. See [Using MongoDB Change Streams](#using-mongodb-change-streams). |
97+
| `change_stream_batch_max_size` | Optional. Used with `refresh_mode: changes`. Maximum number of Change Stream events grouped into one CDC batch before applying it. Default: `1000`. |
98+
| `change_stream_batch_max_duration` | Optional. Used with `refresh_mode: changes`. Maximum time to wait for a Change Stream batch to fill before applying it. Accepts [fundu](https://docs.rs/fundu) duration strings. Default: `1s`. |
99+
| `change_stream_max_await_time` | Optional. Used with `refresh_mode: changes`. Maximum time MongoDB waits for new Change Stream events before returning an empty server batch. Accepts [fundu](https://docs.rs/fundu) duration strings. Default: `1s`. |
100+
| `change_stream_batch_size` | Optional. Used with `refresh_mode: changes`. Number of Change Stream events to request from the MongoDB server per batch. Default: `1000`. |
96101

97102
## Types
98103

@@ -267,26 +272,74 @@ datasets:
267272

268273
### Using MongoDB Change Streams
269274

270-
Spice supports real-time Change Data Capture (CDC) from MongoDB using native [MongoDB Change Streams](https://www.mongodb.com/docs/manual/changeStreams/). This enables streaming inserts, updates, and deletes from your MongoDB collections directly into Spice accelerators.
275+
Spice supports real-time Change Data Capture (CDC) from MongoDB using native [MongoDB Change Streams](https://www.mongodb.com/docs/manual/changeStreams/). This streams inserts, updates, replacements, deletes, and collection-level invalidation events from MongoDB collections directly into Spice accelerators.
271276

272-
To enable real-time CDC, set `refresh_mode: changes` in the dataset's configuration:
277+
#### How it works
278+
279+
On startup, Spice opens a Change Stream on the source collection (`fullDocument=updateLookup`), emits a CDC `TRUNCATE`, applies a full snapshot of the collection as upsert rows, signals readiness, then processes Change Stream events in batches. Opening the Change Stream before the snapshot prevents gaps between the snapshot and the live stream.
280+
281+
File-accelerated datasets persist resume tokens and resume from the last committed token on restart. In-memory accelerators re-bootstrap from a fresh snapshot.
282+
283+
#### Prerequisites
284+
285+
- MongoDB 4.0+ with Change Streams enabled. MongoDB requires a replica set or sharded cluster for Change Streams.
286+
- The MongoDB user must have `changeStream` privileges.
287+
- The accelerator must support upsert behavior. Use `duckdb`, `sqlite`, `postgres`, `turso`, or `cayenne`.
288+
- `acceleration.primary_key: _id` is required. Delete events only include the document key, so Spice needs `_id` to route deletes.
289+
- `acceleration.on_conflict` must specify `upsert` on `_id` so update and replace events overwrite existing rows.
290+
291+
#### Minimal configuration
273292

274293
```yaml
275294
datasets:
276-
  - from: mongodb:my_collection
277-
    name: my_collection
295+
  - from: mongodb:users
296+
    name: users
278297
    params:
279-
      host: my-cluster.mongodb.net
280-
      db: mydb
298+
      mongodb_host: localhost
299+
      mongodb_port: '27017'
300+
      mongodb_db: my_database
301+
      mongodb_user: my_user
302+
      mongodb_pass: ${secrets:mongodb_pass}
281303
    acceleration:
282304
      enabled: true
283305
      engine: duckdb
284306
      refresh_mode: changes
307+
      primary_key: _id
308+
      on_conflict:
309+
        _id: upsert
285310
```
286311

287-
#### Notes
288-
- Requires MongoDB 4.0+ and a replica set or sharded cluster.
289-
- Ensure your MongoDB user has `changeStream` privileges.
312+
#### Change Stream parameters
313+
314+
These optional runtime parameters live under dataset `params:`. The first four are not prefixed with `mongodb_`.
315+
316+
| Parameter Name | Default | Description |
317+
| ----------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
318+
| `change_stream_batch_max_size` | `1000` | Maximum number of Change Stream events to group into one CDC batch before applying it. Must be greater than 0. |
319+
| `change_stream_batch_max_duration` | `1s` | Maximum time to wait for a Change Stream batch to fill before applying it. Accepts [fundu](https://docs.rs/fundu) duration strings; must be greater than 0. |
320+
| `change_stream_max_await_time` | `1s` | Maximum time MongoDB waits for new Change Stream events before returning an empty server batch. Accepts [fundu](https://docs.rs/fundu) duration strings; must be greater than 0. |
321+
| `change_stream_batch_size` | `1000` | Number of Change Stream events to request from the MongoDB server per batch. Must fit in a `u32` and be greater than 0. |
322+
| `mongodb_resume_token_invalid_behavior` | `error` | Behavior when a persisted Change Stream resume token cannot be honored by the server (e.g. the token is past the oplog retention window). `error` surfaces a clear error so the operator can decide; `rebootstrap` drops the persisted token and re-snapshots the collection. |
323+
324+
The existing `mongodb_unnest_depth` parameter also applies to Change Stream documents, so nested BSON is flattened the same way as normal MongoDB reads.
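
As an illustration of where these parameters go, the sketch below tunes the batching knobs on the `users` dataset from the minimal configuration. The specific values (`500`, `2s`, `500ms`) are assumptions chosen for the example, not recommendations, and the duration strings assume the fundu parser accepts the `ms` and `s` suffixes as written.

```yaml
datasets:
  - from: mongodb:users
    name: users
    params:
      mongodb_host: localhost
      mongodb_port: '27017'
      mongodb_db: my_database
      mongodb_user: my_user
      mongodb_pass: ${secrets:mongodb_pass}
      # Illustrative values only; the defaults are 1000, 1s, 1s, and 1000.
      change_stream_batch_max_size: 500
      change_stream_batch_max_duration: 2s
      change_stream_max_await_time: 500ms
      change_stream_batch_size: 500
    acceleration:
      enabled: true
      engine: duckdb
      refresh_mode: changes
      primary_key: _id
      on_conflict:
        _id: upsert
```

Going by the parameter descriptions, a smaller `change_stream_batch_max_size` or a shorter `change_stream_batch_max_duration` should apply changes to the accelerator sooner, at the cost of more frequent, smaller writes.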
325+
326+
#### Event mapping
327+
328+
- `insert`: create/upsert, using `fullDocument`.
329+
- `update`: update/upsert, using `fullDocument` from `fullDocument=updateLookup`.
330+
- `replace`: update/upsert, using `fullDocument`.
331+
- `delete`: delete, using `documentKey`; non-key columns are `null`.
332+
- `drop`, `rename`, `dropDatabase`, `invalidate`: truncate, because collection continuity is no longer guaranteed.
333+
334+
If MongoDB does not include `fullDocument` for an update or replace event, Spice fails the stream with a clear error instead of applying a partial row.
335+
336+
#### Resumability across restarts
337+
338+
For file-accelerated datasets (acceleration `mode: file` / `file_create` / `file_update`, or `engine: postgres`), Spice persists the most recent Change Stream resume token in a sidecar table named `spice_sys_mongodb`, stored alongside the accelerator data. The token is committed only after the downstream accelerator write succeeds (at-least-once semantics).
339+
340+
On restart with a persisted token, Spice resumes the Change Stream from that token and skips the collection snapshot. If MongoDB rejects the token (typical codes `ChangeStreamHistoryLost` 286 or `ChangeStreamFatalError` 280, e.g. when the oplog window has rolled past the token's position), the behavior is governed by `mongodb_resume_token_invalid_behavior` above. Because the default is `error`, re-snapshotting a large collection happens only when you opt in with `rebootstrap`.
341+
342+
Datasets that are not file-accelerated (in-memory Arrow, etc.) do not get a sidecar row; restarts re-bootstrap from a fresh snapshot.
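
A minimal sketch of a resumable setup, assuming the same `users` dataset as in the minimal configuration: it combines `mode: file` on the accelerator, so the resume token is persisted, with an explicit opt-in to `rebootstrap`. Whether `rebootstrap` suits a given collection depends on how expensive a full re-snapshot is.

```yaml
datasets:
  - from: mongodb:users
    name: users
    params:
      mongodb_host: localhost
      mongodb_port: '27017'
      mongodb_db: my_database
      mongodb_user: my_user
      mongodb_pass: ${secrets:mongodb_pass}
      # Opt in to dropping a rejected token and re-snapshotting the collection.
      # Omit this line to keep the default (`error`).
      mongodb_resume_token_invalid_behavior: rebootstrap
    acceleration:
      enabled: true
      engine: duckdb
      mode: file # file-accelerated, so the token is stored in spice_sys_mongodb
      refresh_mode: changes
      primary_key: _id
      on_conflict:
        _id: upsert
```

Leaving `mongodb_resume_token_invalid_behavior` unset keeps the default, so a rejected token surfaces an error and the operator decides whether to re-snapshot.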
290343

291344
---
292345
