diff --git a/Cargo.toml b/Cargo.toml index e4ea71863..0b8e8c647 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,9 @@ js-sys = { version = "0.3.82", path = "./wasm-bindgen/crates/js-sys" } serde = { version = "1.0.164", features = ["derive"] } serde_json = "1.0.140" serde-wasm-bindgen = "0.6.5" -wasm-bindgen = { version = "0.2.105", path = "./wasm-bindgen" } +wasm-bindgen = { version = "0.2.105", path = "./wasm-bindgen", features = ["unsafe-single-threaded-traits"] } wasm-bindgen-cli-support = { version = "0.2.105", path = "./wasm-bindgen/crates/cli-support" } -wasm-bindgen-futures = { version = "0.4.54", path = "./wasm-bindgen/crates/futures" } +wasm-bindgen-futures = { version = "0.4.54", path = "./wasm-bindgen/crates/futures", features = ["unsafe-single-threaded-traits"] } wasm-bindgen-macro-support = { version = "0.2.105", path = "./wasm-bindgen/crates/macro-support" } wasm-bindgen-shared = { version = "0.2.105", path = "./wasm-bindgen/crates/shared" } wasm-bindgen-test = { version = "0.3.50", path = "./wasm-bindgen/crates/test" } diff --git a/examples/rpc-client/src/calculator.rs b/examples/rpc-client/src/calculator.rs index ad76ca941..a90239194 100644 --- a/examples/rpc-client/src/calculator.rs +++ b/examples/rpc-client/src/calculator.rs @@ -22,20 +22,18 @@ mod sys { pub trait Calculator { async fn add(&self, a: u32, b: u32) -> ::worker::Result; } -pub struct CalculatorService(::worker::send::SendWrapper); +pub struct CalculatorService(sys::CalculatorSys); #[async_trait::async_trait] impl Calculator for CalculatorService { async fn add(&self, a: u32, b: u32) -> ::worker::Result { let promise = self.0.add(a, b)?; - let fut = ::worker::send::SendFuture::new( - ::worker::wasm_bindgen_futures::JsFuture::from(promise), - ); + let fut = ::worker::wasm_bindgen_futures::JsFuture::from(promise); let output = fut.await?; Ok(::serde_wasm_bindgen::from_value(output)?) } } impl From<::worker::Fetcher> for CalculatorService { fn from(fetcher: ::worker::Fetcher) -> Self { - Self(::worker::send::SendWrapper::new(fetcher.into_rpc())) + Self(fetcher.into_rpc()) } } diff --git a/test/src/alarm.rs b/test/src/alarm.rs index 09e2861b6..48d5c5899 100644 --- a/test/src/alarm.rs +++ b/test/src/alarm.rs @@ -34,7 +34,6 @@ impl DurableObject for AlarmObject { } } -#[worker::send] pub async fn handle_alarm(_req: Request, env: Env, _data: SomeSharedData) -> Result { let namespace = env.durable_object("ALARM")?; let stub = namespace.id_from_name("alarm")?.get_stub()?; diff --git a/test/src/analytics_engine.rs b/test/src/analytics_engine.rs index cd0880d97..152d6f2f8 100644 --- a/test/src/analytics_engine.rs +++ b/test/src/analytics_engine.rs @@ -2,7 +2,6 @@ use super::SomeSharedData; use uuid::Uuid; use worker::{AnalyticsEngineDataPointBuilder, Env, Request, Response, Result}; -#[worker::send] pub async fn handle_analytics_event( req: Request, env: Env, @@ -31,5 +30,5 @@ pub async fn handle_analytics_event( .add_double(200) .write_to(&dataset)?; - return Response::ok("Events sent"); + Response::ok("Events sent") } diff --git a/test/src/assets.rs b/test/src/assets.rs index f3bedc0f7..eb69bfdc1 100644 --- a/test/src/assets.rs +++ b/test/src/assets.rs @@ -17,7 +17,6 @@ pub async fn handle_asset( } #[cfg(feature = "http")] -#[worker::send] pub async fn handle_asset( req: worker::Request, env: worker::Env, diff --git a/test/src/auto_response.rs b/test/src/auto_response.rs index 0582f3bd5..38c56211d 100644 --- a/test/src/auto_response.rs +++ b/test/src/auto_response.rs @@ -33,7 +33,6 @@ impl DurableObject for AutoResponseObject { } // Route handler to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_auto_response( _req: Request, env: Env, diff --git a/test/src/cache.rs b/test/src/cache.rs index dcb73c67a..6ade3cfc0 100644 --- a/test/src/cache.rs +++ b/test/src/cache.rs @@ -10,7 +10,6 @@ fn key(req: &Request) -> Result> { Ok(segments.nth(2).map(ToOwned::to_owned)) } -#[worker::send] pub async fn handle_cache_example( req: Request, _env: Env, @@ -34,7 +33,6 @@ pub async fn handle_cache_example( } } -#[worker::send] pub async fn handle_cache_api_get( req: Request, _env: Env, @@ -50,7 +48,6 @@ pub async fn handle_cache_api_get( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_put( req: Request, _env: Env, @@ -69,7 +66,6 @@ pub async fn handle_cache_api_put( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_delete( req: Request, _env: Env, @@ -84,7 +80,6 @@ pub async fn handle_cache_api_delete( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_stream( req: Request, _env: Env, @@ -98,8 +93,10 @@ pub async fn handle_cache_stream( Ok(resp) } else { console_log!("Cache MISS!"); - let mut rng = rand::rng(); - let count = rng.random_range(0..10); + let count = { + let mut rng = rand::rng(); + rng.random_range(0..10) + }; let stream = futures_util::stream::repeat("Hello, world!\n") .take(count) .then(|text| async move { diff --git a/test/src/container.rs b/test/src/container.rs index 44dd8fb0b..196e21be8 100644 --- a/test/src/container.rs +++ b/test/src/container.rs @@ -81,7 +81,6 @@ impl DurableObject for EchoContainer { const CONTAINER_NAME: &str = "my-container"; -#[worker::send] pub async fn handle_container( mut req: Request, env: Env, diff --git a/test/src/counter.rs b/test/src/counter.rs index 9c9dc6a11..e89a74ba3 100644 --- a/test/src/counter.rs +++ b/test/src/counter.rs @@ -95,7 +95,6 @@ impl DurableObject for Counter { } } -#[worker::send] pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" @@ -110,7 +109,6 @@ pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result< stub.fetch_with_str("https://fake-host/").await } -#[worker::send] pub async fn handle_websocket(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" diff --git a/test/src/d1.rs b/test/src/d1.rs index 63ce86355..2f5db5b3f 100644 --- a/test/src/d1.rs +++ b/test/src/d1.rs @@ -14,7 +14,6 @@ struct Person { age: u32, } -#[worker::send] pub async fn prepared_statement( _req: Request, env: Env, @@ -68,7 +67,6 @@ pub async fn prepared_statement( Response::ok("ok") } -#[worker::send] pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let mut results = db @@ -93,7 +91,6 @@ pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let result = db @@ -104,14 +101,12 @@ pub async fn exec(mut req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let bytes = db.dump().await?; Response::from_bytes(bytes) } -#[worker::send] pub async fn error(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let error = db @@ -138,7 +133,6 @@ struct NullablePerson { age: Option, } -#[worker::send] pub async fn jsvalue_null_is_null( _req: Request, _env: Env, @@ -149,7 +143,6 @@ pub async fn jsvalue_null_is_null( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_none( _req: Request, _env: Env, @@ -164,7 +157,6 @@ pub async fn serialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_some( _req: Request, _env: Env, @@ -179,7 +171,6 @@ pub async fn serialize_optional_some( Response::ok("ok") } -#[worker::send] pub async fn deserialize_optional_none( _req: Request, _env: Env, @@ -201,7 +192,6 @@ pub async fn deserialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_none( _req: Request, env: Env, @@ -227,7 +217,6 @@ pub async fn insert_and_retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_some( _req: Request, env: Env, @@ -252,7 +241,6 @@ pub async fn insert_and_retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_none( _req: Request, env: Env, @@ -269,7 +257,6 @@ pub async fn retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_some( _req: Request, env: Env, @@ -286,7 +273,6 @@ pub async fn retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrive_first_none( _req: Request, env: Env, diff --git a/test/src/durable.rs b/test/src/durable.rs index e1167703c..9756db771 100644 --- a/test/src/durable.rs +++ b/test/src/durable.rs @@ -176,7 +176,6 @@ impl DurableObject for MyClass { } // Route handlers to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_hello( _req: Request, env: Env, @@ -190,7 +189,6 @@ pub async fn handle_hello( .await } -#[worker::send] pub async fn handle_hello_unique( _req: Request, env: Env, @@ -204,7 +202,6 @@ pub async fn handle_hello_unique( .await } -#[worker::send] pub async fn handle_storage( _req: Request, env: Env, @@ -215,7 +212,6 @@ pub async fn handle_storage( stub.fetch_with_str("https://fake-host/storage").await } -#[worker::send] pub async fn handle_basic_test( _req: Request, env: Env, @@ -290,7 +286,6 @@ pub async fn handle_basic_test( Response::ok("ok") } -#[worker::send] pub async fn handle_get_by_name( _req: Request, env: Env, @@ -308,7 +303,6 @@ pub async fn handle_get_by_name( .await } -#[worker::send] pub async fn handle_get_by_name_with_location_hint( _req: Request, env: Env, diff --git a/test/src/fetch.rs b/test/src/fetch.rs index d5c351bc6..26a584dda 100644 --- a/test/src/fetch.rs +++ b/test/src/fetch.rs @@ -7,7 +7,6 @@ use worker::{ RequestInit, Response, Result, }; -#[worker::send] pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let req = Request::new("https://example.com", Method::Post)?; let resp = Fetch::Request(req).send().await?; @@ -19,7 +18,6 @@ pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Re )) } -#[worker::send] pub async fn handle_fetch_json( _req: Request, _env: Env, @@ -40,7 +38,6 @@ pub async fn handle_fetch_json( )) } -#[worker::send] pub async fn handle_proxy_request( req: Request, _env: Env, @@ -57,7 +54,6 @@ pub async fn handle_proxy_request( Fetch::Url(url.parse()?).send().await } -#[worker::send] pub async fn handle_request_init_fetch( _req: Request, _env: Env, @@ -69,7 +65,6 @@ pub async fn handle_request_init_fetch( .await } -#[worker::send] pub async fn handle_request_init_fetch_post( _req: Request, _env: Env, @@ -82,7 +77,6 @@ pub async fn handle_request_init_fetch_post( .await } -#[worker::send] pub async fn handle_cancelled_fetch( _req: Request, _env: Env, @@ -115,7 +109,6 @@ pub async fn handle_cancelled_fetch( Ok(res) } -#[worker::send] pub async fn handle_fetch_timeout( _req: Request, _env: Env, @@ -158,7 +151,6 @@ pub async fn handle_fetch_timeout( } } -#[worker::send] pub async fn handle_cloned_fetch( _req: Request, _env: Env, @@ -179,7 +171,6 @@ pub async fn handle_cloned_fetch( Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_response_attributes( _req: Request, _env: Env, diff --git a/test/src/form.rs b/test/src/form.rs index 4c6ad57f1..7f88824b6 100644 --- a/test/src/form.rs +++ b/test/src/form.rs @@ -5,7 +5,6 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use worker::{kv, Env, FormEntry, Request, Response, Result}; -#[worker::send] pub async fn handle_formdata_name( mut req: Request, _env: Env, @@ -48,7 +47,6 @@ struct FileSize { size: u32, } -#[worker::send] pub async fn handle_formdata_file_size( mut req: Request, env: Env, @@ -88,7 +86,6 @@ pub async fn handle_formdata_file_size( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_formdata_file_size_hash( req: Request, env: Env, @@ -108,7 +105,6 @@ pub async fn handle_formdata_file_size_hash( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_is_secret( mut req: Request, env: Env, diff --git a/test/src/js_snippets.rs b/test/src/js_snippets.rs index ca929da8a..a5b7197e1 100644 --- a/test/src/js_snippets.rs +++ b/test/src/js_snippets.rs @@ -22,12 +22,10 @@ extern "C" { fn js_throw_error(); } -#[worker::send] pub async fn performance_now(_req: Request, _env: Env, _data: SomeSharedData) -> Result { Response::ok(format!("now: {}", js_performance_now())) } -#[worker::send] pub async fn console_log(_req: Request, _env: Env, _data: SomeSharedData) -> Result { js_console_log("test".to_owned()); Response::ok("OK") diff --git a/test/src/kv.rs b/test/src/kv.rs index 55533aae6..62726db8e 100644 --- a/test/src/kv.rs +++ b/test/src/kv.rs @@ -17,7 +17,6 @@ macro_rules! kv_assert_eq { }}; } -#[worker::send] pub async fn handle_post_key_value( req: Request, env: Env, @@ -39,7 +38,6 @@ pub async fn handle_post_key_value( const TEST_NAMESPACE: &str = "TEST"; -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("simple").text().await?; @@ -49,7 +47,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("not_found").text().await?; @@ -59,7 +56,6 @@ pub async fn get_not_found(_req: Request, env: Env, _data: SomeSharedData) -> Re } } -#[worker::send] pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let list_res = store.list().execute().await?; @@ -70,7 +66,6 @@ pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("passed") } -#[worker::send] pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_a", "test")?.execute().await?; @@ -81,7 +76,6 @@ pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("passed") } -#[worker::send] pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_b", "test")?.metadata(100)?.execute().await?; @@ -93,7 +87,6 @@ pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Res Response::ok("passed") } -#[worker::send] pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> Result { const EXPIRATION: u64 = 2_000_000_000; let store = env.kv(TEST_NAMESPACE)?; @@ -117,7 +110,6 @@ pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok("passed") } -#[worker::send] pub async fn put_metadata_struct( _req: Request, env: Env, diff --git a/test/src/lib.rs b/test/src/lib.rs index 4f28ca705..fe444b4d5 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -52,6 +52,9 @@ pub struct SomeSharedData { regex: &'static Regex, } +unsafe impl Send for SomeSharedData {} +unsafe impl Sync for SomeSharedData {} + static GLOBAL_STATE: AtomicBool = AtomicBool::new(false); static GLOBAL_QUEUE_STATE: Mutex> = Mutex::new(Vec::new()); diff --git a/test/src/put_raw.rs b/test/src/put_raw.rs index 4901cf88c..773c527a8 100644 --- a/test/src/put_raw.rs +++ b/test/src/put_raw.rs @@ -64,7 +64,6 @@ impl DurableObject for PutRawTestObject { } } -#[worker::send] pub(crate) async fn handle_put_raw( req: Request, env: Env, diff --git a/test/src/queue.rs b/test/src/queue.rs index 5e9da4a8a..feb1f7aa4 100644 --- a/test/src/queue.rs +++ b/test/src/queue.rs @@ -26,7 +26,6 @@ pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Cont Ok(()) } -#[worker::send] pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); @@ -54,7 +53,6 @@ pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> } } -#[worker::send] pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result { let messages: Vec = match req.json().await { Ok(messages) => messages, diff --git a/test/src/r2.rs b/test/src/r2.rs index 5ef6c0f93..d8c1246ce 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -31,7 +31,6 @@ pub async fn seed_bucket(bucket: &Bucket) -> Result<()> { Ok(()) } -#[worker::send] pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -43,7 +42,6 @@ pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("ok") } -#[worker::send] pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -96,7 +94,6 @@ pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -119,7 +116,6 @@ pub async fn get_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("ok") } -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -141,7 +137,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; @@ -175,7 +170,6 @@ pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; let (http_metadata, custom_metadata, object_with_props) = @@ -190,7 +184,6 @@ pub async fn put_properties(_req: Request, env: Env, _data: SomeSharedData) -> R } #[allow(clippy::large_stack_arrays)] -#[worker::send] pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Result { const R2_MULTIPART_CHUNK_MIN_SIZE: usize = 5 * 1_024 * 1_024; // 5MiB. // const TEST_CHUNK_COUNT: usize = 3; @@ -246,7 +239,6 @@ pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Re Response::ok("ok") } -#[worker::send] pub async fn delete(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("DELETE_BUCKET")?; diff --git a/test/src/rate_limit.rs b/test/src/rate_limit.rs index d8665d1e4..af1b69186 100644 --- a/test/src/rate_limit.rs +++ b/test/src/rate_limit.rs @@ -2,7 +2,6 @@ use super::SomeSharedData; use std::collections::HashMap; use worker::{js_sys, Env, Request, Response, Result}; -#[worker::send] pub async fn handle_rate_limit_check( _req: Request, env: Env, @@ -18,7 +17,6 @@ pub async fn handle_rate_limit_check( })) } -#[worker::send] pub async fn handle_rate_limit_with_key( req: Request, env: Env, @@ -37,7 +35,6 @@ pub async fn handle_rate_limit_with_key( })) } -#[worker::send] pub async fn handle_rate_limit_bulk_test( _req: Request, env: Env, @@ -62,7 +59,6 @@ pub async fn handle_rate_limit_bulk_test( })) } -#[worker::send] pub async fn handle_rate_limit_reset( _req: Request, env: Env, diff --git a/test/src/request.rs b/test/src/request.rs index 6b89113bb..3428f8584 100644 --- a/test/src/request.rs +++ b/test/src/request.rs @@ -69,7 +69,6 @@ pub async fn handle_headers(req: Request, _env: Env, _data: SomeSharedData) -> R .ok("returned your headers to you.") } -#[worker::send] pub async fn handle_post_file_size( mut req: Request, _env: Env, @@ -79,7 +78,6 @@ pub async fn handle_post_file_size( Response::ok(format!("size = {}", bytes.len())) } -#[worker::send] pub async fn handle_async_text_echo( mut req: Request, _env: Env, @@ -110,7 +108,6 @@ pub async fn handle_bytes(_req: Request, _env: Env, _data: SomeSharedData) -> Re Response::from_bytes(vec![1, 2, 3, 4, 5, 6, 7]) } -#[worker::send] pub async fn handle_api_data( mut req: Request, _env: Env, @@ -181,7 +178,6 @@ pub async fn handle_now(_req: Request, _env: Env, _data: SomeSharedData) -> Resu Response::ok(js_date.to_string()) } -#[worker::send] pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let mut resp = Response::ok("Hello")?; let mut resp1 = resp.cloned()?; @@ -192,7 +188,6 @@ pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> R Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_stream( _req: Request, _env: Env, @@ -224,7 +219,6 @@ pub async fn handle_custom_response_body( Response::from_body(ResponseBody::Body(vec![b'h', b'e', b'l', b'l', b'o'])) } -#[worker::send] pub async fn handle_wait_delay(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); diff --git a/test/src/router.rs b/test/src/router.rs index 4ca07033e..bd43b7267 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -266,7 +266,6 @@ async fn respond_async(req: Request, _env: Env, _data: SomeSharedData) -> Result .ok(format!("Ok (async): {}", String::from(req.method()))) } -#[worker::send] async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> Result { let some_namespace_kv = env.kv("SOME_NAMESPACE")?; let got_close_event = some_namespace_kv @@ -279,7 +278,6 @@ async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok(got_close_event) } -#[worker::send] async fn catchall(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let path = uri.path(); diff --git a/test/src/secret_store.rs b/test/src/secret_store.rs index 9410356a2..47c3316f1 100644 --- a/test/src/secret_store.rs +++ b/test/src/secret_store.rs @@ -1,7 +1,6 @@ use crate::SomeSharedData; use worker::{Env, Request, Response, Result}; -#[worker::send] pub async fn get_from_secret_store( _req: Request, env: Env, @@ -16,7 +15,6 @@ pub async fn get_from_secret_store( } } -#[worker::send] pub async fn get_from_secret_store_missing( _req: Request, env: Env, diff --git a/test/src/service.rs b/test/src/service.rs index 91fc2bcd8..9f3bbd61e 100644 --- a/test/src/service.rs +++ b/test/src/service.rs @@ -3,7 +3,6 @@ use super::SomeSharedData; use std::convert::TryInto; use worker::{Env, Method, Request, RequestInit, Response, Result}; -#[worker::send] pub async fn handle_remote_by_request( req: Request, env: Env, @@ -21,7 +20,6 @@ pub async fn handle_remote_by_request( result } -#[worker::send] pub async fn handle_remote_by_path( req: Request, env: Env, diff --git a/test/src/socket.rs b/test/src/socket.rs index 81fe2983e..7687dfce9 100644 --- a/test/src/socket.rs +++ b/test/src/socket.rs @@ -3,7 +3,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use worker::{ConnectionBuilder, Env, Error, Request, Response, Result}; -#[worker::send] pub async fn handle_socket_failed( _req: Request, _env: Env, @@ -12,16 +11,13 @@ pub async fn handle_socket_failed( let socket = ConnectionBuilder::new().connect("127.0.0.1", 25000)?; match socket.opened().await { - Ok(_) => { - return Err(Error::RustError( - "Socket should have failed to open.".to_owned(), - )) - } + Ok(_) => Err(Error::RustError( + "Socket should have failed to open.".to_owned(), + )), Err(e) => Response::ok(format!("{e:?}")), } } -#[worker::send] pub async fn handle_socket_read( _req: Request, _env: Env, diff --git a/test/src/sql_counter.rs b/test/src/sql_counter.rs index 93d92395b..c5080467e 100644 --- a/test/src/sql_counter.rs +++ b/test/src/sql_counter.rs @@ -83,7 +83,6 @@ impl SqlCounter { } } -#[worker::send] /// Route handler that proxies a request to our SqlCounter Durable Object with id derived from the /// path `/sql-counter/{name}` (so every name gets its own instance). pub async fn handle_sql_counter( diff --git a/test/src/sql_iterator.rs b/test/src/sql_iterator.rs index 014c39f2d..8c27a859c 100644 --- a/test/src/sql_iterator.rs +++ b/test/src/sql_iterator.rs @@ -412,7 +412,6 @@ impl SqlIterator { } } -#[worker::send] /// Route handler for the SQL iterator test Durable Object. pub async fn handle_sql_iterator( req: Request, diff --git a/test/src/ws.rs b/test/src/ws.rs index e0c3899af..79ad22e65 100644 --- a/test/src/ws.rs +++ b/test/src/ws.rs @@ -40,7 +40,6 @@ pub async fn handle_websocket(_req: Request, env: Env, _data: SomeSharedData) -> Response::from_websocket(pair.client) } -#[worker::send] pub async fn handle_websocket_client( _req: Request, _env: Env, diff --git a/wasm-bindgen b/wasm-bindgen index f72a9d68d..6c77f4b85 160000 --- a/wasm-bindgen +++ b/wasm-bindgen @@ -1 +1 @@ -Subproject commit f72a9d68d90cd75513e18859403f0139cc6913a1 +Subproject commit 6c77f4b85e3fde87b04189ac724ef130da4bdd05 diff --git a/worker-codegen/src/wit.rs b/worker-codegen/src/wit.rs index 50e2aa0a1..9774aac1e 100644 --- a/worker-codegen/src/wit.rs +++ b/worker-codegen/src/wit.rs @@ -85,7 +85,7 @@ fn expand_trait(interface: &Interface, interface_name: &Ident) -> anyhow::Result fn expand_struct(struct_name: &Ident, sys_name: &Ident) -> anyhow::Result { let struct_raw = quote!( - pub struct #struct_name(::worker::send::SendWrapper); + pub struct #struct_name(sys::#sys_name); ); let struct_item: syn::ItemStruct = syn::parse2(struct_raw)?; Ok(struct_item) @@ -95,7 +95,7 @@ fn expand_from_impl(struct_name: &Ident, from_type: &syn::Type) -> anyhow::Resul let impl_raw = quote!( impl From<#from_type> for #struct_name { fn from(fetcher: #from_type) -> Self { - Self(::worker::send::SendWrapper::new(fetcher.into_rpc())) + Self(fetcher.into_rpc()) } } ); @@ -144,7 +144,7 @@ fn expand_rpc_impl( let method_raw = quote!( async fn #ident(&self) -> ::worker::Result<#ret_type> { let promise = #invocation_item?; - let fut = ::worker::send::SendFuture::new(::worker::wasm_bindgen_futures::JsFuture::from(promise)); + let fut = ::worker::wasm_bindgen_futures::JsFuture::from(promise); let output = fut.await?; Ok(::serde_wasm_bindgen::from_value(output)?) } diff --git a/worker-macros/src/lib.rs b/worker-macros/src/lib.rs index c6f2cd376..4dd95c725 100644 --- a/worker-macros/src/lib.rs +++ b/worker-macros/src/lib.rs @@ -125,8 +125,7 @@ pub fn event(attr: TokenStream, item: TokenStream) -> TokenStream { /// expect the handler to be `Send`, such as `axum`. /// /// ```rust -/// #[worker::send] -/// async fn foo() { +/// /// async fn foo() { /// // JsFuture is !Send /// let fut = JsFuture::from(promise); /// fut.await diff --git a/worker/src/ai.rs b/worker/src/ai.rs index da2bdbdff..f57eb289d 100644 --- a/worker/src/ai.rs +++ b/worker/src/ai.rs @@ -1,4 +1,4 @@ -use crate::{env::EnvBinding, send::SendFuture}; +use crate::env::EnvBinding; use crate::{Error, Result}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -19,10 +19,10 @@ impl Ai { model: impl AsRef, input: T, ) -> Result { - let fut = SendFuture::new(JsFuture::from( + let fut = JsFuture::from( self.0 .run(model.as_ref(), serde_wasm_bindgen::to_value(&input)?), - )); + ); match fut.await { Ok(output) => Ok(serde_wasm_bindgen::from_value(output)?), Err(err) => Err(Error::from(err)), diff --git a/worker/src/crypto.rs b/worker/src/crypto.rs index aa6a96de5..c88af1f7d 100644 --- a/worker/src/crypto.rs +++ b/worker/src/crypto.rs @@ -1,9 +1,6 @@ use js_sys::{ArrayBuffer, Uint8Array}; -use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; -use crate::send::SendFuture; - /// A Rust-friendly wrapper around the non-standard [crypto.DigestStream](https://developers.cloudflare.com/workers/runtime-apis/web-crypto/#constructors) API /// /// Example usage: @@ -36,8 +33,8 @@ impl DigestStream { } pub async fn digest(&self) -> Result { - let fut = SendFuture::new(JsFuture::from(self.inner.digest())); - let buffer: ArrayBuffer = fut.await?.unchecked_into(); + let fut = JsFuture::from(self.inner.digest()); + let buffer: ArrayBuffer = fut.await?.into(); Ok(Uint8Array::new(&buffer)) } diff --git a/worker/src/delay.rs b/worker/src/delay.rs index 0dce13960..cfb3c5c6a 100644 --- a/worker/src/delay.rs +++ b/worker/src/delay.rs @@ -98,3 +98,7 @@ impl PinnedDrop for Delay { } } } + +/// SAFETY: Cloudflare Workers runtime is single-threaded, so it's safe to mark Delay as Send +/// even though it contains Rc>. +unsafe impl Send for Delay {} diff --git a/worker/src/lib.rs b/worker/src/lib.rs index ccc1a9193..3a538e7fd 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -91,8 +91,7 @@ //! //! ```rust //! // This macro makes the whole function (i.e. the `Future` it returns) `Send`. -//! #[worker::send] -//! async fn handler(Extension(env): Extension) -> Response { +//! //! async fn handler(Extension(env): Extension) -> Response { //! let kv = env.kv("FOO").unwrap()?; //! // Holding `kv`, which is not `Send` across `await` boundary would mark this function as `!Send` //! let value = kv.get("foo").text().await?; @@ -234,7 +233,6 @@ mod response; mod router; mod schedule; mod secret_store; -pub mod send; mod socket; mod sql; mod streams; diff --git a/worker/src/rate_limit.rs b/worker/src/rate_limit.rs index 5f381fa9c..73b975eb5 100644 --- a/worker/src/rate_limit.rs +++ b/worker/src/rate_limit.rs @@ -1,4 +1,4 @@ -use crate::{send::SendFuture, EnvBinding, Result}; +use crate::{EnvBinding, Result}; use serde::{Deserialize, Serialize}; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; @@ -17,9 +17,6 @@ pub struct RateLimitOutcome { pub success: bool, } -unsafe impl Send for RateLimiter {} -unsafe impl Sync for RateLimiter {} - impl EnvBinding for RateLimiter { const TYPE_NAME: &'static str = "Ratelimit"; } @@ -27,7 +24,7 @@ impl RateLimiter { pub async fn limit(&self, key: String) -> Result { let arg = serde_wasm_bindgen::to_value(&RateLimitOptions { key })?; let promise = self.0.limit(arg.into())?; - let fut = SendFuture::new(JsFuture::from(promise)); + let fut = JsFuture::from(promise); let result = fut.await?; let outcome = serde_wasm_bindgen::from_value(result)?; Ok(outcome) diff --git a/worker/src/secret_store.rs b/worker/src/secret_store.rs index e5103844b..2a3920874 100644 --- a/worker/src/secret_store.rs +++ b/worker/src/secret_store.rs @@ -1,13 +1,10 @@ -use crate::{ - send::{SendFuture, SendWrapper}, - EnvBinding, Fetcher, Result, -}; +use crate::{EnvBinding, Fetcher, Result}; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; /// A binding to a Cloudflare Secret Store. #[derive(Debug, Clone)] -pub struct SecretStore(SendWrapper); +pub struct SecretStore(worker_sys::SecretStoreSys); // Allows for attachment to axum router, as Workers will never allow multithreading. unsafe impl Send for SecretStore {} @@ -46,7 +43,7 @@ impl From for SecretStore { impl From for SecretStore { fn from(fetcher: Fetcher) -> Self { - Self(SendWrapper::new(fetcher.into_rpc())) + Self(fetcher.into_rpc()) } } @@ -66,7 +63,7 @@ impl SecretStore { Err(_) => return Ok(None), // Secret not found }; - let fut = SendFuture::new(JsFuture::from(promise)); + let fut = JsFuture::from(promise); let output = match fut.await { Ok(val) => val, diff --git a/worker/src/send.rs b/worker/src/send.rs deleted file mode 100644 index 2e9fe9754..000000000 --- a/worker/src/send.rs +++ /dev/null @@ -1,98 +0,0 @@ -//! This module provides utilities for working with JavaScript types -//! which do not implement `Send`, in contexts where `Send` is required. -//! Workers is guaranteed to be single-threaded, so it is safe to -//! wrap any type with `Send` and `Sync` traits. - -use futures_util::future::Future; -use pin_project::pin_project; -use std::fmt::Debug; -use std::fmt::Display; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -#[derive(Debug)] -#[pin_project] -/// Wrap any future to make it `Send`. -/// -/// ```rust -/// let fut = SendFuture::new(JsFuture::from(promise)); -/// fut.await -/// ``` -pub struct SendFuture { - #[pin] - inner: F, -} - -impl SendFuture { - pub fn new(inner: F) -> Self { - Self { inner } - } -} - -unsafe impl Send for SendFuture {} -unsafe impl Sync for SendFuture {} - -impl Future for SendFuture { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - this.inner.poll(cx) - } -} - -/// Wrap any type to make it `Send`. -/// -/// ```rust -/// // js_sys::Promise is !Send -/// let send_promise = SendWrapper::new(promise); -/// ``` -pub struct SendWrapper(pub T); - -unsafe impl Send for SendWrapper {} -unsafe impl Sync for SendWrapper {} - -impl SendWrapper { - pub fn new(inner: T) -> Self { - Self(inner) - } -} - -impl std::ops::Deref for SendWrapper { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for SendWrapper { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Debug for SendWrapper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "SendWrapper({:?})", self.0) - } -} - -impl Clone for SendWrapper { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Default for SendWrapper { - fn default() -> Self { - Self(T::default()) - } -} - -impl Display for SendWrapper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "SendWrapper({})", self.0) - } -} diff --git a/worker/src/streams.rs b/worker/src/streams.rs index 90f982fdd..80bc6550f 100644 --- a/worker/src/streams.rs +++ b/worker/src/streams.rs @@ -102,6 +102,11 @@ impl Stream for FixedLengthStream { } } +/// SAFETY: Cloudflare Workers runtime is single-threaded, so it's safe to mark FixedLengthStream +/// as Send and Sync even though it contains a trait object that may not be Send/Sync. +unsafe impl Send for FixedLengthStream {} +unsafe impl Sync for FixedLengthStream {} + impl From for FixedLengthStreamSys { fn from(stream: FixedLengthStream) -> Self { let raw = if stream.length < u32::MAX as u64 { diff --git a/worker/src/websocket.rs b/worker/src/websocket.rs index 6be3594ab..3c040143a 100644 --- a/worker/src/websocket.rs +++ b/worker/src/websocket.rs @@ -462,7 +462,7 @@ async fn fetch_with_request_raw(request: crate::Request) -> Result