Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions rust/numaflow-core/src/sinker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
}

// State to accumulate outcomes across retries
let mut messages_to_retry = messages.clone();
let mut messages_to_retry = messages;
let mut fallback_messages = Vec::new();
let mut serving_messages = Vec::new();
let mut dropped_messages = Vec::new();
Expand Down Expand Up @@ -93,35 +93,34 @@ where
.map(|resp| (resp.id, resp.status))
.collect::<HashMap<_, _>>();

// Classify messages based on responses
let mut failed_ids = Vec::new();

messages_to_retry.retain_mut(|msg| {
match result_map.remove(&msg.id.to_string()) {
// Classify messages based on responses, preserving original ordering.
// Take ownership of the current batch so we can iterate by move and push
// failed messages back into messages_to_retry in their original order.
for msg in std::mem::take(&mut messages_to_retry) {
let msg_id = msg.id.to_string();
match result_map.remove(&msg_id) {
Some(ResponseStatusFromSink::Success) => {
false // remove from retry list
// Message successfully sunk, nothing more to do
}
Some(ResponseStatusFromSink::Failed(err_msg)) => {
failed_ids.push(msg.id.to_string());
*error_map.entry(err_msg.clone()).or_insert(0) += 1;
true // keep for retry
*error_map.entry(err_msg).or_insert(0) += 1;
messages_to_retry.push(msg); // keep for retry
}
Some(ResponseStatusFromSink::Fallback) => {
fallback_messages.push(msg.clone());
false // remove from retry list
fallback_messages.push(msg);
}
Some(ResponseStatusFromSink::Serve(serve_response)) => {
let mut msg = msg;
if let Some(serve_response) = serve_response {
msg.value = serve_response.into();
}
serving_messages.push(msg.clone());
false // remove from retry list
serving_messages.push(msg);
}
Some(ResponseStatusFromSink::OnSuccess(on_success_msg)) => {
if let Some(on_success_msg) = on_success_msg {
let on_success_md: Option<Metadata> =
on_success_msg.metadata.map(|md| md.into());
let new_md = match &mut msg.metadata {
let new_md = match &msg.metadata {
// Following clones are required explicitly since Arc doesn't allow
// interior mutability, so we cannot move the required fields out of Arc
// without cloning, unless we can guarantee there is only a single reference to Arc
Expand All @@ -144,18 +143,17 @@ where
value: on_success_msg.value.into(),
keys: on_success_msg.keys.into(),
metadata: Some(Arc::new(new_md)),
..msg.clone()
..msg
};
on_success_messages.push(new_msg.clone());
} else {
// Send the original message if no payload was provided to the onSuccess sink
on_success_messages.push(msg.clone());
on_success_messages.push(msg);
}
false // remove from retry list
}
None => unreachable!("should have response for all messages"), // remove if no response
None => unreachable!("should have response for all messages"),
}
});
}

if messages_to_retry.is_empty() {
// success path, all messages processed
Expand Down
Loading