Skip to content

Commit f8a7624

Browse files
committed
feat(ext/cache): inject W3C traceparent/tracestate into LSC HTTP requests
Extract the active trace context on the JS side using the telemetry propagators (ContextManager.active() + PROPAGATORS.inject()) and pass the traceparent and tracestate values as optional string fields through the cache op calls (op_cache_put, op_cache_match, op_cache_delete). On the Rust side, CacheShard::get_object, put_object, and put_object_empty now accept an optional trace_headers parameter and inject the corresponding HTTP headers into outgoing LSC requests, enabling distributed trace propagation through the cache layer.
1 parent 0895897 commit f8a7624

File tree

6 files changed

+99
-10
lines changed

6 files changed

+99
-10
lines changed

ext/cache/01_cache.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,34 @@ import {
1111
const {
1212
ArrayPrototypePush,
1313
ObjectPrototypeIsPrototypeOf,
14+
SafeArrayIterator,
1415
StringPrototypeSplit,
1516
StringPrototypeTrim,
1617
Symbol,
1718
SymbolFor,
1819
TypeError,
1920
} = primordials;
2021

22+
import {
23+
ContextManager,
24+
PROPAGATORS,
25+
TRACING_ENABLED,
26+
} from "ext:deno_telemetry/telemetry.ts";
27+
28+
function getTraceHeaders() {
29+
if (!TRACING_ENABLED) return { __proto__: null };
30+
const headers = { __proto__: null };
31+
const context = ContextManager.active();
32+
for (const propagator of new SafeArrayIterator(PROPAGATORS)) {
33+
propagator.inject(context, headers, {
34+
set(carrier, key, value) {
35+
carrier[key] = value;
36+
},
37+
});
38+
}
39+
return headers;
40+
}
41+
2142
import * as webidl from "ext:deno_webidl/00_webidl.js";
2243
import {
2344
Request,
@@ -152,6 +173,7 @@ class Cache {
152173

153174
// Step 9-11.
154175
// Step 12-19: TODO(@satyarohith): do the insertion in background.
176+
const putTraceHeaders = getTraceHeaders();
155177
await op_cache_put(
156178
{
157179
cacheId: this[_id],
@@ -162,6 +184,8 @@ class Cache {
162184
responseStatus: innerResponse.status,
163185
responseStatusText: innerResponse.statusMessage,
164186
responseRid: rid,
187+
traceparent: putTraceHeaders["traceparent"],
188+
tracestate: putTraceHeaders["tracestate"],
165189
},
166190
);
167191
}
@@ -208,9 +232,12 @@ class Cache {
208232
) {
209233
r = new Request(request);
210234
}
235+
const deleteTraceHeaders = getTraceHeaders();
211236
return await op_cache_delete({
212237
cacheId: this[_id],
213238
requestUrl: r.url,
239+
traceparent: deleteTraceHeaders["traceparent"],
240+
tracestate: deleteTraceHeaders["tracestate"],
214241
});
215242
}
216243

@@ -252,12 +279,15 @@ class Cache {
252279
const url = new URL(r.url);
253280
url.hash = "";
254281
const innerRequest = toInnerRequest(r);
282+
const matchTraceHeaders = getTraceHeaders();
255283
const matchResult = await op_cache_match(
256284
{
257285
cacheId: this[_id],
258286
// deno-lint-ignore prefer-primordials
259287
requestUrl: url.toString(),
260288
requestHeaders: innerRequest.headerList,
289+
traceparent: matchTraceHeaders["traceparent"],
290+
tracestate: matchTraceHeaders["tracestate"],
261291
},
262292
);
263293
if (matchResult) {

ext/cache/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub enum CacheError {
9898
pub struct CreateCache(pub Arc<dyn Fn() -> Result<CacheImpl, CacheError>>);
9999

100100
deno_core::extension!(deno_cache,
101-
deps = [ deno_webidl, deno_web, deno_fetch ],
101+
deps = [ deno_webidl, deno_web, deno_fetch, deno_telemetry ],
102102
ops = [
103103
op_cache_storage_open,
104104
op_cache_storage_has,
@@ -128,6 +128,8 @@ pub struct CachePutRequest {
128128
pub response_status: u16,
129129
pub response_status_text: String,
130130
pub response_rid: Option<ResourceId>,
131+
pub traceparent: Option<String>,
132+
pub tracestate: Option<String>,
131133
}
132134

133135
#[derive(Deserialize, Serialize, Debug)]
@@ -136,6 +138,8 @@ pub struct CacheMatchRequest {
136138
pub cache_id: i64,
137139
pub request_url: String,
138140
pub request_headers: Vec<(ByteString, ByteString)>,
141+
pub traceparent: Option<String>,
142+
pub tracestate: Option<String>,
139143
}
140144

141145
#[derive(Debug, Deserialize, Serialize)]
@@ -156,6 +160,8 @@ pub struct CacheMatchResponseMeta {
156160
pub struct CacheDeleteRequest {
157161
pub cache_id: i64,
158162
pub request_url: String,
163+
pub traceparent: Option<String>,
164+
pub tracestate: Option<String>,
159165
}
160166

161167
#[async_trait(?Send)]

ext/cache/lsc_shard.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ use hyper::HeaderMap;
1414
use hyper::StatusCode;
1515
use hyper::body::Incoming;
1616
use hyper::header::AUTHORIZATION;
17+
use hyper::header::HeaderValue;
1718
use hyper_util::client::legacy::Client;
1819
use hyper_util::client::legacy::connect::HttpConnector;
1920
use hyper_util::rt::tokio::TokioExecutor;
2021

2122
use crate::CacheError;
2223

23-
type ClientBody =
24+
pub type ClientBody =
2425
Either<UnsyncBoxBody<Bytes, CacheError>, UnsyncBoxBody<Bytes, Infallible>>;
2526

2627
pub struct CacheShard {
@@ -44,16 +45,28 @@ impl CacheShard {
4445
pub async fn get_object(
4546
&self,
4647
object_key: &str,
48+
trace_headers: Option<(&str, Option<&str>)>,
4749
) -> Result<Option<Response<Incoming>>, CacheError> {
4850
let body = Either::Right(UnsyncBoxBody::new(Empty::new()));
49-
let req = Request::builder()
51+
let mut req = Request::builder()
5052
.method(Method::GET)
5153
.uri(format!("{}/objects/{}", self.endpoint, object_key))
5254
.header(&AUTHORIZATION, format!("Bearer {}", self.token))
5355
.header("x-ryw", "1")
5456
.body(body)
5557
.unwrap();
5658

59+
if let Some((traceparent, tracestate)) = trace_headers {
60+
req
61+
.headers_mut()
62+
.insert("traceparent", HeaderValue::from_str(traceparent)?);
63+
if let Some(tracestate) = tracestate {
64+
req
65+
.headers_mut()
66+
.insert("tracestate", HeaderValue::from_str(tracestate)?);
67+
}
68+
}
69+
5770
let res = self.client.request(req).await?;
5871

5972
if res.status().is_success() {
@@ -72,6 +85,7 @@ impl CacheShard {
7285
&self,
7386
object_key: &str,
7487
headers: HeaderMap,
88+
trace_headers: Option<(&str, Option<&str>)>,
7589
) -> Result<(), CacheError> {
7690
let body = Either::Right(UnsyncBoxBody::new(Empty::new()));
7791
let mut builder = Request::builder()
@@ -83,7 +97,17 @@ impl CacheShard {
8397
builder = builder.header(key, val)
8498
}
8599

86-
let req = builder.body(body).unwrap();
100+
let mut req = builder.body(body).unwrap();
101+
if let Some((traceparent, tracestate)) = trace_headers {
102+
req
103+
.headers_mut()
104+
.insert("traceparent", HeaderValue::from_str(traceparent)?);
105+
if let Some(tracestate) = tracestate {
106+
req
107+
.headers_mut()
108+
.insert("tracestate", HeaderValue::from_str(tracestate)?);
109+
}
110+
}
87111

88112
let res = self.client.request(req).await?;
89113

@@ -107,6 +131,7 @@ impl CacheShard {
107131
object_key: &str,
108132
headers: HeaderMap,
109133
body: UnsyncBoxBody<Bytes, CacheError>,
134+
trace_headers: Option<(&str, Option<&str>)>,
110135
) -> Result<(), CacheError> {
111136
let mut builder = Request::builder()
112137
.method(Method::PUT)
@@ -117,7 +142,17 @@ impl CacheShard {
117142
builder = builder.header(key, val)
118143
}
119144

120-
let req = builder.body(Either::Left(body)).unwrap();
145+
let mut req = builder.body(Either::Left(body)).unwrap();
146+
if let Some((traceparent, tracestate)) = trace_headers {
147+
req
148+
.headers_mut()
149+
.insert("traceparent", HeaderValue::from_str(traceparent)?);
150+
if let Some(tracestate) = tracestate {
151+
req
152+
.headers_mut()
153+
.insert("tracestate", HeaderValue::from_str(tracestate)?);
154+
}
155+
}
121156

122157
let res = self.client.request(req).await?;
123158

ext/cache/lscache.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,13 @@ impl LscBackend {
144144
body_rx.into_stream().map_ok(http_body::Frame::data),
145145
);
146146
let body = UnsyncBoxBody::new(body);
147-
shard.put_object(&object_key, headers, body).await?;
147+
let trace_headers = request_response
148+
.traceparent
149+
.as_deref()
150+
.map(|tp| (tp, request_response.tracestate.as_deref()));
151+
shard
152+
.put_object(&object_key, headers, body, trace_headers)
153+
.await?;
148154
Ok(())
149155
}
150156

@@ -171,7 +177,11 @@ impl LscBackend {
171177
cache_name.as_bytes(),
172178
request.request_url.as_bytes(),
173179
);
174-
let Some(res) = shard.get_object(&object_key).await? else {
180+
let trace_headers = request
181+
.traceparent
182+
.as_deref()
183+
.map(|tp| (tp, request.tracestate.as_deref()));
184+
let Some(res) = shard.get_object(&object_key, trace_headers).await? else {
175185
return Ok(None);
176186
};
177187

@@ -285,7 +295,13 @@ impl LscBackend {
285295
.as_bytes(),
286296
)?,
287297
);
288-
shard.put_object_empty(&object_key, headers).await?;
298+
let trace_headers = request
299+
.traceparent
300+
.as_deref()
301+
.map(|tp| (tp, request.tracestate.as_deref()));
302+
shard
303+
.put_object_empty(&object_key, headers, trace_headers)
304+
.await?;
289305
Ok(true)
290306
}
291307
}

ext/cache/sqlite.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ impl SqliteBackedCache {
316316
.delete(CacheDeleteRequest {
317317
cache_id: request.cache_id,
318318
request_url: request.request_url,
319+
traceparent: None,
320+
tracestate: None,
319321
})
320322
.await;
321323
return Ok(None);

runtime/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,8 +377,8 @@ impl MainWorker {
377377
let token = elems[1];
378378
use deno_cache::CacheShard;
379379

380-
let shard =
381-
Rc::new(CacheShard::new(endpoint.to_string(), token.to_string()));
380+
let shard = CacheShard::new(endpoint.to_string(), token.to_string());
381+
let shard = Rc::new(shard);
382382
let create_cache_fn = move || {
383383
let x = deno_cache::LscBackend::default();
384384
x.set_shard(shard.clone());

0 commit comments

Comments
 (0)