Skip to content

Commit 025f7f3

Browse files
committed
chore: make concurrecy part of spec
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
1 parent b52cf42 commit 025f7f3

35 files changed

Lines changed: 1334 additions & 653 deletions

api/json-schema/schema.json

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21136,12 +21136,17 @@
2113621136
},
2113721137
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
2113821138
"properties": {
21139+
"concurrency": {
21140+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. With read-ahead disabled (the default for MonoVertex, since the MonoVertex always reads from a source), the data plane drains the current batch fully before the next read, so the upper bound becomes `min(concurrency, readBatchSize)`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. To force strictly sequential processing, set `concurrency` to 1 (read-ahead is already off by default for MonoVertex).",
21141+
"format": "int64",
21142+
"type": "integer"
21143+
},
2113921144
"rateLimit": {
2114021145
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
2114121146
"description": "RateLimit for MonoVertex defines how many messages can be read from Source. This is computed by number of `read` calls per second multiplied by the `readBatchSize`. This is how RateLimit is calculated for MonoVertex and for Source vertices."
2114221147
},
2114321148
"readBatchSize": {
21144-
"description": "Read batch size from the source.",
21149+
"description": "Read batch size from the source. ReadBatchSize controls only how many messages are fetched in a single read call from the source; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2114521150
"format": "int64",
2114621151
"type": "integer"
2114721152
},
@@ -21530,12 +21535,17 @@
2153021535
"format": "int64",
2153121536
"type": "integer"
2153221537
},
21538+
"concurrency": {
21539+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time across each vertex of the pipeline. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count per vertex is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. By default, read-ahead is disabled on source vertices (so re-reads on failure stay cheap and source ordering is preserved) and enabled on Map/Sink/ Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `READ_AHEAD` environment variable on the vertex's container template. Can be overridden by the vertex's limit settings.",
21540+
"format": "int64",
21541+
"type": "integer"
21542+
},
2153321543
"rateLimit": {
2153421544
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
2153521545
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings. For source vertices, it will be set to rate divided by readBatchSize because for source vertices, the rate limit is defined by how many times the `Read` is called per second Reduce does not support RateLimit."
2153621546
},
2153721547
"readBatchSize": {
21538-
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
21548+
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2153921549
"format": "int64",
2154021550
"type": "integer"
2154121551
},
@@ -22902,12 +22912,17 @@
2290222912
"format": "int64",
2290322913
"type": "integer"
2290422914
},
22915+
"concurrency": {
22916+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. It overrides the settings from pipeline limits. By default, read-ahead is disabled on source vertices and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `READ_AHEAD` environment variable on the vertex's container template.",
22917+
"format": "int64",
22918+
"type": "integer"
22919+
},
2290522920
"rateLimit": {
2290622921
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit",
2290722922
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits. For Source vertices, the rate limit is defined by how many times the `Read` is called per second multiplied by the `readBatchSize`. Pipeline level rate limit is not applied to Source vertices."
2290822923
},
2290922924
"readBatchSize": {
22910-
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
22925+
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2291122926
"format": "int64",
2291222927
"type": "integer"
2291322928
},

