Skip to content

Commit b62c360

Browse files
committed
fix test and ordered processing doc
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
1 parent 025f7f3 commit b62c360

3 files changed

Lines changed: 21 additions & 9 deletions

File tree

docs/user-guide/reference/ordered-processing.md

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ kind: Pipeline
5858
metadata:
5959
name: my-pipeline
6060
spec:
61-
limits:
62-
readBatchSize: 1 # recommended for strict ordering
6361
ordered:
6462
enabled: true # enable order-preserving processing pipeline-wide
6563
vertices:
@@ -118,8 +116,9 @@ case.
118116
- **Source vertices** always preserve input order regardless of the `ordered` setting.
119117
- **Key-based routing**: ordering is guaranteed per key. Messages with different keys may still be interleaved across
120118
partitions. Ensure your UDF or SDK sets meaningful message keys to leverage per-key ordering.
121-
- **`readBatchSize: 1`** is required for strict ordering. With a larger batch size, multiple messages may
122-
be in-flight simultaneously within a single pod.
119+
- **In-flight cap is forced to 1**: when ordered processing is enabled on a Map or Sink vertex, the controller
120+
automatically sets `limits.concurrency = 1` so each pod processes one message at a time. You don't need to set
121+
`concurrency` or `readBatchSize` yourself — any explicit `concurrency` you set is overridden for ordered vertices.
123122
- **Throughput trade-off**: ordered processing limits parallelism within a partition. Consider the number of partitions
124123
carefully to balance ordering guarantees with throughput requirements.
125124
- **Join vertices (multiple input edges)**: When a vertex receives messages from multiple upstream vertices
@@ -169,8 +168,6 @@ kind: Pipeline
169168
metadata:
170169
name: ordered-pipeline
171170
spec:
172-
limits:
173-
readBatchSize: 1
174171
ordered:
175172
enabled: true
176173
vertices:
@@ -202,8 +199,8 @@ spec:
202199
In the example above:
203200

204201
- `ordered.enabled: true` enables order-preserving processing pipeline-wide.
205-
- `limits.readBatchSize: 1` is required so that each pod processes one message at a time, which is essential for
206-
strict in-order guarantees.
202+
- For each ordered Map/Sink vertex the controller automatically forces `limits.concurrency = 1` so each pod processes
203+
one message at a time, which is essential for strict in-order guarantees.
207204
- The `cat` (Map) and `out` (Sink) vertices each have `partitions: 3`, so they will run with exactly 3 replicas.
208205
- Source vertices (`in-1`, `in-2`) always preserve input order and require no extra configuration.
209206
- Because `in-1` and `in-2` both feed into `cat` (a join), messages from the two sources are interleaved at `cat`.

pkg/reconciler/pipeline/controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,18 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
659659
// Resolve and set the effective ordered config on the vertex
660660
// This merges pipeline-level and vertex-level ordered config so the vertex is self-contained
661661
resolveOrderedConfig(pl, vCopy)
662+
// When ordered processing is enabled on a Map/Sink vertex, force the in-flight cap to 1 so
663+
// each partition processes one message at a time. Source and Reduce vertices preserve order
664+
// by other means (single-stream-per-replica routing for sources, partition-by-key for
665+
// reduce), so we leave their concurrency alone. We override even if the user set
666+
// concurrency explicitly: ordered processing without concurrency=1 would silently violate
667+
// the FIFO contract.
668+
if vCopy.IsOrdered() && (vCopy.IsMapUDF() || vCopy.IsASink()) {
669+
if vCopy.Limits == nil {
670+
vCopy.Limits = &dfv1.VertexLimits{}
671+
}
672+
vCopy.Limits.Concurrency = ptr.To[uint64](1)
673+
}
662674
replicas := int32(1)
663675
// If the desired phase is paused, or we are in the middle of pausing we should not start any vertex replicas
664676
if isLifecycleChange(pl) {

rust/numaflow-core/src/source.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1067,9 +1067,12 @@ mod tests {
10671067
.unwrap();
10681068

10691069
let tracker = Tracker::new(None, CancellationToken::new());
1070+
// Concurrency is large enough to comfortably hold 50 unacked messages plus read-ahead
1071+
// batches; matches the previous behavior that depended on the now-removed 10000-message
1072+
// MAX_ACK_PENDING constant.
10701073
let source: Source<crate::typ::WithoutRateLimiter> = Source::new(
10711074
5,
1072-
5,
1075+
10000,
10731076
SourceType::UserDefinedSource(Box::new(src_read), Box::new(src_ack), lag_reader),
10741077
tracker.clone(),
10751078
true,

0 commit comments

Comments
 (0)