Skip to content

refactor: introducing tasks for map operations#3167

Merged
yhl25 merged 28 commits into
mainfrom
readmessage
Feb 5, 2026
Merged

refactor: introducing tasks for map operations#3167
yhl25 merged 28 commits into
mainfrom
readmessage

Conversation

@yhl25
Copy link
Copy Markdown
Contributor

@yhl25 yhl25 commented Jan 29, 2026

Refactors map operation handling by introducing a task-based architecture:

  • Task structs (MapUnaryTask, MapStreamTask, MapBatchTask): Encapsulate all context needed to execute map operations, making the code more modular and testable.
  • ConcurrentMapper enum: Groups Unary and Stream mappers together since they share the same concurrent processing pattern (one task per message).
  • SharedMapTaskContext: Holds common resources (output/error channels, tracker, bypass router, cancellation token) shared across concurrent tasks.
  • Simplified UDF clients: Now return raw protobuf results instead of converted Message objects; conversion happens at the task level.
  • Cleaner main loop: Extracts process_concurrent_messages() and process_batch_messages() methods, reducing duplication between unary and stream handling.

yhl25 added 17 commits January 21, 2026 15:00
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
@whynowy
Copy link
Copy Markdown
Member

whynowy commented Feb 3, 2026

Motivation or any doc to understand this change?

yhl25 and others added 7 commits February 3, 2026 14:42
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Feb 4, 2026

Codecov Report

❌ Patch coverage is 94.17910% with 39 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.27%. Comparing base (4e8e72b) to head (142bc67).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rust/numaflow-core/src/mapper/map/stream.rs 91.07% 15 Missing ⚠️
rust/numaflow-core/src/mapper/map/unary.rs 91.20% 11 Missing ⚠️
rust/numaflow-core/src/mapper/map.rs 96.71% 7 Missing ⚠️
rust/numaflow-core/src/mapper/map/batch.rs 96.20% 6 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3167      +/-   ##
==========================================
+ Coverage   80.08%   80.27%   +0.18%     
==========================================
  Files         297      297              
  Lines       67717    67602     -115     
==========================================
+ Hits        54232    54265      +33     
+ Misses      12935    12789     -146     
+ Partials      550      548       -2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yhl25
Copy link
Copy Markdown
Contributor Author

yhl25 commented Feb 4, 2026

Motivation or any doc to understand this change?

Updated the description.

…tch and stream implementations to prevent insertion after error broadcasting

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Yashash <yashashhl25@gmail.com>
@yhl25 yhl25 marked this pull request as ready for review February 4, 2026 21:39
@yhl25 yhl25 requested review from vigith and whynowy as code owners February 4, 2026 21:39
Signed-off-by: Vigith Maurice <vigith@gmail.com>
@vigith vigith changed the title chore: introducing tasks for map operations refactor: introducing tasks for map operations Feb 5, 2026
@yhl25 yhl25 enabled auto-merge (squash) February 5, 2026 00:34
@yhl25 yhl25 merged commit 72ca344 into main Feb 5, 2026
44 of 47 checks passed
@yhl25 yhl25 deleted the readmessage branch February 5, 2026 00:36
Comment on lines +182 to +185
while let Some(resp) = resp_stream.next().await {
match resp {
Ok(resp) => {
if let Some(map::TransmissionStatus { eot: true }) = resp.status {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: After we encounter an error, the remaining Ok responses are processed here. Then in process_response we try to acquire lock to the map only to find out that the map is empty. Should we drain the stream in Err(e) => ... branch?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there will be a case of Ok response followed by an Err response.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible in the current implementation on the server side (Rust SDK at least). The flow responsible for shutting down the server on error is independent of the flow processing incoming requests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

server logs showing continued sends on stream after panic happens:

�[2m2026-02-04T02:43:17.658992Z�[0m �[31mERROR�[0m �[2mnumaflow::map�[0m�[2m:�[0m Failed to run map function: JoinError::Panic(Id(102), "BypassCat panicked!", ...)
�[2m2026-02-04T02:43:17.658966Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641487083-4
�[2m2026-02-04T02:43:17.658998Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- Received gRPC request: Ok(..)
�[2m2026-02-04T02:43:17.659008Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sending panic information to error channel
�[2m2026-02-04T02:43:17.659068Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- Received gRPC request: Ok(..)
�[2m2026-02-04T02:43:17.659075Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641502453-4
�[2m2026-02-04T02:43:17.659063Z�[0m �[31mERROR�[0m �[2mnumaflow::map�[0m�[2m:�[0m Shutting down gRPC channel: GrpcStatus(Status { code: Internal, ...})
�[2m2026-02-04T02:43:17.659098Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641513273-4
�[2m2026-02-04T02:43:17.659108Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- stream response tx closed
�[2m2026-02-04T02:43:17.659063Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641501343-4
�[2m2026-02-04T02:43:17.659085Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- Received gRPC request: Ok(..)
�[2m2026-02-04T02:43:17.659076Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641503573-4
�[2m2026-02-04T02:43:17.659195Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m Cancellation token is cancelled, shutting down
�[2m2026-02-04T02:43:17.659197Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641515613-4
�[2m2026-02-04T02:43:17.659201Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- stream response tx closed
�[2m2026-02-04T02:43:17.659201Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641516783-4
�[2m2026-02-04T02:43:17.659216Z�[0m �[32m INFO�[0m �[2mnumaflow::map�[0m�[2m:�[0m debug -- sent processed response for msg id: 1770172997641517913-4

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see that on the client side where we get an Ok response followed by an Err response?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add another log for checking this out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants