diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b1279d613..eaa2d64f4 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -8,7 +8,7 @@ If you're newer to SDKs in general, first check out the [SDKs Intro](./arch_docs The below diagram depicts how Core-based SDKs are split into two parts. The `sdk-core` common code, which is written in Rust, and a `sdk-lang` package specific to the language the user is writing their workflow/activity in. For example a user writing workflows in Rust would be pulling in (at -least) two crates - `temporal-sdk-core` and `temporal-sdk-rust`. +least) two crates - `temporalio-sdk-core` and `temporalio-sdk`. ![Arch Diagram](https://lucid.app/publicSegments/view/7872bb33-d2b9-4b90-8aa1-bac111136aa5/image.png) diff --git a/crates/sdk-core/src/core_tests/queries.rs b/crates/sdk-core/src/core_tests/queries.rs index 697a6bd02..02640afdd 100644 --- a/crates/sdk-core/src/core_tests/queries.rs +++ b/crates/sdk-core/src/core_tests/queries.rs @@ -11,8 +11,13 @@ use crate::{ use futures_util::stream; use std::{ collections::{HashMap, VecDeque}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::Duration, }; +use temporalio_client::MESSAGE_TOO_LARGE_KEY; use temporalio_common::protos::{ TestHistoryBuilder, canned_histories, coresdk::{ @@ -660,6 +665,73 @@ async fn legacy_query_response_gets_not_found_not_fatal() { core.shutdown().await; } +#[tokio::test] +async fn legacy_query_too_large_responds_with_failure() { + let wfid = "fake_wf_id"; + let t = canned_histories::single_timer("1"); + let tasks = [{ + let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into()); + pr.query = Some(WorkflowQuery { + query_type: "query-type".to_string(), + query_args: Some(b"hi".into()), + header: None, + }); + pr + }]; + let mut mock = mock_worker_client(); + let first_call = Arc::new(AtomicBool::new(true)); + let fc = first_call.clone(); + mock.expect_respond_legacy_query() + .times(2) + .returning(move |_, result| { + if fc.swap(false, Ordering::Relaxed) { + let mut err = tonic::Status::new( + tonic::Code::ResourceExhausted, + "grpc: received message larger than max", + ); + err.metadata_mut().insert(MESSAGE_TOO_LARGE_KEY, 1.into()); + Err(err) + } else { + assert!( + matches!(&result, LegacyQueryResult::Failed(f) if + f.failure.as_ref().unwrap().message == "GRPC Message too large" + ), + "Expected query failure with message-too-large, got: {:?}", + matches!(&result, LegacyQueryResult::Failed(_)), + ); + Ok(Default::default()) + } + }); + let mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock); + let mut mock = build_mock_pollers(mock); + mock.worker_cfg(|wc| wc.max_cached_workflows = 10); + let core = mock_worker(mock); + + let task = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + start_timer_cmd(1, Duration::from_secs(1)), + )) + .await + .unwrap(); + + let task = core.poll_workflow_activation().await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), + }] => q + ); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + query_ok(LEGACY_QUERY_ID.to_string(), "hi"), + )) + .await + .unwrap(); + + core.shutdown().await; +} + #[tokio::test] async fn new_query_fail() { let wfid = "fake_wf_id"; diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 796a98df4..4c3ab10dc 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -851,11 +851,40 @@ impl Workflows { /// Wraps responding to legacy queries. Handles ignore-able failures. async fn respond_legacy_query(&self, tt: TaskToken, res: LegacyQueryResult) { - match self.client.respond_legacy_query(tt, res).await { + match self.client.respond_legacy_query(tt.clone(), res).await { Ok(_) => {} Err(e) if e.code() == tonic::Code::NotFound => { warn!(error=?e, "Query not found when attempting to respond to it"); } + Err(e) if e.metadata().contains_key(MESSAGE_TOO_LARGE_KEY) => { + warn!(error=%e, "Query response too large, responding with failure"); + let failure = Failure { + failure: Some( + temporalio_common::protos::temporal::api::failure::v1::Failure { + message: "GRPC Message too large".to_string(), + failure_info: Some(FailureInfo::ApplicationFailureInfo( + ApplicationFailureInfo { + r#type: "GrpcMessageTooLarge".to_string(), + non_retryable: true, + ..Default::default() + }, + )), + ..Default::default() + }, + ), + force_cause: 0, + }; + if let Err(e2) = self + .client + .respond_legacy_query(tt, LegacyQueryResult::Failed(failure)) + .await + { + warn!( + error=%e2, + "Failed to send query failure response after message-too-large" + ); + } + } Err(e) => { warn!(error= %e, "Network error while responding to legacy query"); } diff --git a/crates/sdk-core/tests/integ_tests/update_tests.rs b/crates/sdk-core/tests/integ_tests/update_tests.rs index 61e7d95bf..406f42508 100644 --- a/crates/sdk-core/tests/integ_tests/update_tests.rs +++ b/crates/sdk-core/tests/integ_tests/update_tests.rs @@ -1168,7 +1168,7 @@ async fn worker_restarted_in_middle_of_update() { // Allow it to start again, the second time BARR.wait().await; // Poke the workflow off the sticky queue to get it to complete faster than WFT timeout - WorkflowService::reset_sticky_task_queue( + let _ = WorkflowService::reset_sticky_task_queue( &mut client.clone(), ResetStickyTaskQueueRequest { namespace: client.namespace(), @@ -1179,8 +1179,7 @@ async fn worker_restarted_in_middle_of_update() { } .into_request(), ) - .await - .unwrap(); + .await; }; let run = async { // This run attempt will get shut down