Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,21 @@ This transform should be used with the `CassandraSinkSingle` transform. It will

### Coalesce

This transform holds onto messages until some requirement is met and then sends them batched together.
Validation will fail if none of the `flush_when_` fields are provided, as this would otherwise result in a Coalesce transform that never flushes.
This transform holds onto messages until a flush condition is met, then sends them batched together down the chain.

You must set **at least one** of the flush fields below. You may set both.

* **`flush_when_buffered_message_count`** — flush when the buffer holds at least this many messages.
* **`flush_when_millis_since_last_flush`** — a per-connection task sleeps for this many milliseconds, then wakes the chain. That wake is a time-based flush when the buffer is non-empty. **Every flush** (time-based, count-based, or shutdown) restarts the sleep, so the next wake is one full interval after the last flush—no separate wall-clock field is used. Must be **> 0**. If you only use count-based flush, **no timer runs**.

**Count-only** (no millis, or millis omitted): messages that never reach the count stay buffered until the connection is torn down (chain flush on close). Clients may see no response for those requests until then. Add `flush_when_millis_since_last_flush` if you need a time bound on partial batches.

```yaml
- Coalesce:
# When this field is provided a flush will occur when the specified number of messages are currently held in the buffer.
# Flush when the buffer holds at least this many messages (optional if millis is set).
flush_when_buffered_message_count: 2000

# When this field is provided a flush will occur when the following occurs in sequence:
# 1. the specified number of milliseconds have passed since the last flush ocurred
# 2. a new message is received
# Milliseconds since last flush; optional if count is set. Must be > 0 to enable the timer.
flush_when_millis_since_last_flush: 10000
```

Expand Down
6 changes: 3 additions & 3 deletions shotover/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,14 @@ foo source:
}

#[tokio::test]
async fn test_validate_coalesce() {
async fn test_validate_coalesce_neither_flush_field() {
let expected = r#"Topology errors
foo source:
foo chain:
Coalesce:
Need to provide at least one of these fields:
Provide at least one of:
* flush_when_buffered_message_count
* flush_when_millis_since_last_flush
* flush_when_millis_since_last_flush (must be greater than 0)

But none of them were provided.
Check https://shotover.io/docs/latest/transforms.html#coalesce for more information.
Expand Down
Loading
Loading