Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,21 @@ This transform should be used with the `CassandraSinkSingle` transform. It will

### Coalesce

This transform holds onto messages until some requirement is met and then sends them batched together.
Validation will fail if none of the `flush_when_` fields are provided, as this would otherwise result in a Coalesce transform that never flushes.
This transform holds onto messages until a flush condition is met, then sends them batched together down the chain.

You must set **at least one** of the flush fields below. You may set both.

* **`flush_when_buffered_message_count`** — flush when the buffer holds at least this many messages.
* **`flush_when_millis_since_last_flush`** — when **> 0**, Coalesce may flush a non-empty buffer once at least this many milliseconds have passed since the interval was last restarted. The interval restarts only when Coalesce flushes messages to the next transform (including count-based and shutdown flushes) or when it runs with an **empty** buffer and no shutdown flush. **Appending incoming messages does not restart the interval.** Optional if `flush_when_buffered_message_count` is set; omit for count-only flushing.

**Count-only** (no millis, or millis omitted): messages that never reach the count stay buffered until the connection is torn down (chain flush on close). Clients may see no response for those requests until then. Add `flush_when_millis_since_last_flush` if you need a time bound on partial batches.

```yaml
- Coalesce:
# When this field is provided a flush will occur when the specified number of messages are currently held in the buffer.
# Flush when the buffer holds at least this many messages (optional if millis is set).
flush_when_buffered_message_count: 2000

# When this field is provided a flush will occur when the following occurs in sequence:
# 1. the specified number of milliseconds have passed since the last flush ocurred
# 2. a new message is received
# Max time partial batches may wait (ms); optional if count is set. Must be > 0 when set.
flush_when_millis_since_last_flush: 10000
```

Expand Down
33 changes: 30 additions & 3 deletions shotover/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,14 @@ foo source:
}

#[tokio::test]
async fn test_validate_coalesce() {
async fn test_validate_coalesce_neither_flush_field() {
let expected = r#"Topology errors
foo source:
foo chain:
Coalesce:
Need to provide at least one of these fields:
Provide at least one of:
* flush_when_buffered_message_count
* flush_when_millis_since_last_flush
* flush_when_millis_since_last_flush (must be greater than 0)

But none of them were provided.
Check https://shotover.io/docs/latest/transforms.html#coalesce for more information.
Expand All @@ -314,6 +314,33 @@ foo source:
assert_eq!(error, expected);
}

#[tokio::test]
async fn test_validate_coalesce_millis_zero() {
let expected = r#"Topology errors
foo source:
foo chain:
Coalesce:
flush_when_millis_since_last_flush must be greater than 0 when set.
Check https://shotover.io/docs/latest/transforms.html#coalesce for more information.
"#;

let error = run_test_topology_valkey(vec![
Box::new(CoalesceConfig {
name: "coalesce".to_string(),
flush_when_buffered_message_count: Some(100),
flush_when_millis_since_last_flush: Some(0),
}),
Box::new(NullSinkConfig {
name: "sink".to_string(),
}),
])
.await
.unwrap_err()
.to_string();

assert_eq!(error, expected);
}

#[tokio::test]
async fn test_validate_chain_terminating_in_middle() {
let expected = r#"Topology errors
Expand Down
Loading
Loading