api/openapi-spec/swagger.json

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21132,12 +21132,17 @@
2113221132
"io.numaproj.numaflow.v1alpha1.MonoVertexLimits": {
2113321133
"type": "object",
2113421134
"properties": {
21135+
"concurrency": {
21136+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. With read-ahead disabled (the default for MonoVertex, since the MonoVertex always reads from a source), the data plane drains the current batch fully before the next read, so the upper bound becomes `min(concurrency, readBatchSize)`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. To force strictly sequential processing, set `concurrency` to 1 (read-ahead is already off by default for MonoVertex).",
21137+
"type": "integer",
21138+
"format": "int64"
21139+
},
2113521140
"rateLimit": {
2113621141
"description": "RateLimit for MonoVertex defines how many messages can be read from Source. This is computed by number of `read` calls per second multiplied by the `readBatchSize`. This is how RateLimit is calculated for MonoVertex and for Source vertices.",
2113721142
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
2113821143
},
2113921144
"readBatchSize": {
21140-
"description": "Read batch size from the source.",
21145+
"description": "Read batch size from the source. ReadBatchSize controls only how many messages are fetched in a single read call from the source; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2114121146
"type": "integer",
2114221147
"format": "int64"
2114321148
},
@@ -21517,12 +21522,17 @@
2151721522
"type": "integer",
2151821523
"format": "int64"
2151921524
},
21525+
"concurrency": {
21526+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time across each vertex of the pipeline. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count per vertex is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. By default, read-ahead is disabled on source vertices (so re-reads on failure stay cheap and source ordering is preserved) and enabled on Map/Sink/ Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `READ_AHEAD` environment variable on the vertex's container template. Can be overridden by the vertex's limit settings.",
21527+
"type": "integer",
21528+
"format": "int64"
21529+
},
2152021530
"rateLimit": {
2152121531
"description": "RateLimit is used to define the rate limit for all the vertices in the pipeline, it could be overridden by the vertex's limit settings. For source vertices, it will be set to rate divided by readBatchSize because for source vertices, the rate limit is defined by how many times the `Read` is called per second Reduce does not support RateLimit.",
2152221532
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
2152321533
},
2152421534
"readBatchSize": {
21525-
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings.",
21535+
"description": "Read batch size for all the vertices in the pipeline, can be overridden by the vertex's limit settings. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2152621536
"type": "integer",
2152721537
"format": "int64"
2152821538
},
@@ -22880,12 +22890,17 @@
2288022890
"type": "integer",
2288122891
"format": "int64"
2288222892
},
22893+
"concurrency": {
22894+
"description": "Concurrency defines the maximum number of messages that can be actively in-flight (read but not yet acknowledged) at any given time. With read-ahead enabled, the data plane keeps reading new batches from the source/buffer until the number of in-flight messages reaches `concurrency`; once that ceiling is hit, one more batch may be pre-fetched and held ready so that completed messages can be replaced immediately. Therefore the maximum in-flight count is at most `concurrency + readBatchSize`. `readBatchSize` controls only the size of an individual read; `concurrency` controls how many messages can be processed in parallel. It overrides the settings from pipeline limits. By default, read-ahead is disabled on source vertices and enabled on Map/Sink/Reduce vertices. To force strictly sequential processing, set `concurrency` to 1 and disable read-ahead via the `READ_AHEAD` environment variable on the vertex's container template.",
22895+
"type": "integer",
22896+
"format": "int64"
22897+
},
2288322898
"rateLimit": {
2288422899
"description": "RateLimit is used to define the rate limit for the vertex, it overrides the settings from pipeline limits. For Source vertices, the rate limit is defined by how many times the `Read` is called per second multiplied by the `readBatchSize`. Pipeline level rate limit is not applied to Source vertices.",
2288522900
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RateLimit"
2288622901
},
2288722902
"readBatchSize": {
22888-
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits.",
22903+
"description": "Read batch size from the source or buffer. It overrides the settings from pipeline limits. ReadBatchSize controls only how many messages are fetched in a single read call from the source/buffer; it is not a cap on how many messages may be in-flight (use `concurrency` for that).",
2288922904
"type": "integer",
2289022905
"format": "int64"
2289122906
},

config/base/crds/full/numaflow.numaproj.io_monovertices.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2609,6 +2609,10 @@ spec:
26092609
type: object
26102610
limits:
26112611
properties:
2612+
concurrency:
2613+
default: 500
2614+
format: int64
2615+
type: integer
26122616
rateLimit:
26132617
properties:
26142618
max:

config/base/crds/full/numaflow.numaproj.io_pipelines.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ spec:
152152
default: 80
153153
format: int32
154154
type: integer
155+
concurrency:
156+
default: 500
157+
format: int64
158+
type: integer
155159
rateLimit:
156160
properties:
157161
max:
@@ -7239,6 +7243,9 @@ spec:
72397243
bufferUsageLimit:
72407244
format: int32
72417245
type: integer
7246+
concurrency:
7247+
format: int64
7248+
type: integer
72427249
rateLimit:
72437250
properties:
72447251
max:

config/base/crds/full/numaflow.numaproj.io_servingpipelines.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ spec:
131131
default: 80
132132
format: int32
133133
type: integer
134+
concurrency:
135+
default: 500
136+
format: int64
137+
type: integer
134138
rateLimit:
135139
properties:
136140
max:
@@ -7218,6 +7222,9 @@ spec:
72187222
bufferUsageLimit:
72197223
format: int32
72207224
type: integer
7225+
concurrency:
7226+
format: int64
7227+
type: integer
72217228
rateLimit:
72227229
properties:
72237230
max:

