
Fix race condition in streaming output detection #524

Merged
gkumbhat merged 1 commit into foundation-model-stack:main from m-misiura:fix-streaming-race-condition
Jan 13, 2026

Conversation

@m-misiura
Contributor

Description

When load testing, the following was observed:

There appears to be a race condition in streaming chat completions with output detection that causes consistent panics. Our in-cluster detectors respond in ~2-8 ms, which triggers it consistently. Two concurrent tasks access completion_state.completions without synchronization:

  1. Spawned task (process_chat_completion_stream): inserts completions from the LLM
  2. Main task (process_detection_batch_stream): reads completions when detectors respond

When detectors respond faster than the LLM stream inserts, .unwrap() on the missing entry causes a panic.

On the TrustyAI fork, the following issue was raised: trustyai-explainability#12, but it is presumably better to handle this upstream.

This PR attempts to address this by replacing .unwrap() with a yield_now() loop that waits for the entry to become available.

@m-misiura force-pushed the fix-streaming-race-condition branch from dc56b41 to be941e7 on January 7, 2026 at 15:56
Collaborator

@gkumbhat left a comment


Is it possible to add a test for this particular scenario? Maybe an integration test here: https://github.com/foundation-model-stack/fms-guardrails-orchestrator/blob/main/tests/chat_completions_streaming.rs

Comment on lines +573 to +586
let chat_completions = {
    let mut attempts = 0;
    loop {
        if let Some(entry) = completion_state.completions.get(&choice_index) {
            break entry;
        }
        if attempts > 1000 {
            return Err(Error::Other(format!(
                "completion entry for choice_index {} not ready after 1000 yields",
                choice_index
            )));
        }
        attempts += 1;
        tokio::task::yield_now().await;
    }
};
Collaborator


I am wondering if this creates a bit of a busy-waiting scenario here, and what if the 1000 yields run out sooner in some cases where the generation server is for some reason extra slow 🤔

I guess one solution is to make this logic event driven, where this await gets notified once a new element is inserted.

For this implementation, we should move the 1000 to the top of the file as a constant.
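In tokio, the event-driven wait suggested here is typically built with tokio::sync::Notify. As a minimal sketch of the idea, here is a synchronous analogue using only std's Mutex and Condvar (the names and types are hypothetical, not the orchestrator's actual ones): the consumer sleeps until the producer signals that a new entry exists, instead of spinning on yield_now().

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Block until `completions` contains `choice_index`, then clone the entry.
/// Hypothetical stand-in for an event-driven wait.
fn wait_for_entry(
    state: &Arc<(Mutex<HashMap<u32, String>>, Condvar)>,
    choice_index: u32,
) -> String {
    let (map, cvar) = &**state;
    let mut guard = map.lock().unwrap();
    // Instead of `.get(&choice_index).unwrap()`, sleep on the Condvar until
    // the producer signals that a new entry has been inserted.
    while !guard.contains_key(&choice_index) {
        guard = cvar.wait(guard).unwrap();
    }
    guard.get(&choice_index).unwrap().clone()
}

fn main() {
    let state = Arc::new((Mutex::new(HashMap::new()), Condvar::new()));

    // Simulate the LLM stream inserting its completion *after* the detector
    // response has already arrived (the race observed under load).
    let producer_state = Arc::clone(&state);
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let (map, cvar) = &*producer_state;
        map.lock().unwrap().insert(0, "completion for choice 0".to_string());
        cvar.notify_all(); // wake the waiter instead of letting it spin
    });

    println!("{}", wait_for_entry(&state, 0));
    producer.join().unwrap();
}
```

Unlike the yield loop, nothing here runs while the entry is missing, and there is no attempt counter to tune.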

Contributor Author


Thanks for all the valid concerns! I:

  1. moved the limit to a constant
  2. replaced the yield-count with a timeout and used tokio::time::timeout() in an attempt to deal with scenarios where there may be a slow generation server

If you prefer the previous solution, just let me know and I can revert back :)
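The timeout-bounded wait described above can be sketched with std's Condvar::wait_timeout_while, a synchronous analogue of wrapping the wait in tokio::time::timeout (the constant name, function, and types below are hypothetical, not the PR's actual code): a stalled generation server produces an error after the deadline rather than an unwrap panic or an exhausted yield count.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical constant, mirroring the limit the PR moved to the top of the file.
const COMPLETION_WAIT_TIMEOUT: Duration = Duration::from_millis(200);

/// Wait until the entry for `choice_index` exists, or give up after the
/// timeout, turning a slow producer into an error instead of a panic.
fn wait_for_entry_with_timeout(
    state: &Arc<(Mutex<HashMap<u32, String>>, Condvar)>,
    choice_index: u32,
) -> Result<String, String> {
    let (map, cvar) = &**state;
    let guard = map.lock().unwrap();
    // Sleep until the condition closure returns false or the deadline passes.
    let (guard, result) = cvar
        .wait_timeout_while(guard, COMPLETION_WAIT_TIMEOUT, |m| {
            !m.contains_key(&choice_index)
        })
        .unwrap();
    if result.timed_out() {
        Err(format!(
            "completion entry for choice_index {choice_index} not ready after {COMPLETION_WAIT_TIMEOUT:?}"
        ))
    } else {
        Ok(guard.get(&choice_index).unwrap().clone())
    }
}

fn main() {
    // No producer at all: the wait gives up after the timeout instead of panicking.
    let state = Arc::new((Mutex::new(HashMap::new()), Condvar::new()));
    println!("{:?}", wait_for_entry_with_timeout(&state, 0));
}
```

The trade-off gkumbhat raised still applies: the timeout value bounds how long a slow generation server may lag before the request fails.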


 /// Builds a response with output detections.
-fn output_detection_response(
+async fn output_detection_response(
Collaborator


Does this need to be async? Or can we do the await operation inside the tokio runtime?

Contributor Author


Good question! IIUC yield_now() only works in an async context, so I think it should remain this way; please let me know what you think!

@declark1
Contributor

declark1 commented Jan 7, 2026

Two concurrent tasks access completion_state.completions without synchronization

FYI, DashMap is used for completion state, which is a concurrent hashmap, so I don't think this is a synchronization issue.

@m-misiura
Contributor Author

Two concurrent tasks access completion_state.completions without synchronization

FYI, DashMap is used for completion state, which is a concurrent hashmap, so I don't think this is a synchronization issue.

You are correct @declark1. IIUC, the issue is around entry existence: the detector task tries to .get() an entry before the stream task has .insert()ed it for that choice_index, and the .unwrap() panics on the missing entry.
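The entry-existence problem described above can be shown with a minimal sketch using a plain HashMap (names hypothetical; the orchestrator itself uses DashMap): the map is perfectly thread-safe to access, but the entry may simply not be there yet when the detector-side read happens.

```rust
use std::collections::HashMap;

/// The detector-side read. Returning Option instead of unwrapping
/// acknowledges that the stream task may not have inserted the entry
/// for this choice_index yet.
fn read_completion(completions: &HashMap<u32, String>, choice_index: u32) -> Option<String> {
    completions.get(&choice_index).cloned()
}

fn main() {
    let mut completions = HashMap::new();

    // Detector responds first: the entry for choice 0 does not exist yet,
    // so `.get(&0).unwrap()` at this point would panic.
    assert_eq!(read_completion(&completions, 0), None);

    // Stream task inserts, and only then does the read succeed.
    completions.insert(0, "streamed completion".to_string());
    assert_eq!(
        read_completion(&completions, 0),
        Some("streamed completion".to_string())
    );
    println!("no panic");
}
```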

… approach

Signed-off-by: m-misiura <mmisiura@redhat.com>
@m-misiura force-pushed the fix-streaming-race-condition branch from b2ae9b8 to 1f4d637 on January 8, 2026 at 10:09
Collaborator

@gkumbhat left a comment


Thanks for adding helpful comments and addressing all the review suggestions.

@gkumbhat gkumbhat merged commit b37c4bf into foundation-model-stack:main Jan 13, 2026
2 checks passed
