Skip to content

Commit 55b8969

Browse files
committed
Refactor worker test harness
1 parent bcc21d7 commit 55b8969

14 files changed

+1032
-740
lines changed

worker-sandbox/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ getrandom = { version = "0.2.10", features = ["js"] }
3030
hex = "0.4.3"
3131
http.workspace=true
3232
regex = "1.8.4"
33+
reqwest = { version="0.11" }
3334
serde = { version = "1.0.164", features = ["derive"] }
3435
serde_json = "1.0.96"
3536
worker = { path = "../worker", version = "0.0.21", features = ["queue", "d1"] }

worker-sandbox/src/alarm.rs

+27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::time::Duration;
22

33
use worker::*;
44

5+
use super::SomeSharedData;
6+
57
#[durable_object]
68
pub struct AlarmObject {
79
state: State,
@@ -39,3 +41,28 @@ impl DurableObject for AlarmObject {
3941
Response::ok("ALARMED")
4042
}
4143
}
44+
45+
pub async fn handle_alarm(_req: Request, ctx: RouteContext<SomeSharedData>) -> Result<Response> {
46+
let namespace = ctx.durable_object("ALARM")?;
47+
let stub = namespace.id_from_name("alarm")?.get_stub()?;
48+
// when calling fetch to a Durable Object, a full URL must be used. Alternatively, a
49+
// compatibility flag can be provided in wrangler.toml to opt-in to older behavior:
50+
// https://developers.cloudflare.com/workers/platform/compatibility-dates#durable-object-stubfetch-requires-a-full-url
51+
stub.fetch_with_str("https://fake-host/alarm").await
52+
}
53+
54+
pub async fn handle_id(_req: Request, ctx: RouteContext<SomeSharedData>) -> Result<Response> {
55+
let namespace = ctx.durable_object("COUNTER").expect("DAWJKHDAD");
56+
let stub = namespace.id_from_name("A")?.get_stub()?;
57+
// when calling fetch to a Durable Object, a full URL must be used. Alternatively, a
58+
// compatibility flag can be provided in wrangler.toml to opt-in to older behavior:
59+
// https://developers.cloudflare.com/workers/platform/compatibility-dates#durable-object-stubfetch-requires-a-full-url
60+
stub.fetch_with_str("https://fake-host/").await
61+
}
62+
63+
pub async fn handle_put_raw(req: Request, ctx: RouteContext<SomeSharedData>) -> Result<Response> {
64+
let namespace = ctx.durable_object("PUT_RAW_TEST_OBJECT")?;
65+
let id = namespace.unique_id()?;
66+
let stub = id.get_stub()?;
67+
stub.fetch_with_request(req).await
68+
}

