Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions core/src/worker/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use temporal_sdk_core_protos::{
CancelNexusTask, NexusTask, NexusTaskCancelReason, nexus_task, nexus_task_completion,
},
},
temporal::api::nexus::v1::{request::Variant, response, start_operation_response},
temporal::api::nexus::{
self,
v1::{request::Variant, response, start_operation_response},
},
utilities::normalize_http_headers,
};
use tokio::{
join,
Expand All @@ -42,7 +46,7 @@ use tokio::{
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;

static REQUEST_TIMEOUT_HEADER: &str = "Request-Timeout";
static REQUEST_TIMEOUT_HEADER: &str = "request-timeout";

/// Centralizes all state related to received nexus tasks
pub(super) struct NexusManager {
Expand Down Expand Up @@ -245,11 +249,18 @@ where
.filter_map(move |t| {
let res = match t {
TaskStreamInput::Poll(t) => match *t {
Ok(t) => {
Ok(mut t) => {
if let Some(dur) = t.resp.sched_to_start() {
self.metrics.nexus_task_sched_to_start_latency(dur);
};

if let Some(ref mut req) = t.resp.request {
req.header = normalize_http_headers(std::mem::take(&mut req.header));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we std::mem::take here? Just for safety, to make sure that memory is cleared out?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just accept a mutable hash map reference in the normalize helper?

Copy link
Copy Markdown
Member Author

@Sushisource Sushisource Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It allows me to own the old map, therefore I can call into_iter and avoid copying all the values

if let Some(nexus::v1::request::Variant::StartOperation(ref mut sor)) = req.variant {
sor.callback_header = normalize_http_headers(std::mem::take(&mut sor.callback_header));
}
}

let tt = TaskToken(t.resp.task_token.clone());
let mut timeout_task = None;
if let Some(timeout_str) = t
Expand Down
12 changes: 12 additions & 0 deletions sdk-core-protos/src/utilities.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use prost::{EncodeError, Message};

pub trait TryIntoOrNone<F, T> {
Expand Down Expand Up @@ -26,3 +28,13 @@ pub fn pack_any<T: Message>(
Message::encode(msg, &mut value)?;
Ok(prost_wkt_types::Any { type_url, value })
}

/// Given a header map, lowercase all the keys and return it as a new map.
/// Any keys that are duplicated after lowercasing will clobber each other in undefined ordering.
pub fn normalize_http_headers(headers: HashMap<String, String>) -> HashMap<String, String> {
let mut new_headers = HashMap::new();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be better to accept a mutable map if we can, but if we must do it this way, would recommend either using an iter approach (HashMap's FromIterator just drops dupes) or at least using with_capacity instead of new.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot mutate hashmap keys. with_capacity, sure, but, it's lightyears away from being a bit of perf that matters.

for (header_key, val) in headers.into_iter() {
new_headers.insert(header_key.to_lowercase(), val);
}
new_headers
}
10 changes: 10 additions & 0 deletions tests/integ_tests/workflow_tests/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ async fn nexus_async(
let client = starter.get_client().await.get_client().clone();
let nexus_task_handle = async {
let mut nt = core_worker.poll_nexus_task().await.unwrap().unwrap_task();
// Verify request header key for timeout exists and is lowercase
if outcome == Outcome::Timeout {
assert!(
nt.request
.as_ref()
.unwrap()
.header
.contains_key("request-timeout")
);
}
let start_req = assert_matches!(
nt.request.unwrap().variant.unwrap(),
request::Variant::StartOperation(sr) => sr
Expand Down