Skip to content

Commit be941e7

Browse files
committed
🚧 Fix race condition in streaming output detection
Signed-off-by: m-misiura <mmisiura@redhat.com>
1 parent 95b3b4d commit be941e7

File tree

1 file changed

+35
-5
lines changed
  • src/orchestrator/handlers/chat_completions_detection

1 file changed

+35
-5
lines changed

src/orchestrator/handlers/chat_completions_detection/streaming.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -563,14 +563,29 @@ async fn handle_whole_doc_detection(
563563
}
564564

565565
/// Builds a response with output detections.
566-
fn output_detection_response(
566+
async fn output_detection_response(
567567
completion_state: &Arc<CompletionState<ChatCompletionChunk>>,
568568
choice_index: u32,
569569
chunk: Chunk,
570570
detections: Vec<Detection>,
571571
) -> Result<ChatCompletionChunk, Error> {
572-
// Get chat completions for this choice index
573-
let chat_completions = completion_state.completions.get(&choice_index).unwrap();
572+
// Wait for entry to exist (yields to other tasks until ready)
573+
let chat_completions = {
574+
let mut attempts = 0;
575+
loop {
576+
if let Some(entry) = completion_state.completions.get(&choice_index) {
577+
break entry;
578+
}
579+
if attempts > 1000 {
580+
return Err(Error::Other(format!(
581+
"completion entry for choice_index {} not ready after 1000 yields",
582+
choice_index
583+
)));
584+
}
585+
attempts += 1;
586+
tokio::task::yield_now().await;
587+
}
588+
};
574589
// Get range of chat completions for this chunk
575590
let chat_completions = chat_completions
576591
.range(chunk.input_start_index..=chunk.input_end_index)
@@ -709,6 +724,7 @@ async fn process_detection_batch_stream(
709724
Ok((choice_index, chunk, detections)) => {
710725
let input_end_index = chunk.input_end_index;
711726
match output_detection_response(&completion_state, choice_index, chunk, detections)
727+
.await
712728
{
713729
Ok(chat_completion) => {
714730
// Send chat completion to response channel
@@ -718,8 +734,22 @@ async fn process_detection_batch_stream(
718734
return;
719735
}
720736
// If this is the final chat completion chunk with content, send chat completion chunk with finish reason
721-
let chat_completions =
722-
completion_state.completions.get(&choice_index).unwrap();
737+
// Wait for entry to exist (yields to other tasks until ready)
738+
let chat_completions = {
739+
let mut attempts = 0;
740+
loop {
741+
if let Some(entry) = completion_state.completions.get(&choice_index)
742+
{
743+
break entry;
744+
}
745+
if attempts > 1000 {
746+
error!(%trace_id, %choice_index, "completion entry not ready after 1000 yields");
747+
return;
748+
}
749+
attempts += 1;
750+
tokio::task::yield_now().await;
751+
}
752+
};
723753
if chat_completions.keys().rev().nth(1) == Some(&input_end_index)
724754
&& let Some((_, chat_completion)) = chat_completions.last_key_value()
725755
&& chat_completion

0 commit comments

Comments
 (0)