worker-sandbox/src/cache.rs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use super::SomeSharedData;
2+
use futures_util::stream::StreamExt;
3+
use rand::Rng;
4+
use std::time::Duration;
5+
use worker::{console_log, Cache, Date, Delay, Request, Response, Result, RouteContext};
6+
7+
pub async fn handle_cache_example(
8+
req: Request,
9+
_ctx: RouteContext<SomeSharedData>,
10+
) -> Result<Response> {
11+
console_log!("url: {}", req.url()?.to_string());
12+
let cache = Cache::default();
13+
let key = req.url()?.to_string();
14+
if let Some(resp) = cache.get(&key, true).await? {
15+
console_log!("Cache HIT!");
16+
Ok(resp)
17+
} else {
18+
console_log!("Cache MISS!");
19+
let mut resp =
20+
Response::from_json(&serde_json::json!({ "timestamp": Date::now().as_millis() }))?;
21+
22+
// Cache API respects Cache-Control headers. Setting s-max-age to 10
23+
// will limit the response to be in cache for 10 seconds max
24+
resp.headers_mut().set("cache-control", "s-maxage=10")?;
25+
cache.put(key, resp.cloned()?).await?;
26+
Ok(resp)
27+
}
28+
}
29+
30+
pub async fn handle_cache_api_get(
31+
_req: Request,
32+
ctx: RouteContext<SomeSharedData>,
33+
) -> Result<Response> {
34+
if let Some(key) = ctx.param("key") {
35+
let cache = Cache::default();
36+
if let Some(resp) = cache.get(format!("https://{key}"), true).await? {
37+
return Ok(resp);
38+
} else {
39+
return Response::ok("cache miss");
40+
}
41+
}
42+
Response::error("key missing", 400)
43+
}
44+
45+
pub async fn handle_cache_api_put(
46+
_req: Request,
47+
ctx: RouteContext<SomeSharedData>,
48+
) -> Result<Response> {
49+
if let Some(key) = ctx.param("key") {
50+
let cache = Cache::default();
51+
52+
let mut resp =
53+
Response::from_json(&serde_json::json!({ "timestamp": Date::now().as_millis() }))?;
54+
55+
// Cache API respects Cache-Control headers. Setting s-max-age to 10
56+
// will limit the response to be in cache for 10 seconds max
57+
resp.headers_mut().set("cache-control", "s-maxage=10")?;
58+
cache.put(format!("https://{key}"), resp.cloned()?).await?;
59+
return Ok(resp);
60+
}
61+
Response::error("key missing", 400)
62+
}
63+
64+
pub async fn handle_cache_api_delete(
65+
_req: Request,
66+
ctx: RouteContext<SomeSharedData>,
67+
) -> Result<Response> {
68+
if let Some(key) = ctx.param("key") {
69+
let cache = Cache::default();
70+
71+
let res = cache.delete(format!("https://{key}"), true).await?;
72+
return Response::ok(serde_json::to_string(&res)?);
73+
}
74+
Response::error("key missing", 400)
75+
}
76+
77+
pub async fn handle_cache_stream(
78+
req: Request,
79+
_ctx: RouteContext<SomeSharedData>,
80+
) -> Result<Response> {
81+
console_log!("url: {}", req.url()?.to_string());
82+
let cache = Cache::default();
83+
let key = req.url()?.to_string();
84+
if let Some(resp) = cache.get(&key, true).await? {
85+
console_log!("Cache HIT!");
86+
Ok(resp)
87+
} else {
88+
console_log!("Cache MISS!");
89+
let mut rng = rand::thread_rng();
90+
let count = rng.gen_range(0..10);
91+
let stream = futures_util::stream::repeat("Hello, world!\n")
92+
.take(count)
93+
.then(|text| async move {
94+
Delay::from(Duration::from_millis(50)).await;
95+
Result::Ok(text.as_bytes().to_vec())
96+
});
97+
98+
let mut resp = Response::from_stream(stream)?;
99+
console_log!("resp = {:?}", resp);
100+
// Cache API respects Cache-Control headers. Setting s-max-age to 10
101+
// will limit the response to be in cache for 10 seconds max
102+
resp.headers_mut().set("cache-control", "s-maxage=10")?;
103+
104+
cache.put(key, resp.cloned()?).await?;
105+
Ok(resp)
106+
}
107+
}

worker-sandbox/src/fetch.rs

