Skip to content

Commit e65b427

Browse files
committed
Add User-Agent, fetch context headers, and Web API tests
- Set default User-Agent (openworkers-runner/{version}) on reqwest client
- Inject x-fetch-worker and x-fetch-tenant-id on outgoing requests
- Disable redirect auto-follow for worker fetch
- Add URL, URLSearchParams, btoa/atob integration tests
- Polish adaptive span processor and telemetry
- Use runtime-v8 v0.13.1 from remote
- Bump to v0.13.2
1 parent: 6e53c5e · commit: e65b427

7 files changed

Lines changed: 473 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.13.1"
3+
version = "0.13.2"
44
edition = "2024"
55
license = "MIT"
66
default-run = "openworkers-runner"
@@ -58,7 +58,7 @@ reqwest = { version = "0.13.1", default-features = false, features = ["rustls",
5858
openworkers-core = { git = "https://github.com/openworkers/openworkers-core", tag = "v0.13.0", features = ["hyper"] }
5959

6060
# Runtime backend (v8 only for now, others require older version of core)
61-
openworkers-runtime-v8 = { git = "https://github.com/openworkers/openworkers-runtime-v8", tag = "v0.13.0", optional = true, features = ["ptrcomp"] }
61+
openworkers-runtime-v8 = { git = "https://github.com/openworkers/openworkers-runtime-v8", tag = "v0.13.1", optional = true, features = ["ptrcomp"] }
6262

6363
# WASM runtime (optional)
6464
# openworkers-runtime-wasm = { path = "../openworkers-runtime-wasm", optional = true }

src/adaptive_span_processor.rs

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -26,12 +26,12 @@
2626
//! Traffic is measured with exponential decay (60s half-life) to create
2727
//! a sliding window effect.
2828
29-
use opentelemetry::trace::TraceId;
30-
use opentelemetry_sdk::export::trace::SpanData;
31-
use opentelemetry_sdk::trace::{Context, SpanProcessor};
29+
use opentelemetry::Context;
30+
use opentelemetry_sdk::error::OTelSdkResult;
31+
use opentelemetry_sdk::trace::{SpanData, SpanProcessor};
3232
use std::collections::HashMap;
3333
use std::sync::{Arc, RwLock};
34-
use std::time::Instant;
34+
use std::time::{Duration, Instant};
3535

3636
/// Decay half-life for traffic measurement (seconds)
3737
const DECAY_HALF_LIFE: f64 = 60.0;
@@ -40,17 +40,20 @@ const DECAY_HALF_LIFE: f64 = 60.0;
4040
const TRAFFIC_THRESHOLD: f64 = 10.0;
4141

4242
/// Span processor that applies adaptive sampling at export time
43+
#[derive(Debug)]
4344
pub struct AdaptiveSpanProcessor<T: SpanProcessor> {
4445
inner: T,
4546
state: Arc<RwLock<ProcessorState>>,
4647
min_rate: f64,
4748
max_rate: f64,
4849
}
4950

51+
#[derive(Debug)]
5052
struct ProcessorState {
5153
worker_counts: HashMap<String, WorkerStats>,
5254
}
5355

56+
#[derive(Debug)]
5457
struct WorkerStats {
5558
count: f64,
5659
last_update: Instant,
@@ -75,7 +78,7 @@ impl<T: SpanProcessor> AdaptiveSpanProcessor<T> {
7578
.iter()
7679
.find(|kv| kv.key.as_str() == "worker_id")
7780
.map(|kv| kv.value.to_string())
78-
.filter(|id| !id.is_empty() && id != "Empty");
81+
.filter(|id: &String| !id.is_empty() && id != "Empty");
7982

8083
// Always export spans without worker_id (system spans, etc.)
8184
let worker_id = match worker_id {
@@ -122,8 +125,7 @@ impl<T: SpanProcessor> AdaptiveSpanProcessor<T> {
122125
stats.count += 1.0;
123126

124127
// Asymptotic sampling rate
125-
self.min_rate
126-
+ (self.max_rate - self.min_rate) / (1.0 + stats.count / TRAFFIC_THRESHOLD)
128+
self.min_rate + (self.max_rate - self.min_rate) / (1.0 + stats.count / TRAFFIC_THRESHOLD)
127129
}
128130
}
129131

@@ -138,11 +140,11 @@ impl<T: SpanProcessor> SpanProcessor for AdaptiveSpanProcessor<T> {
138140
}
139141
}
140142

141-
fn force_flush(&self) -> opentelemetry::trace::TraceResult<()> {
143+
fn force_flush(&self) -> OTelSdkResult {
142144
self.inner.force_flush()
143145
}
144146

145-
fn shutdown(&self) -> opentelemetry::trace::TraceResult<()> {
146-
self.inner.shutdown()
147+
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
148+
self.inner.shutdown_with_timeout(timeout)
147149
}
148150
}

src/ops.rs

Lines changed: 16 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,9 @@ use tracing::Instrument;
4242
use crate::limiter::BindingLimiters;
4343
#[cfg(feature = "database")]
4444
use crate::services::database::{self as db_service, QueryMode};
45-
use crate::services::fetch::{do_fetch, generate_request_id, try_internal_worker_route};
45+
use crate::services::fetch::{
46+
FetchContext, do_fetch, generate_request_id, try_internal_worker_route,
47+
};
4648
use crate::services::kv as kv_service;
4749
use crate::services::storage::{build_s3_url, execute_s3_operation, sign_s3_request};
4850
use crate::store::{Binding, DatabaseConfig, KvConfig, StorageConfig, WorkerBindingConfig};
@@ -241,7 +243,11 @@ impl RunnerOperations {
241243
&config.bucket,
242244
)?;
243245

244-
do_fetch(auth_request, &self.stats, Some(&signed_headers)).await
246+
let ctx = FetchContext {
247+
worker_id: self.worker_id.as_deref(),
248+
user_id: self.user_id.as_deref(),
249+
};
250+
do_fetch(auth_request, &self.stats, Some(&signed_headers), Some(&ctx)).await
245251
}
246252
.instrument(span),
247253
)
@@ -284,16 +290,21 @@ impl OperationsHandler for RunnerOperations {
284290
self.worker_id
285291
);
286292

293+
let ctx = FetchContext {
294+
worker_id: self.worker_id.as_deref(),
295+
user_id: self.user_id.as_deref(),
296+
};
297+
287298
// Check if this is an internal worker URL that should be routed directly
288299
if let Some(internal_request) = try_internal_worker_route(&request) {
289300
tracing::debug!(
290301
"[ops] fetch shortcut: {} -> internal routing via x-worker-name",
291302
request.url
292303
);
293-
return do_fetch(internal_request, &self.stats, None).await;
304+
return do_fetch(internal_request, &self.stats, None, Some(&ctx)).await;
294305
}
295306

296-
do_fetch(request, &self.stats, None).await
307+
do_fetch(request, &self.stats, None, Some(&ctx)).await
297308
}
298309
.instrument(span),
299310
)
@@ -617,7 +628,7 @@ impl OperationsHandler for RunnerOperations {
617628
};
618629