config/base/crds/full/numaflow.numaproj.io_vertices.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,9 @@ spec:
786786
bufferUsageLimit:
787787
format: int32
788788
type: integer
789+
concurrency:
790+
format: int64
791+
type: integer
789792
rateLimit:
790793
properties:
791794
max:
@@ -1040,6 +1043,9 @@ spec:
10401043
bufferUsageLimit:
10411044
format: int32
10421045
type: integer
1046+
concurrency:
1047+
format: int64
1048+
type: integer
10431049
rateLimit:
10441050
properties:
10451051
max:
@@ -2250,6 +2256,9 @@ spec:
22502256
bufferUsageLimit:
22512257
format: int32
22522258
type: integer
2259+
concurrency:
2260+
format: int64
2261+
type: integer
22532262
rateLimit:
22542263
properties:
22552264
max:
@@ -6907,6 +6916,9 @@ spec:
69076916
bufferUsageLimit:
69086917
format: int32
69096918
type: integer
6919+
concurrency:
6920+
format: int64
6921+
type: integer
69106922
rateLimit:
69116923
properties:
69126924
max:
@@ -7161,6 +7173,9 @@ spec:
71617173
bufferUsageLimit:
71627174
format: int32
71637175
type: integer
7176+
concurrency:
7177+
format: int64
7178+
type: integer
71647179
rateLimit:
71657180
properties:
71667181
max:

config/install.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4135,6 +4135,10 @@ spec:
41354135
type: object
41364136
limits:
41374137
properties:
4138+
concurrency:
4139+
default: 500
4140+
format: int64
4141+
type: integer
41384142
rateLimit:
41394143
properties:
41404144
max:
@@ -9932,6 +9936,10 @@ spec:
99329936
default: 80
99339937
format: int32
99349938
type: integer
9939+
concurrency:
9940+
default: 500
9941+
format: int64
9942+
type: integer
99359943
rateLimit:
99369944
properties:
99379945
max:
@@ -17019,6 +17027,9 @@ spec:
1701917027
bufferUsageLimit:
1702017028
format: int32
1702117029
type: integer
17030+
concurrency:
17031+
format: int64
17032+
type: integer
1702217033
rateLimit:
1702317034
properties:
1702417035
max:
@@ -23058,6 +23069,10 @@ spec:
2305823069
default: 80
2305923070
format: int32
2306023071
type: integer
23072+
concurrency:
23073+
default: 500
23074+
format: int64
23075+
type: integer
2306123076
rateLimit:
2306223077
properties:
2306323078
max:
@@ -30145,6 +30160,9 @@ spec:
3014530160
bufferUsageLimit:
3014630161
format: int32
3014730162
type: integer
30163+
concurrency:
30164+
format: int64
30165+
type: integer
3014830166
rateLimit:
3014930167
properties:
3015030168
max:
@@ -38000,6 +38018,9 @@ spec:
3800038018
bufferUsageLimit:
3800138019
format: int32
3800238020
type: integer
38021+
concurrency:
38022+
format: int64
38023+
type: integer
3800338024
rateLimit:
3800438025
properties:
3800538026
max:
@@ -38254,6 +38275,9 @@ spec:
3825438275
bufferUsageLimit:
3825538276
format: int32
3825638277
type: integer
38278+
concurrency:
38279+
format: int64
38280+
type: integer
3825738281
rateLimit:
3825838282
properties:
3825938283
max:
@@ -39464,6 +39488,9 @@ spec:
3946439488
bufferUsageLimit:
3946539489
format: int32
3946639490
type: integer
39491+
concurrency:
39492+
format: int64
39493+
type: integer
3946739494
rateLimit:
3946839495
properties:
3946939496
max:
@@ -44121,6 +44148,9 @@ spec:
4412144148
bufferUsageLimit:
4412244149
format: int32
4412344150
type: integer
44151+
concurrency:
44152+
format: int64
44153+
type: integer
4412444154
rateLimit:
4412544155
properties:
4412644156
max:
@@ -44375,6 +44405,9 @@ spec:
4437544405
bufferUsageLimit:
4437644406
format: int32
4437744407
type: integer
44408+
concurrency:
44409+
format: int64
44410+
type: integer
4437844411
rateLimit:
4437944412
properties:
4438044413
max:

0 commit comments

Comments
 (0)