Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ If you're newer to SDKs in general, first check out the [SDKs Intro](./arch_docs
The below diagram depicts how Core-based SDKs are split into two parts. The `sdk-core` common code,
which is written in Rust, and a `sdk-lang` package specific to the language the user is writing
their workflow/activity in. For example a user writing workflows in Rust would be pulling in (at
least) two crates - `temporal-sdk-core` and `temporal-sdk-rust`.
least) two crates - `temporalio-sdk-core` and `temporalio-sdk`.

![Arch Diagram](https://lucid.app/publicSegments/view/7872bb33-d2b9-4b90-8aa1-bac111136aa5/image.png)

Expand Down
72 changes: 72 additions & 0 deletions crates/sdk-core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ use crate::{
use futures_util::stream;
use std::{
collections::{HashMap, VecDeque},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use temporalio_client::MESSAGE_TOO_LARGE_KEY;
use temporalio_common::protos::{
TestHistoryBuilder, canned_histories,
coresdk::{
Expand Down Expand Up @@ -660,6 +665,73 @@ async fn legacy_query_response_gets_not_found_not_fatal() {
core.shutdown().await;
}

#[tokio::test]
async fn legacy_query_too_large_responds_with_failure() {
let wfid = "fake_wf_id";
let t = canned_histories::single_timer("1");
let tasks = [{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into());
pr.query = Some(WorkflowQuery {
query_type: "query-type".to_string(),
query_args: Some(b"hi".into()),
header: None,
});
pr
}];
let mut mock = mock_worker_client();
let first_call = Arc::new(AtomicBool::new(true));
let fc = first_call.clone();
mock.expect_respond_legacy_query()
.times(2)
.returning(move |_, result| {
if fc.swap(false, Ordering::Relaxed) {
let mut err = tonic::Status::new(
tonic::Code::ResourceExhausted,
"grpc: received message larger than max",
);
err.metadata_mut().insert(MESSAGE_TOO_LARGE_KEY, 1.into());
Err(err)
} else {
assert!(
matches!(&result, LegacyQueryResult::Failed(f) if
f.failure.as_ref().unwrap().message == "GRPC Message too large"
),
"Expected query failure with message-too-large, got: {:?}",
matches!(&result, LegacyQueryResult::Failed(_)),
);
Ok(Default::default())
}
});
let mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock);
let mut mock = build_mock_pollers(mock);
mock.worker_cfg(|wc| wc.max_cached_workflows = 10);
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
start_timer_cmd(1, Duration::from_secs(1)),
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
}] => q
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
))
.await
.unwrap();

core.shutdown().await;
}

#[tokio::test]
async fn new_query_fail() {
let wfid = "fake_wf_id";
Expand Down
31 changes: 30 additions & 1 deletion crates/sdk-core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -851,11 +851,40 @@ impl Workflows {

/// Wraps responding to legacy queries. Handles ignore-able failures.
async fn respond_legacy_query(&self, tt: TaskToken, res: LegacyQueryResult) {
match self.client.respond_legacy_query(tt, res).await {
match self.client.respond_legacy_query(tt.clone(), res).await {
Ok(_) => {}
Err(e) if e.code() == tonic::Code::NotFound => {
warn!(error=?e, "Query not found when attempting to respond to it");
}
Err(e) if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) => {
warn!(error=%e, "Query response too large, responding with failure");
let failure = Failure {
failure: Some(
temporalio_common::protos::temporal::api::failure::v1::Failure {
message: "GRPC Message too large".to_string(),
failure_info: Some(FailureInfo::ApplicationFailureInfo(
ApplicationFailureInfo {
r#type: "GrpcMessageTooLarge".to_string(),
non_retryable: true,
..Default::default()
},
)),
..Default::default()
},
),
force_cause: 0,
};
if let Err(e2) = self
.client
.respond_legacy_query(tt, LegacyQueryResult::Failed(failure))
.await
{
warn!(
error=%e2,
"Failed to send query failure response after message-too-large"
);
}
}
Err(e) => {
warn!(error= %e, "Network error while responding to legacy query");
}
Expand Down
5 changes: 2 additions & 3 deletions crates/sdk-core/tests/integ_tests/update_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ async fn worker_restarted_in_middle_of_update() {
// Allow it to start again, the second time
BARR.wait().await;
// Poke the workflow off the sticky queue to get it to complete faster than WFT timeout
WorkflowService::reset_sticky_task_queue(
let _ = WorkflowService::reset_sticky_task_queue(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this grpc request fail now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could've always failed, it fails if the workflow is already closed (which, IMO, is weird, but whatever). Safe to ignore.

&mut client.clone(),
ResetStickyTaskQueueRequest {
namespace: client.namespace(),
Expand All @@ -1179,8 +1179,7 @@ async fn worker_restarted_in_middle_of_update() {
}
.into_request(),
)
.await
.unwrap();
.await;
};
let run = async {
// This run attempt will get shut down
Expand Down
Loading