Skip to content

Commit 53914bc

Browse files
authored
Merge branch 'master' into shutdown-rpc-initiate-shutdown
2 parents 5863db7 + 00d3888 commit 53914bc

86 files changed

Lines changed: 3660 additions & 82 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
File renamed without changes.

.github/workflows/per-pr.yml

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ jobs:
4646
test:
4747
name: Unit Tests
4848
# Give extra time to ensure pushes to main have time to populate the cache
49-
timeout-minutes: ${{ github.ref == 'refs/heads/master' && 20 || 15 }}
49+
timeout-minutes: ${{ matrix.timeoutMinutes || (github.ref == 'refs/heads/master' && 20 || 15) }}
5050
strategy:
5151
fail-fast: false
5252
matrix:
@@ -60,6 +60,7 @@ jobs:
6060
runsOn: macos-14
6161
- os: macos-intel
6262
runsOn: macos-15-intel
63+
timeoutMinutes: 20
6364
runs-on: ${{ matrix.runsOn || matrix.os }}
6465
steps:
6566
- uses: actions/checkout@v4
@@ -240,6 +241,42 @@ jobs:
240241
save-if: ${{ github.ref == 'refs/heads/master' }}
241242
- run: cargo integ-test docker_
242243

244+
examples:
245+
name: "Build and run examples"
246+
timeout-minutes: 15
247+
runs-on: ubuntu-latest
248+
steps:
249+
- uses: actions/checkout@v4
250+
- uses: dtolnay/rust-toolchain@stable
251+
- name: Install protoc
252+
uses: arduino/setup-protoc@v3
253+
with:
254+
version: "23.x"
255+
repo-token: ${{ secrets.GITHUB_TOKEN }}
256+
- uses: Swatinem/rust-cache@v2
257+
with:
258+
save-if: ${{ github.ref == 'refs/heads/master' }}
259+
- name: Build examples
260+
run: cargo build --examples -p temporalio-sdk --features examples
261+
- name: Install Temporal CLI
262+
uses: temporalio/setup-temporal@v0
263+
- name: Start Temporal dev server
264+
run: |
265+
temporal server start-dev --headless &
266+
sleep 5
267+
temporal operator search-attribute create --name CustomKeywordField --type Keyword
268+
temporal operator search-attribute create --name CustomIntField --type Int
269+
- name: Run examples
270+
run: |
271+
for dir in crates/sdk/examples/*/; do
272+
sample="$(basename "$dir" | tr '_' '-')"
273+
cargo run -p temporalio-sdk --features examples --example "${sample}-worker" &
274+
WORKER_PID=$!
275+
timeout 20 cargo run -p temporalio-sdk --features examples --example "${sample}-starter"
276+
kill $WORKER_PID 2>/dev/null || true
277+
wait $WORKER_PID 2>/dev/null || true
278+
done
279+
243280
c-bridge-static-link-test:
244281
name: "C bridge static link test"
245282
runs-on: ubuntu-latest

ARCHITECTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ If you're newer to SDKs in general, first check out the [SDKs Intro](./arch_docs
88
The below diagram depicts how Core-based SDKs are split into two parts. The `sdk-core` common code,
99
which is written in Rust, and a `sdk-lang` package specific to the language the user is writing
1010
their workflow/activity in. For example a user writing workflows in Rust would be pulling in (at
11-
least) two crates - `temporal-sdk-core` and `temporal-sdk-rust`.
11+
least) two crates - `temporalio-sdk-core` and `temporalio-sdk`.
1212

1313
![Arch Diagram](https://lucid.app/publicSegments/view/7872bb33-d2b9-4b90-8aa1-bac111136aa5/image.png)
1414

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ prost = "0.14"
3232
prost-types = { version = "0.7", package = "prost-wkt-types" }
3333
pbjson = "0.9"
3434
pbjson-build = "0.9"
35+
serde_json = "1.0"
3536

3637
[workspace.lints.rust]
3738
unreachable_pub = "warn"

crates/client/src/lib.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use temporalio_common::{
7272
HasWorkflowDefinition,
7373
data_converters::{DataConverter, SerializationContextData},
7474
protos::{
75-
coresdk::IntoPayloadsExt,
75+
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
7676
grpc::health::v1::health_client::HealthClient,
7777
proto_ts_to_system_time,
7878
temporal::api::{
@@ -81,6 +81,7 @@ use temporalio_common::{
8181
enums::v1::{TaskQueueKind, WorkflowExecutionStatus},
8282
errordetails::v1::WorkflowExecutionAlreadyStartedFailure,
8383
operatorservice::v1::operator_service_client::OperatorServiceClient,
84+
sdk::v1::UserMetadata,
8485
taskqueue::v1::TaskQueue,
8586
testservice::v1::test_service_client::TestServiceClient,
8687
workflow::v1 as workflow,
@@ -1030,6 +1031,22 @@ where
10301031
let workflow_id = options.workflow_id.clone();
10311032
let task_queue_name = options.task_queue.clone();
10321033

1034+
let user_metadata = if options.static_summary.is_some() || options.static_details.is_some()
1035+
{
1036+
Some(UserMetadata {
1037+
summary: options.static_summary.map(|s| {
1038+
s.as_json_payload()
1039+
.expect("String-to-JSON payload serialization is infallible")
1040+
}),
1041+
details: options.static_details.map(|s| {
1042+
s.as_json_payload()
1043+
.expect("String-to-JSON payload serialization is infallible")
1044+
}),
1045+
})
1046+
} else {
1047+
None
1048+
};
1049+
10331050
let run_id = if let Some(start_signal) = options.start_signal {
10341051
// Use signal-with-start when a start_signal is provided
10351052
let res = WorkflowService::signal_with_start_workflow_execution(
@@ -1060,6 +1077,7 @@ where
10601077
search_attributes: options.search_attributes.map(|d| d.into()),
10611078
cron_schedule: options.cron_schedule.unwrap_or_default(),
10621079
header: options.header.or(start_signal.header),
1080+
user_metadata,
10631081
..Default::default()
10641082
}
10651083
.into_request(),
@@ -1100,6 +1118,7 @@ where
11001118
completion_callbacks: options.completion_callbacks,
11011119
priority: Some(options.priority.into()),
11021120
header: options.header,
1121+
user_metadata,
11031122
..Default::default()
11041123
}
11051124
.into_request(),

crates/client/src/options_structs.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,12 @@ pub struct WorkflowStartOptions {
238238

239239
/// Headers to include with the start request.
240240
pub header: Option<Header>,
241+
242+
/// Single-line static summary for the workflow, shown in the Temporal UI.
243+
pub static_summary: Option<String>,
244+
245+
/// Multi-line static details for the workflow, shown in the Temporal UI.
246+
pub static_details: Option<String>,
241247
}
242248

243249
/// A signal to send atomically when starting a workflow.

crates/client/src/proxy.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl HttpConnectProxyOptions {
5252
&self,
5353
uri: tonic::transport::Uri,
5454
) -> anyhow::Result<hyper::upgrade::Upgraded> {
55+
let uri = ensure_connect_authority_port(uri);
5556
debug!("Connecting to {} via proxy at {}", uri, self.target_addr);
5657
// Create CONNECT request
5758
let mut req_build = hyper::Request::builder().method("CONNECT").uri(uri);
@@ -207,3 +208,108 @@ impl Connection for ProxyStream {
207208
}
208209
}
209210
}
211+
212+
/// Ensure the URI authority includes an explicit port so that hyper emits a
213+
/// RFC 9110-compliant CONNECT request-target (authority-form requires host:port).
214+
fn ensure_connect_authority_port(uri: tonic::transport::Uri) -> tonic::transport::Uri {
215+
if uri.port().is_some() {
216+
return uri;
217+
}
218+
let port = match uri.scheme_str() {
219+
Some("https") => 443,
220+
Some("http") => 80,
221+
_ => return uri,
222+
};
223+
let mut parts = uri.into_parts();
224+
if let Some(ref authority) = parts.authority
225+
&& let Ok(new_auth) = format!("{}:{}", authority.host(), port).parse()
226+
{
227+
parts.authority = Some(new_auth);
228+
}
229+
tonic::transport::Uri::from_parts(parts).expect("adding port to valid URI should not fail")
230+
}
231+
232+
#[cfg(test)]
233+
mod tests {
234+
use super::*;
235+
use tokio::{
236+
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
237+
net::TcpListener,
238+
};
239+
240+
struct CapturedConnect {
241+
request_line: String,
242+
headers: Vec<String>,
243+
}
244+
245+
async fn mock_proxy() -> (String, tokio::task::JoinHandle<CapturedConnect>) {
246+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
247+
let addr = listener.local_addr().unwrap().to_string();
248+
let handle = tokio::spawn(async move {
249+
let (stream, _) = listener.accept().await.unwrap();
250+
let mut reader = BufReader::new(stream);
251+
let mut request_line = String::new();
252+
reader.read_line(&mut request_line).await.unwrap();
253+
let mut headers = Vec::new();
254+
loop {
255+
let mut line = String::new();
256+
reader.read_line(&mut line).await.unwrap();
257+
if line == "\r\n" {
258+
break;
259+
}
260+
headers.push(line.trim_end().to_string());
261+
}
262+
reader
263+
.into_inner()
264+
.write_all(b"HTTP/1.1 200 OK\r\n\r\n")
265+
.await
266+
.unwrap();
267+
CapturedConnect {
268+
request_line,
269+
headers,
270+
}
271+
});
272+
(addr, handle)
273+
}
274+
275+
#[rstest::rstest]
276+
#[case("https://example.com/some/path", "CONNECT example.com:443 HTTP/1.1")]
277+
#[case("http://example.com", "CONNECT example.com:80 HTTP/1.1")]
278+
#[case("https://example.com:7233", "CONNECT example.com:7233 HTTP/1.1")]
279+
#[tokio::test]
280+
async fn connect_request_line(#[case] uri: &str, #[case] expected: &str) {
281+
let (proxy_addr, handle) = mock_proxy().await;
282+
let opts = HttpConnectProxyOptions {
283+
target_addr: proxy_addr,
284+
basic_auth: None,
285+
};
286+
let uri: tonic::transport::Uri = uri.parse().unwrap();
287+
let _ = opts.connect(uri).await;
288+
289+
let captured = handle.await.unwrap();
290+
assert_eq!(captured.request_line.trim(), expected);
291+
}
292+
293+
#[tokio::test]
294+
async fn connect_includes_basic_auth() {
295+
let (proxy_addr, handle) = mock_proxy().await;
296+
let opts = HttpConnectProxyOptions {
297+
target_addr: proxy_addr,
298+
basic_auth: Some(("user".to_string(), "pass".to_string())),
299+
};
300+
let uri: tonic::transport::Uri = "https://example.com:7233".parse().unwrap();
301+
let _ = opts.connect(uri).await;
302+
303+
let captured = handle.await.unwrap();
304+
let creds = BASE64_STANDARD.encode("user:pass");
305+
let auth_header = captured
306+
.headers
307+
.iter()
308+
.find(|h| h.to_lowercase().starts_with("proxy-authorization:"))
309+
.expect("missing proxy-authorization header");
310+
assert_eq!(
311+
auth_header.trim(),
312+
format!("proxy-authorization: Basic {creds}")
313+
);
314+
}
315+
}

0 commit comments

Comments
 (0)