619630
// Execute the request through the runner
620-
do_fetch(internal_request, &OperationsStats::new(), None).await
631+
do_fetch(internal_request, &OperationsStats::new(), None, None).await
621632
}
622633
.instrument(span),
623634
)

src/services/fetch.rs

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,10 +57,12 @@ thread_local! {
5757
.unwrap_or(100);
5858

5959
reqwest::Client::builder()
60+
.user_agent(format!("openworkers-runner/{}", env!("CARGO_PKG_VERSION")))
6061
.pool_max_idle_per_host(pool_size)
6162
.pool_idle_timeout(Duration::from_secs(90))
6263
.connect_timeout(Duration::from_secs(5))
6364
.timeout(Duration::from_secs(30))
65+
.redirect(reqwest::redirect::Policy::none())
6466
.build()
6567
.expect("Failed to create HTTP client")
6668
});
@@ -109,6 +111,12 @@ pub fn try_internal_worker_route(request: &HttpRequest) -> Option<HttpRequest> {
109111
})
110112
}
111113

114+
/// Optional worker context injected as headers on outgoing fetch requests.
115+
pub struct FetchContext<'a> {
116+
pub worker_id: Option<&'a str>,
117+
pub user_id: Option<&'a str>,
118+
}
119+
112120
/// Execute an HTTP request with streaming response.
113121
///
114122
/// This is the core HTTP function used by both direct fetch and binding fetches.
@@ -117,6 +125,7 @@ pub async fn do_fetch(
117125
request: HttpRequest,
118126
stats: &OperationsStats,
119127
extra_headers: Option<&HashMap<String, String>>,
128+
ctx: Option<&FetchContext<'_>>,
120129
) -> Result<HttpResponse, String> {
121130
use std::sync::atomic::Ordering;
122131

@@ -142,13 +151,24 @@ pub async fn do_fetch(
142151
req_builder = req_builder.header(key, value);
143152
}
144153

145-
// Add extra headers if provided
154+
// Add extra headers if provided (e.g., AWS signatures)
146155
if let Some(headers) = extra_headers {
147156
for (key, value) in headers {
148157
req_builder = req_builder.header(key, value);
149158
}
150159
}
151160

161+
// Add worker context headers for observability
162+
if let Some(ctx) = ctx {
163+
if let Some(worker_id) = ctx.worker_id {
164+
req_builder = req_builder.header("x-fetch-worker", worker_id);
165+
}
166+
167+
if let Some(user_id) = ctx.user_id {
168+
req_builder = req_builder.header("x-fetch-tenant-id", user_id);
169+
}
170+
}
171+
152172
// Add body if present
153173
if let RequestBody::Bytes(body) = &request.body {
154174
stats

src/telemetry.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -126,14 +126,15 @@ fn init_otel() -> Result<(), Box<dyn std::error::Error>> {
126126
let span_exporter = span_builder.build()?;
127127

128128
// Create batch processor
129-
let batch_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(span_exporter, opentelemetry_sdk::runtime::Tokio).build();
129+
let batch_processor =
130+
opentelemetry_sdk::trace::BatchSpanProcessor::builder(span_exporter).build();
130131

131132
// Wrap with adaptive processor for tail-based sampling
132133
// This filters spans at export time based on worker traffic
133134
let adaptive_processor = crate::adaptive_span_processor::AdaptiveSpanProcessor::new(
134135
batch_processor,
135-
0.01, // min_rate: 1% for high-traffic workers
136-
1.0, // max_rate: 100% for low-traffic workers
136+
0.01, // min_rate: 1% for high-traffic workers
137+
1.0, // max_rate: 100% for low-traffic workers
137138
);
138139

139140
// Create tracer provider

0 commit comments

Comments
 (0)