+160
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use super::{ApiData, SomeSharedData};
2+
use futures_util::future::Either;
3+
use std::time::Duration;
4+
use worker::{
5+
wasm_bindgen_futures, AbortController, Delay, Fetch, Method, Request, RequestInit, Response,
6+
Result, RouteContext,
7+
};
8+
9+
pub async fn handle_fetch(_req: Request, _ctx: RouteContext<SomeSharedData>) -> Result<Response> {
10+
let req = Request::new("https://example.com", Method::Post)?;
11+
worker::console_log!("foo");
12+
let resp = Fetch::Request(req).send().await?;
13+
worker::console_log!("bar");
14+
let resp2 = Fetch::Url("https://example.com".parse()?).send().await?;
15+
worker::console_log!("baz");
16+
Response::ok(format!(
17+
"received responses with codes {} and {}",
18+
resp.status_code(),
19+
resp2.status_code()
20+
))
21+
}
22+
23+
pub async fn handle_fetch_json(
24+
_req: Request,
25+
_ctx: RouteContext<SomeSharedData>,
26+
) -> Result<Response> {
27+
let data: ApiData = Fetch::Url(
28+
"https://jsonplaceholder.typicode.com/todos/1"
29+
.parse()
30+
.unwrap(),
31+
)
32+
.send()
33+
.await?
34+
.json()
35+
.await?;
36+
Response::ok(format!(
37+
"API Returned user: {} with title: {} and completed: {}",
38+
data.user_id, data.title, data.completed
39+
))
40+
}
41+
42+
pub async fn handle_proxy_request(
43+
_req: Request,
44+
ctx: RouteContext<SomeSharedData>,
45+
) -> Result<Response> {
46+
let url = ctx.param("url").unwrap();
47+
Fetch::Url(url.parse()?).send().await
48+
}
49+
50+
pub async fn handle_request_init_fetch(
51+
_req: Request,
52+
_ctx: RouteContext<SomeSharedData>,
53+
) -> Result<Response> {
54+
let init = RequestInit::new();
55+
Fetch::Request(Request::new_with_init("https://cloudflare.com", &init)?)
56+
.send()
57+
.await
58+
}
59+
60+
pub async fn handle_request_init_fetch_post(
61+
_req: Request,
62+
_ctx: RouteContext<SomeSharedData>,
63+
) -> Result<Response> {
64+
let mut init = RequestInit::new();
65+
init.method = Method::Post;
66+
Fetch::Request(Request::new_with_init("https://httpbin.org/post", &init)?)
67+
.send()
68+
.await
69+
}
70+
71+
pub async fn handle_cancelled_fetch(
72+
_req: Request,
73+
_ctx: RouteContext<SomeSharedData>,
74+
) -> Result<Response> {
75+
let controller = AbortController::default();
76+
let signal = controller.signal();
77+
78+
let (tx, rx) = futures_channel::oneshot::channel();
79+
80+
// Spawns a future that'll make our fetch request and not block this function.
81+
wasm_bindgen_futures::spawn_local({
82+
async move {
83+
let fetch = Fetch::Url("https://cloudflare.com".parse().unwrap());
84+
let res = fetch.send_with_signal(&signal).await;
85+
tx.send(res).unwrap();
86+
}
87+
});
88+
89+
// And then we try to abort that fetch as soon as we start it, hopefully before
90+
// cloudflare.com responds.
91+
controller.abort();
92+
93+
let res = rx.await.unwrap();
94+
let res = res.unwrap_or_else(|err| {
95+
let text = err.to_string();
96+
Response::ok(text).unwrap()
97+
});
98+
99+
Ok(res)
100+
}
101+
102+
pub async fn handle_fetch_timeout(
103+
_req: Request,
104+
_ctx: RouteContext<SomeSharedData>,
105+
) -> Result<Response> {
106+
let controller = AbortController::default();
107+
let signal = controller.signal();
108+
109+
let fetch_fut = async {
110+
let fetch = Fetch::Url("https://miniflare.mocks/delay".parse().unwrap());
111+
let mut res = fetch.send_with_signal(&signal).await?;
112+
let text = res.text().await?;
113+
Ok::<String, worker::Error>(text)
114+
};
115+
let delay_fut = async {
116+
Delay::from(Duration::from_millis(100)).await;
117+
controller.abort();
118+
Response::ok("Cancelled")
119+
};
120+
121+
futures_util::pin_mut!(fetch_fut);
122+
futures_util::pin_mut!(delay_fut);
123+
124+
match futures_util::future::select(delay_fut, fetch_fut).await {
125+
Either::Left((res, cancelled_fut)) => {
126+
// Ensure that the cancelled future returns an AbortError.
127+
match cancelled_fut.await {
128+
Err(e) if e.to_string().contains("AbortError") => { /* Yay! It worked, let's do nothing to celebrate */
129+
}
130+
Err(e) => panic!(
131+
"Fetch errored with a different error than expected: {:#?}",
132+
e
133+
),
134+
Ok(text) => panic!("Fetch unexpectedly succeeded: {}", text),
135+
}
136+
137+
res
138+
}
139+
Either::Right(_) => panic!("Delay future should have resolved first"),
140+
}
141+
}
142+
143+
pub async fn handle_cloned_fetch(
144+
_req: Request,
145+
_ctx: RouteContext<SomeSharedData>,
146+
) -> Result<Response> {
147+
let mut resp = Fetch::Url(
148+
"https://jsonplaceholder.typicode.com/todos/1"
149+
.parse()
150+
.unwrap(),
151+
)
152+
.send()
153+
.await?;
154+
let mut resp1 = resp.cloned()?;
155+
156+
let left = resp.text().await?;
157+
let right = resp1.text().await?;
158+
159+
Response::ok((left == right).to_string())
160+
}

0 commit comments

Comments
 (0)