fix: numa container hangs occasionally when map panics #3179

Closed
vaibhavtiwari33 wants to merge 18 commits into main from map-panic-fix

Conversation

@vaibhavtiwari33 (Contributor) commented Feb 2, 2026

What this PR does / why we need it

During long-running tests of the bypass feature for monovertex, it was noticed that when the tests ran with a panicking mapper, the numa container would sometimes get stuck waiting for all the messages to be nacked.

The bug isn't related to bypassing, since it was reproducible with the feature disabled.

Hypothesis

Messages are getting added to the ResponseSenderMap after the receiver tokio task has exited.

Confirmation

Add a log at the point where the senders map is emptied and the error is broadcast to all pending senders:

    /// Broadcasts a unary gRPC error to all pending senders and records error metrics.
    fn broadcast_error(sender_map: &ResponseSenderMap, error: tonic::Status) {
        let mut drop_map: HashMap<String, (ParentMessageInfo, oneshot::Sender<Result<Vec<Message>>>)> =
            HashMap::new();
        let message = Message::default();
        let (sender, _receiver) = oneshot::channel();
        drop_map.insert(DROP.to_string(), ((&message).into(), sender));

        std::mem::swap(
            &mut *sender_map.lock().expect("failed to acquire poisoned lock"),
            &mut drop_map,
        );

        for (_, (_, sender)) in drop_map {
            let _ = sender.send(Err(Error::Grpc(Box::new(error.clone()))));
            update_udf_error_metric(is_mono_vertex());
        }

        info!("Broadcasting error to all pending senders");   // <-------- Added log
    }

Add a log when messages are inserted into the senders map:

        // insert the sender into the map
        let mut guard = self
            .senders
            .lock()
            .expect("failed to acquire poisoned lock");
        if guard.is_empty() {
            info!("Inserting into empty sender map");    // <---- Added log
            let _ = respond_to.send(Err(Error::Mapper("Map received a message after error".to_string())));
            return;
        }
        guard.insert(key.clone(), (msg_info, respond_to));

Following logs were observed in all the stuck numa container:

---->{"timestamp":"2026-02-02T00:52:06.609268Z","level":"INFO","message":"Broadcasting error to all pending senders","target":"numaflow_core::mapper::map::unary"}
     {"timestamp":"2026-02-02T00:52:06.609338Z","level":"INFO","message":"writing map error to error channel","target":"numaflow_core::mapper::map"}
     {"timestamp":"2026-02-02T00:52:06.610564Z","level":"INFO","message":"debug -- sent map error to error channel","target":"numaflow_core::mapper::map"}
     {"timestamp":"2026-02-02T00:52:06.610582Z","level":"INFO","message":"debug -- sending nak to source","target":"numaflow_core::message"}
---->{"timestamp":"2026-02-02T00:52:06.611297Z","level":"INFO","message":"Inserting into empty sender map","target":"numaflow_core::mapper::map::unary"}

Thus, messages were being inserted into the senders map AFTER the receiver had stopped monitoring it.
Such a message sticks around because no one ever responds on its oneshot receiver, and the source keeps waiting for the message's ack/nack since the pending entry is never dropped.
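
A minimal standalone sketch of why such an orphaned entry hangs forever, using tokio's oneshot channel directly rather than the project's types: the receiving side only completes once its sender either sends a value or is dropped.

    use tokio::sync::oneshot;
    use tokio::time::{timeout, Duration};

    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel::<&str>();

        // Keep the sender alive (like the orphaned entry sitting in the senders map)
        // but never send on it: the receiving side stays pending indefinitely.
        let result = timeout(Duration::from_millis(100), rx).await;
        assert!(result.is_err(), "receiver still pending: tx is alive but silent");

        // Sending a response or dropping the sender is what would unblock the
        // receiver and let the message be acked/nacked.
        drop(tx);
    }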

Fix

  • Add a drop key into the senders map after broadcasting the error.
  • When inserting into the senders map, if the drop key exists, skip adding the message and instead send an error to the message's receiver (see the sketch below).
  • Wait for the receiver to close out before finishing the task.
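
A rough sketch of the insert-side check (not the exact project code; names such as DROP, senders, key, msg_info, and respond_to are borrowed from the snippets above, and the real signatures may differ):

    // After an error is broadcast, broadcast_error leaves a sentinel DROP entry in
    // the senders map. Any later insert sees it and fails fast instead of parking
    // an orphaned oneshot sender that nobody will ever answer.
    let mut guard = self
        .senders
        .lock()
        .expect("failed to acquire poisoned lock");

    if guard.contains_key(DROP) {
        // The receiver task has already stopped monitoring the map; respond with
        // an error so the caller can nack the message instead of hanging.
        let _ = respond_to.send(Err(Error::Mapper(
            "Map received a message after error".to_string(),
        )));
        return;
    }

    guard.insert(key.clone(), (msg_info, respond_to));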

Testing


Currently this has been tested for the unary use case by running multiple replicas of a panicking UDF. Although it has only been running for about 12 hours so far, I have yet to encounter a stuck numa container, whereas previously I'd hit at least one such scenario within 20-30 minutes.

monovertex spec:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
  name: map-only-panicking-mono-vertex
  namespace: dev-devx-osamonnumaflow-usw2-qal
spec:
  replicas: 10
  scale:
    min: 10
  sink:
    blackhole: {}
  source:
    generator:
      duration: 1s
      keyCount: 5
      rpu: 500
      value: 5
  udf:
    container:
      image: docker.intuit.com/personal/vtiwari5/panicking-map-bypass-cat:stable
      imagePullPolicy: IfNotPresent

The pods are restarting consistently:

vtiwari5@macos-H29191QYT0 numaflow % kgp
NAME                                                       READY   STATUS      RESTARTS          AGE
isbsvc-default-js-0                                        3/3     Running     0                 3d5h
isbsvc-default-js-1                                        3/3     Running     0                 4h39m
isbsvc-default-js-2                                        3/3     Running     0                 3d13h
map-only-panicking-mono-vertex-mv-0-f0qdn                  2/3     Running     50 (16s ago)      155m
map-only-panicking-mono-vertex-mv-1-fctpu                  3/3     Running     96 (6m16s ago)    4h38m
map-only-panicking-mono-vertex-mv-2-jadoh                  1/3     Completed   286               12h
map-only-panicking-mono-vertex-mv-3-ocwvb                  1/3     Completed   243               12h
map-only-panicking-mono-vertex-mv-4-le8kb                  3/3     Running     258 (8m32s ago)   12h
map-only-panicking-mono-vertex-mv-5-njvab                  1/3     Completed   270               12h
map-only-panicking-mono-vertex-mv-6-z2fit                  1/3     Completed   268               12h
map-only-panicking-mono-vertex-mv-7-7ibgp                  3/3     Running     266 (3m54s ago)   12h
map-only-panicking-mono-vertex-mv-8-zcuil                  3/3     Running     290 (3m31s ago)   12h
map-only-panicking-mono-vertex-mv-9-qeshx                  2/3     Running     244 (45s ago)     11h
map-only-panicking-mono-vertex-mv-daemon-979b845d9-z6wmt   1/1     Running     0                 155m
numaflow-controller-768859bc4-xq25k                        1/1     Running     0                 153m


Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review February 2, 2026 03:27
@vaibhavtiwari33 vaibhavtiwari33 self-assigned this Feb 2, 2026
@vaibhavtiwari33 vaibhavtiwari33 added the bug Something isn't working label Feb 2, 2026
@vaibhavtiwari33 vaibhavtiwari33 changed the title fix: numa container occasionally stuck when map panics fix: numa container occasionally stuck when map panicked Feb 2, 2026
@vaibhavtiwari33 vaibhavtiwari33 changed the title fix: numa container occasionally stuck when map panicked fix: numa container hangs occasionally when map panics Feb 2, 2026
@vigith (Member) left a comment


this will not work, let's fix the root problem which is we are breaking out of the rx loop while rx is still active.

Comment on lines 85 to 88

    while let Some(_) = resp_stream.next().await {
        // consume the rest of the stream
    }
    break;
Contributor commented:


Replace the original loop with the while let Some(_)?

@vaibhavtiwari33 (Contributor Author) replied:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will work but we do lose the original gRPC error that was received from the stream in that case.

@vaibhavtiwari33 (Contributor Author) commented Feb 4, 2026


One reason why we shouldn't do this:

       while let Some(resp) = resp_stream.next().await {
            match resp {
                Ok(message) => Self::process_unary_response(&sender_map, message).await,
                Err(e) => {
                    Self::broadcast_error(&sender_map, e);
                }
            };
        }

would be to avoid processing a response for a message that has already been removed and whose error has already been broadcast (when an error was encountered earlier in the stream).
We'd be spending time acquiring the senders map lock just to remove an entry that may no longer be there.

@vaibhavtiwari33 (Contributor Author) commented Feb 4, 2026


I'll add a boolean state check behind the response sender map mutex so that we don't have to worry about this and can use your semantics.
Without that check, the scenario described above would still play out under the proposed semantics.

@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as draft February 3, 2026 04:01
yhl25 and others added 4 commits February 3, 2026 10:24
@vaibhavtiwari33 (Contributor Author) commented:

this will not work, let's fix the root problem which is we are breaking out of the rx loop while rx is still active.

@vigith from the previous discussion we know that the problem lies with the following check:

        // only insert if we are able to send the message to the server
        if let Err(e) = self.read_tx.send(message.into()).await {
            ...
        }

        // insert the sender into the map
        self.senders
            .lock()
            .expect("failed to acquire poisoned lock")
            .insert(key.clone(), (msg_info, respond_to));

i.e., we're assuming that if the send to the read_tx succeeds, then the map udf must still be accepting requests, and hence we can add the message to the senders map.

The problem here is that, from https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.send, we have

but a return value of Ok does not mean that the data will be received. 
It is possible for the corresponding receiver to hang up immediately after this function returns Ok

Thus, the send can succeed, but that doesn't guarantee the receiver will ever process the message.
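
A tiny standalone illustration of that behavior with a plain tokio mpsc channel (not the project's channel setup):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel::<u32>(8);

        // send() returns Ok because the channel is open and has capacity...
        tx.send(42).await.expect("channel is open, send succeeds");

        // ...but nothing forces the receiver to ever read it; the receiving task
        // can exit (or the receiver be dropped) with the message still buffered.
        drop(rx);
    }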

So, we should keep an explicit state on the senders map (e.g. a boolean flag guarded by the same mutex) that tells us whether it is still safe to add to it.
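
A hedged sketch of that idea (the types here are simplified placeholders, not the repo's actual definitions): keep a closed flag and the pending senders behind the same mutex, so an insert can never race with the receiver task shutting down.

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    use tokio::sync::oneshot;

    struct SenderMap<T> {
        closed: bool,
        entries: HashMap<String, oneshot::Sender<T>>,
    }

    type SharedSenderMap<T> = Arc<Mutex<SenderMap<T>>>;

    /// Try to register a pending sender; hand it back if the map is already
    /// closed so the caller can reply with an error instead of orphaning it.
    fn try_insert<T>(
        map: &SharedSenderMap<T>,
        key: String,
        tx: oneshot::Sender<T>,
    ) -> Result<(), oneshot::Sender<T>> {
        let mut guard = map.lock().expect("failed to acquire poisoned lock");
        if guard.closed {
            return Err(tx);
        }
        guard.entries.insert(key, tx);
        Ok(())
    }

    /// Called by the receiver task when the gRPC stream errors out: mark the
    /// map closed and drain all pending senders so the error can be broadcast.
    fn close_and_drain<T>(map: &SharedSenderMap<T>) -> HashMap<String, oneshot::Sender<T>> {
        let mut guard = map.lock().expect("failed to acquire poisoned lock");
        guard.closed = true;
        std::mem::take(&mut guard.entries)
    }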

@vigith (Member) commented Feb 4, 2026

i.e., we're assuming that if the send to the read_tx succeeds, then the map udf must still be accepting requests, and hence we can add the message to the senders map.

yes, tx and rx are decoupled, sending to tx (getting Ok()) doesn't mean rx is active.

How about we add to the HashMap before writing to tx, and if sending to tx fails, we remove it from the HashMap?

@vaibhavtiwari33 (Contributor Author) commented:

i.e., we're assuming that if the send to the read_tx succeeds, then the map udf must still be accepting requests, and hence we can add the message to the senders map.

How about we add to the HashMap before writing to tx, and if sending to tx fails, we remove it from the HashMap?

This approach has 2 issues:

  • the bug would still persist.
  • We'll either have to acquire the senders map mutex again after the send call (in the worst case), or hold it across the send call (less preferable).

…e adding messages to the sender map

@codecov Bot commented Feb 4, 2026

Codecov Report

❌ Patch coverage is 90.00000% with 12 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.08%. Comparing base (4e8e72b) to head (8d6da26).

Files with missing lines Patch % Lines
rust/numaflow-core/src/mapper/map/stream.rs 90.32% 6 Missing ⚠️
rust/numaflow-core/src/mapper/map/batch.rs 88.23% 4 Missing ⚠️
rust/numaflow-core/src/mapper/map/unary.rs 91.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3179      +/-   ##
==========================================
- Coverage   80.08%   80.08%   -0.01%     
==========================================
  Files         297      297              
  Lines       67717    67765      +48     
==========================================
+ Hits        54232    54270      +38     
- Misses      12935    12945      +10     
  Partials      550      550              


@vaibhavtiwari33 (Contributor Author) commented:

@vigith Closing this PR, will add these changes in #3167 based on discussion with @yhl25
