Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use batch::UserDefinedBatchMap;
use stream::UserDefinedStreamMap;
use unary::UserDefinedUnaryMap;

/// Sentinel key stored in a response-sender map to mark that an error has already been
/// broadcast; callers that find this key reply with an aborted status instead of inserting.
///
/// The value is the literal text `U+005C__DROP__`: Rust does not treat `U+005C` as an escape
/// sequence. NOTE(review): if a leading backslash character was intended, the escape would be
/// `\u{005C}` — confirm which form is wanted.
const DROP: &str = "U+005C__DROP__";

/// ParentMessageInfo is used to store the information of the parent message. This is propagated to
/// all the downstream messages.
pub(crate) struct ParentMessageInfo {
Expand Down
41 changes: 33 additions & 8 deletions rust/numaflow-core/src/mapper/map/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::sync::Mutex;

use super::{
ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
DROP, ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
update_udf_process_time_metric, update_udf_read_metric, update_udf_write_metric,
};
use crate::config::is_mono_vertex;
Expand Down Expand Up @@ -59,12 +59,25 @@ impl UserDefinedBatchMap {

/// Broadcasts a batch map gRPC error to all pending senders and records error metrics.
///
/// The shared sender map is replaced by a map holding only the `DROP` sentinel entry, and every
/// entry that was pending before the replacement is sent a clone of `error` wrapped in
/// `Error::Grpc`. The error metric is updated once per pending entry.
///
/// # Panics
///
/// Panics if the lock on `sender_map` is poisoned.
// NOTE(review): this listing appears to be a flattened diff with the +/- markers stripped. The
// `std::mem::take` binding, the `for ... in senders` header and the metric call without a
// trailing semicolon look like the removed lines — confirm against the real file.
fn broadcast_error(sender_map: &ResponseSenderMap, error: tonic::Status) {
let senders =
std::mem::take(&mut *sender_map.lock().expect("failed to acquire poisoned lock"));
// Placeholder sender for the sentinel entry. Its receiver is matched by `_`, so it is not kept.
let (_sender, _) = oneshot::channel();
let mut drop_map =
HashMap::from([(DROP.to_string(), ((&Message::default()).into(), _sender))]);

// Swap the sender map with the drop map.
// Any new message that would be added to the [ResponseSenderMap] is skipped once the DROP key
// is found there. This prevents new messages from being added to the ResponseSenderMap after
// the error has been broadcast.
std::mem::swap(
&mut *sender_map.lock().expect("failed to acquire poisoned lock"),
&mut drop_map,
);

for (_, (_, sender)) in senders {
// After the swap, the previously pending (live) entries are in `drop_map`.
// Send the error to each of them; a failed send is deliberately ignored.
for (_, (_, sender)) in drop_map {
let _ = sender.send(Err(Error::Grpc(Box::new(error.clone()))));
update_udf_error_metric(is_mono_vertex())
update_udf_error_metric(is_mono_vertex());
}
}

Expand Down Expand Up @@ -147,10 +160,22 @@ impl UserDefinedBatchMap {
return;
}

self.senders
let mut senders_guard = self
.senders
.lock()
.expect("failed to acquire poisoned lock")
.insert(key.clone(), (msg_info, respond_to));
.expect("failed to acquire poisoned lock");

// If the DROP key is found, an error has already been broadcast by
// receive_batch_responses, and no new messages should be added to the map.
if senders_guard.contains_key(DROP) {
// FIXME: Use better aborted tonic status
let _ = respond_to.send(Err(Error::Grpc(Box::new(tonic::Status::aborted(
"DROPPED",
)))));
return;
}

senders_guard.insert(key.clone(), (msg_info, respond_to));
}

// send eot request
Expand Down
43 changes: 34 additions & 9 deletions rust/numaflow-core/src/mapper/map/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use numaflow_pb::clients::map::{self, MapRequest, MapResponse, map_client::MapClient};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;
use tonic::Streaming;
use tonic::transport::Channel;
use tracing::error;

use super::{
ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
DROP, ParentMessageInfo, UserDefinedMessage, create_response_stream, update_udf_error_metric,
update_udf_process_time_metric, update_udf_read_metric, update_udf_write_only_metric,
};

Expand Down Expand Up @@ -57,11 +57,24 @@ impl UserDefinedStreamMap {

/// Broadcasts a gRPC error to all pending senders and records error metrics.
///
/// The shared sender map is replaced by a map holding only the `DROP` sentinel entry, and the
/// entries that were pending before the replacement are iterated so each can be sent a clone
/// of `error` wrapped in `Error::Grpc`. The error metric is updated once per pending entry.
///
/// # Panics
///
/// Panics if the lock on `sender_map` is poisoned.
// NOTE(review): this listing appears to be a flattened diff with the +/- markers stripped. The
// `std::mem::take` binding, the `for ... in senders` header and the awaited `send` look like the
// removed lines — confirm against the real file.
async fn broadcast_error(sender_map: &StreamResponseSenderMap, error: tonic::Status) {
let senders =
std::mem::take(&mut *sender_map.lock().expect("failed to acquire poisoned lock"));
// Placeholder sender for the sentinel entry. Its receiver is matched by `_`, so it is not kept.
let (_sender, _) = mpsc::channel(1);
let mut drop_map: HashMap<String, (ParentMessageInfo, mpsc::Sender<Result<Message>>)> =
HashMap::from([(DROP.to_string(), ((&Message::default()).into(), _sender))]);

// Swap the sender map with the drop map.
// Any new message that would be added to the [ResponseSenderMap] is skipped once the DROP key
// is found there. This prevents new messages from being added to the ResponseSenderMap after
// the error has been broadcast.
// The lock guard is a temporary of this statement, so it is released before any send below.
std::mem::swap(
&mut *sender_map.lock().expect("failed to acquire poisoned lock"),
&mut drop_map,
);

for (_, (_, sender)) in senders {
let _ = sender.send(Err(Error::Grpc(Box::new(error.clone())))).await;
// After the swap, the previously pending (live) entries are in `drop_map`.
// Send the error to each of them.
for (_, (_, sender)) in drop_map {
// NOTE(review): `sender` is a tokio `mpsc::Sender`, whose `send` returns a future. Unlike the
// awaited call above, this one has no `.await`, so the future is dropped without being polled
// and the error is never delivered. Restoring `.await` looks necessary — confirm.
let _ = sender.send(Err(Error::Grpc(Box::new(error.clone()))));
update_udf_error_metric(is_mono_vertex());
}
}
Expand Down Expand Up @@ -129,10 +142,22 @@ impl UserDefinedStreamMap {
return;
}

self.senders
let mut senders_guard = self
.senders
.lock()
.expect("failed to acquire poisoned lock")
.insert(key.clone(), (msg_info, respond_to));
.expect("failed to acquire poisoned lock");

// If the DROP key is found, an error has already been broadcast by
// receive_stream_responses, and no new messages should be added to the map.
if senders_guard.contains_key(DROP) {
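// FIXME: Use better aborted tonic status
// NOTE(review): `respond_to` is later inserted into a map of `mpsc::Sender`s, so this `send`
// presumably returns a future; without `.await` it would be dropped unsent — confirm.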
let _ = respond_to.send(Err(Error::Grpc(Box::new(tonic::Status::aborted(
"DROPPED",
)))));
return;
}

senders_guard.insert(key.clone(), (msg_info, respond_to));
}

/// Processes stream responses and sends them to the appropriate mpsc sender
Expand Down
48 changes: 36 additions & 12 deletions rust/numaflow-core/src/mapper/map/unary.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use crate::config::is_mono_vertex;
use crate::error::{Error, Result};
use crate::message::Message;
use numaflow_pb::clients::map::{MapRequest, MapResponse, map_client::MapClient};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;
use tonic::Streaming;
use tonic::transport::Channel;
use tracing::error;

use super::{
ParentMessageInfo, create_response_stream, update_udf_error_metric, update_udf_read_metric,
update_udf_write_metric,
DROP, ParentMessageInfo, create_response_stream, update_udf_error_metric,
update_udf_read_metric, update_udf_write_metric,
};

type ResponseSenderMap =
Expand Down Expand Up @@ -59,10 +58,23 @@ impl UserDefinedUnaryMap {

/// Broadcasts a unary gRPC error to all pending senders and records error metrics.
fn broadcast_error(sender_map: &ResponseSenderMap, error: tonic::Status) {
let senders =
std::mem::take(&mut *sender_map.lock().expect("failed to acquire poisoned lock"));
let (_sender, _) = oneshot::channel();
let mut drop_map =
HashMap::from([(DROP.to_string(), ((&Message::default()).into(), _sender))]);

// Swap the sender map with the drop map.
// Any new message that would be added to the [ResponseSenderMap] is skipped once the DROP key
// is found there. This prevents new messages from being added to the ResponseSenderMap after
// the error has been broadcast.
std::mem::swap(
&mut *sender_map.lock().expect("failed to acquire poisoned lock"),
&mut drop_map,
);

for (_, (_, sender)) in senders {
// live messages are now in the drop_map
// send error to all the senders
for (_, (_, sender)) in drop_map {
let _ = sender.send(Err(Error::Grpc(Box::new(error.clone()))));
update_udf_error_metric(is_mono_vertex());
}
Expand Down Expand Up @@ -110,10 +122,22 @@ impl UserDefinedUnaryMap {
}

// insert the sender into the map
self.senders
let mut senders_guard = self
.senders
.lock()
.expect("failed to acquire poisoned lock")
.insert(key.clone(), (msg_info, respond_to));
.expect("failed to acquire poisoned lock");

// If the DROP key is found, an error has already been broadcast by
// receive_unary_responses, and no new messages should be added to the map.
if senders_guard.contains_key(DROP) {
// FIXME: Use better aborted tonic status
let _ = respond_to.send(Err(Error::Grpc(Box::new(tonic::Status::aborted(
"DROPPED",
)))));
return;
}

senders_guard.insert(key.clone(), (msg_info, respond_to));
}

/// Processes the response from the server and sends it to the appropriate oneshot sender
Expand Down
Loading