Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions rust/numaflow-core/src/mapper/map/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use super::{
ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
update_udf_process_time_metric, update_udf_read_metric, update_udf_write_metric,
};
use crate::config::is_mono_vertex;
use crate::config::pipeline::VERTEX_TYPE_MAP_UDF;
use crate::error::{Error, Result};
Expand All @@ -17,6 +13,11 @@ use tonic::Streaming;
use tonic::transport::Channel;
use tracing::error;

use super::{
ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
update_udf_process_time_metric, update_udf_read_metric, update_udf_write_metric,
};

/// Type aliases
type ResponseSenderMap =
Arc<Mutex<HashMap<String, (ParentMessageInfo, oneshot::Sender<Result<Vec<Message>>>)>>>;
Expand Down
7 changes: 7 additions & 0 deletions rust/numaflow-core/src/mapper/map/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use crate::message::Message;
use numaflow_pb::clients::map::{MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio_util::task::AbortOnDropHandle;
use tonic::Streaming;
use tonic::transport::Channel;
Expand Down Expand Up @@ -81,12 +82,18 @@
Err(e) => {
error!(?e, "Error reading message from unary map gRPC stream");
Self::broadcast_error(&sender_map, e);
while let Some(_) = resp_stream.next().await {
// consume the rest of the stream
}
break;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace the original loop with the while let some(_) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will work but we do lose the original gRPC error that was received from the stream in that case.

Copy link
Copy Markdown
Contributor Author

@vaibhavtiwari33 vaibhavtiwari33 Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One reason why we shouldn't do this:

       while let Some(resp) = resp_stream.next().await {
            match resp {
                Ok(message) => Self::process_unary_response(&sender_map, message).await,
                Err(e) => {
                    Self::broadcast_error(&sender_map, e);
                }
            };
        }

would be to avoid processing a message that has already been removed and for which error has been broadcasted if we've encountered an error before.
We'll be spending time to acquire lock to the senders map to remove an entry that might not be there.

Copy link
Copy Markdown
Contributor Author

@vaibhavtiwari33 vaibhavtiwari33 Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a boolean state check behind the response sender map mutex so that we don't have to worry about this and can use your semantic
The abovementioned scenario would still play out with the mentioned semantics

}
};

Self::process_unary_response(&sender_map, resp).await
}

Check warning on line 93 in rust/numaflow-core/src/mapper/map/unary.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-core/src/mapper/map/unary.rs

// consume the rest of the stream
Self::broadcast_error(&sender_map, tonic::Status::aborted("receiver stream dropped"));
}

/// Handles the incoming message and sends it to the server for mapping.
Expand Down
Loading