Skip to content

Commit 3395250

Browse files
committed
Fix possible issue with queries not always being handled before next WFT
1 parent f8d5884 commit 3395250

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

  • core/src/worker/workflow

core/src/worker/workflow/mod.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ mod workflow_stream;
1414
pub(crate) use driven_workflow::DrivenWorkflow;
1515
pub(crate) use history_update::HistoryUpdate;
1616

17-
use crate::protosext::ValidPollWFTQResponse;
1817
use crate::{
1918
MetricsContext,
2019
abstractions::{
@@ -23,7 +22,9 @@ use crate::{
2322
},
2423
internal_flags::InternalFlags,
2524
pollers::TrackedPermittedTqResp,
26-
protosext::{legacy_query_failure, protocol_messages::IncomingProtocolMessage},
25+
protosext::{
26+
ValidPollWFTQResponse, legacy_query_failure, protocol_messages::IncomingProtocolMessage,
27+
},
2728
telemetry::{TelemetryInstance, VecDisplayer, set_trace_subscriber_for_current_thread},
2829
worker::{
2930
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
@@ -49,7 +50,6 @@ use std::{
4950
collections::VecDeque,
5051
fmt::Debug,
5152
future::Future,
52-
mem,
5353
ops::DerefMut,
5454
rc::Rc,
5555
result,
@@ -1099,8 +1099,7 @@ struct BufferedTasks {
10991099
/// current one has been processed).
11001100
query_only_tasks: VecDeque<PermittedWFT>,
11011101
/// These are query-only tasks for the *buffered* wft, if any. They will all be discarded if
1102-
/// a buffered wft is replaced before being handled. They move to `query_only_tasks` once the
1103-
/// buffered task is taken.
1102+
/// a buffered wft is replaced before being handled.
11041103
query_only_tasks_for_buffered: VecDeque<PermittedWFT>,
11051104
}
11061105

@@ -1135,9 +1134,14 @@ impl BufferedTasks {
11351134
if let Some(q) = self.query_only_tasks.pop_front() {
11361135
return Some(q);
11371136
}
1138-
if let Some(t) = self.wft.take() {
1139-
self.query_only_tasks = mem::take(&mut self.query_only_tasks_for_buffered);
1140-
return Some(t);
1137+
if self.wft.is_some() {
1138+
if let Some(q) = self.query_only_tasks_for_buffered.pop_front() {
1139+
return Some(q);
1140+
}
1141+
if let Some(t) = self.wft.take() {
1142+
self.query_only_tasks.clear();
1143+
return Some(t);
1144+
}
11411145
}
11421146
None
11431147
}

0 commit comments

Comments
 (0)