Skip to content

Commit 1ddf6d7

Browse files
Implement Hibernatable Web Sockets API (#436)
* Rebase commit * Pull Request feedback * Fix problems with async_trait * Improve error message around incorrect impl methods * Improve error message around incorrect impl methods * Add missing semi-colon * Add missing async * Add clippy exceptions * Fix trait type * Properly qualify worker_sys * Change websockets to ref: * Revert formatting changes to Cargo.toml * Remove left-over code * fix formatting * clippy --------- Co-authored-by: Kevin Flansburg <[email protected]>
1 parent e06193a commit 1ddf6d7

File tree

5 files changed

+263
-19
lines changed

5 files changed

+263
-19
lines changed

worker-macros/src/durable_object.rs

+148-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use proc_macro2::{Ident, TokenStream};
22
use quote::{quote, ToTokens};
3-
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath};
3+
use syn::{spanned::Spanned, Error, FnArg, ImplItem, Item, Type, TypePath, Visibility};
44

55
pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
66
let item = syn::parse2::<Item>(tokens)?;
@@ -20,24 +20,37 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
2020
let struct_name = imp.self_ty;
2121
let items = imp.items;
2222
let mut tokenized = vec![];
23-
let mut has_alarm = false;
23+
24+
#[derive(Default)]
25+
struct OptionalMethods {
26+
has_alarm: bool,
27+
has_websocket_message: bool,
28+
has_websocket_close: bool,
29+
has_websocket_error: bool,
30+
}
31+
32+
let mut optional_methods = OptionalMethods::default();
2433

2534
for item in items {
2635
let impl_method = match item {
2736
ImplItem::Fn(func) => func,
2837
_ => return Err(Error::new_spanned(item, "Impl block must only contain methods"))
2938
};
3039

40+
let span = impl_method.sig.ident.span();
41+
3142
let tokens = match impl_method.sig.ident.to_string().as_str() {
3243
"new" => {
3344
let mut method = impl_method.clone();
3445
method.sig.ident = Ident::new("_new", method.sig.ident.span());
46+
method.vis = Visibility::Inherited;
47+
3548

3649
// modify the `state` argument so it is type ObjectState
3750
let arg_tokens = method.sig.inputs.first_mut().expect("DurableObject `new` method must have 2 arguments: state and env").into_token_stream();
3851
match syn::parse2::<FnArg>(arg_tokens)? {
3952
FnArg::Typed(pat) => {
40-
let path = syn::parse2::<TypePath>(quote!{worker_sys::DurableObjectState})?;
53+
let path = syn::parse2::<TypePath>(quote!{worker::worker_sys::DurableObjectState})?;
4154
let mut updated_pat = pat;
4255
updated_pat.ty = Box::new(Type::Path(path));
4356

@@ -57,17 +70,19 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
5770
prepended.extend(method.block.stmts);
5871
method.block.stmts = prepended;
5972

60-
quote! {
73+
Ok(quote! {
6174
#pound[wasm_bindgen::prelude::wasm_bindgen(constructor)]
6275
pub #method
63-
}
76+
})
6477
},
6578
"fetch" => {
6679
let mut method = impl_method.clone();
6780
method.sig.ident = Ident::new("_fetch_raw", method.sig.ident.span());
68-
quote! {
81+
method.vis = Visibility::Inherited;
82+
83+
Ok(quote! {
6984
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = fetch)]
70-
pub fn _fetch(&mut self, req: worker_sys::web_sys::Request) -> js_sys::Promise {
85+
pub fn _fetch(&mut self, req: worker::worker_sys::web_sys::Request) -> worker::js_sys::Promise {
7186
// SAFETY:
7287
// On the surface, this is unsound because the Durable Object could be dropped
7388
// while JavaScript still has possession of the future. However,
@@ -77,22 +92,24 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
7792
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};
7893

7994
wasm_bindgen_futures::future_to_promise(async move {
80-
static_self._fetch_raw(req.into()).await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
95+
static_self._fetch_raw(req.into()).await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
8196
.map_err(wasm_bindgen::JsValue::from)
8297
})
8398
}
8499

85100
#method
86-
}
101+
})
87102
},
88103
"alarm" => {
89-
has_alarm = true;
104+
optional_methods.has_alarm = true;
90105

91106
let mut method = impl_method.clone();
92107
method.sig.ident = Ident::new("_alarm_raw", method.sig.ident.span());
93-
quote! {
108+
method.vis = Visibility::Inherited;
109+
110+
Ok(quote! {
94111
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = alarm)]
95-
pub fn _alarm(&mut self) -> js_sys::Promise {
112+
pub fn _alarm(&mut self) -> worker::js_sys::Promise {
96113
// SAFETY:
97114
// On the surface, this is unsound because the Durable Object could be dropped
98115
// while JavaScript still has possession of the future. However,
@@ -102,24 +119,131 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
102119
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};
103120

104121
wasm_bindgen_futures::future_to_promise(async move {
105-
static_self._alarm_raw().await.map(worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
122+
static_self._alarm_raw().await.map(worker::worker_sys::web_sys::Response::from).map(wasm_bindgen::JsValue::from)
106123
.map_err(wasm_bindgen::JsValue::from)
107124
})
108125
}
109126

110127
#method
111-
}
112-
}
113-
_ => panic!()
128+
})
129+
},
130+
"websocket_message" => {
131+
optional_methods.has_websocket_message = true;
132+
133+
let mut method = impl_method.clone();
134+
method.sig.ident = Ident::new("_websocket_message_raw", method.sig.ident.span());
135+
method.vis = Visibility::Inherited;
136+
137+
Ok(quote! {
138+
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketMessage)]
139+
pub fn _websocket_message(&mut self, ws: worker::worker_sys::web_sys::WebSocket, message: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
140+
let ws_message = if let Some(string_message) = message.as_string() {
141+
worker::WebSocketIncomingMessage::String(string_message)
142+
} else {
143+
let v = worker::js_sys::Uint8Array::new(&message).to_vec();
144+
worker::WebSocketIncomingMessage::Binary(v)
145+
};
146+
147+
// SAFETY:
148+
// On the surface, this is unsound because the Durable Object could be dropped
149+
// while JavaScript still has possession of the future. However,
150+
// we know something that Rust doesn't: that the Durable Object will never be destroyed
151+
// while there is still a running promise inside of it, therefore we can let a reference
152+
// to the durable object escape into a static-lifetime future.
153+
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};
154+
155+
wasm_bindgen_futures::future_to_promise(async move {
156+
static_self._websocket_message_raw(ws.into(), ws_message).await.map(|_| wasm_bindgen::JsValue::NULL)
157+
.map_err(wasm_bindgen::JsValue::from)
158+
})
159+
}
160+
161+
#method
162+
})
163+
},
164+
"websocket_close" => {
165+
optional_methods.has_websocket_close = true;
166+
167+
let mut method = impl_method.clone();
168+
method.sig.ident = Ident::new("_websocket_close_raw", method.sig.ident.span());
169+
method.vis = Visibility::Inherited;
170+
171+
Ok(quote! {
172+
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketClose)]
173+
pub fn _websocket_close(&mut self, ws: worker::worker_sys::web_sys::WebSocket, code: usize, reason: String, was_clean: bool) -> worker::js_sys::Promise {
174+
// SAFETY:
175+
// On the surface, this is unsound because the Durable Object could be dropped
176+
// while JavaScript still has possession of the future. However,
177+
// we know something that Rust doesn't: that the Durable Object will never be destroyed
178+
// while there is still a running promise inside of it, therefore we can let a reference
179+
// to the durable object escape into a static-lifetime future.
180+
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};
181+
182+
wasm_bindgen_futures::future_to_promise(async move {
183+
static_self._websocket_close_raw(ws.into(), code, reason, was_clean).await.map(|_| wasm_bindgen::JsValue::NULL)
184+
.map_err(wasm_bindgen::JsValue::from)
185+
})
186+
}
187+
188+
#method
189+
})
190+
},
191+
"websocket_error" => {
192+
optional_methods.has_websocket_error = true;
193+
194+
let mut method = impl_method.clone();
195+
method.sig.ident = Ident::new("_websocket_error_raw", method.sig.ident.span());
196+
method.vis = Visibility::Inherited;
197+
198+
Ok(quote! {
199+
#pound[wasm_bindgen::prelude::wasm_bindgen(js_name = webSocketError)]
200+
pub fn _websocket_error(&mut self, ws: worker::worker_sys::web_sys::WebSocket, error: wasm_bindgen::JsValue) -> worker::js_sys::Promise {
201+
// SAFETY:
202+
// On the surface, this is unsound because the Durable Object could be dropped
203+
// while JavaScript still has possession of the future. However,
204+
// we know something that Rust doesn't: that the Durable Object will never be destroyed
205+
// while there is still a running promise inside of it, therefore we can let a reference
206+
// to the durable object escape into a static-lifetime future.
207+
let static_self: &'static mut Self = unsafe {&mut *(self as *mut _)};
208+
209+
wasm_bindgen_futures::future_to_promise(async move {
210+
static_self._websocket_error_raw(ws.into(), error.into()).await.map(|_| wasm_bindgen::JsValue::NULL)
211+
.map_err(wasm_bindgen::JsValue::from)
212+
})
213+
}
214+
215+
#method
216+
})
217+
},
218+
ident => Err(Error::new(span, format!("Unsupported method `{}`, please move extra impl methods to a separate impl definition", ident)))
114219
};
115-
tokenized.push(tokens);
220+
tokenized.push(tokens?);
116221
}
117222

118-
let alarm_tokens = has_alarm.then(|| quote! {
223+
let alarm_tokens = optional_methods.has_alarm.then(|| quote! {
119224
async fn alarm(&mut self) -> ::worker::Result<worker::Response> {
120225
self._alarm_raw().await
121226
}
122227
});
228+
229+
let websocket_message_tokens = optional_methods.has_websocket_message.then(|| quote! {
230+
async fn websocket_message(&mut self, ws: ::worker::WebSocket, message: ::worker::WebSocketIncomingMessage) -> ::worker::Result<()> {
231+
self._websocket_message_raw(ws, message).await
232+
}
233+
});
234+
235+
let websocket_close_tokens = optional_methods.has_websocket_close.then(|| quote! {
236+
async fn websocket_close(&mut self, ws: ::worker::WebSocket, code: usize, reason: String, was_clean: bool) -> ::worker::Result<()> {
237+
self._websocket_close_raw(ws, code, reason, was_clean).await
238+
}
239+
});
240+
241+
let websocket_error_tokens = optional_methods.has_websocket_error.then(|| quote! {
242+
async fn websocket_error(&mut self, ws: ::worker::WebSocket, error: ::worker::Error) -> ::worker::Result<()> {
243+
self._websocket_error_raw(ws, error).await
244+
}
245+
});
246+
123247
Ok(quote! {
124248
#wasm_bindgen_attr
125249
impl #struct_name {
@@ -137,6 +261,12 @@ pub fn expand_macro(tokens: TokenStream) -> syn::Result<TokenStream> {
137261
}
138262

139263
#alarm_tokens
264+
265+
#websocket_message_tokens
266+
267+
#websocket_close_tokens
268+
269+
#websocket_error_tokens
140270
}
141271

142272
trait __Need_Durable_Object_Trait_Impl_With_durable_object_Attribute { const MACROED: bool = true; }

worker-sys/src/ext/websocket.rs

+20
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ mod glue {
1010

1111
#[wasm_bindgen(method, catch)]
1212
pub fn accept(this: &WebSocket) -> Result<(), JsValue>;
13+
14+
#[wasm_bindgen(method, catch, js_name = "serializeAttachment")]
15+
pub fn serialize_attachment(this: &WebSocket, value: JsValue) -> Result<(), JsValue>;
16+
17+
#[wasm_bindgen(method, catch, js_name = "deserializeAttachment")]
18+
pub fn deserialize_attachment(this: &WebSocket) -> Result<JsValue, JsValue>;
1319
}
1420
}
1521

@@ -18,10 +24,24 @@ pub trait WebSocketExt {
1824
///
1925
/// [CF Documentation](https://developers.cloudflare.com/workers/runtime-apis/websockets#accept)
2026
fn accept(&self) -> Result<(), JsValue>;
27+
28+
fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue>;
29+
30+
fn deserialize_attachment(&self) -> Result<JsValue, JsValue>;
2131
}
2232

2333
impl WebSocketExt for web_sys::WebSocket {
2434
fn accept(&self) -> Result<(), JsValue> {
2535
self.unchecked_ref::<glue::WebSocket>().accept()
2636
}
37+
38+
fn serialize_attachment(&self, value: JsValue) -> Result<(), JsValue> {
39+
self.unchecked_ref::<glue::WebSocket>()
40+
.serialize_attachment(value)
41+
}
42+
43+
fn deserialize_attachment(&self) -> Result<JsValue, JsValue> {
44+
self.unchecked_ref::<glue::WebSocket>()
45+
.deserialize_attachment()
46+
}
2747
}

worker-sys/src/types/durable_object/state.rs

+17
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,21 @@ extern "C" {
1515

1616
#[wasm_bindgen(method, js_name=waitUntil)]
1717
pub fn wait_until(this: &DurableObjectState, promise: &js_sys::Promise);
18+
19+
#[wasm_bindgen(method, js_name=acceptWebSocket)]
20+
pub fn accept_websocket(this: &DurableObjectState, ws: &web_sys::WebSocket);
21+
22+
#[wasm_bindgen(method, js_name=acceptWebSocket)]
23+
pub fn accept_websocket_with_tags(
24+
this: &DurableObjectState,
25+
ws: &web_sys::WebSocket,
26+
tags: Vec<JsValue>,
27+
);
28+
29+
#[wasm_bindgen(method, js_name=getWebSockets)]
30+
pub fn get_websockets(this: &DurableObjectState) -> Vec<web_sys::WebSocket>;
31+
32+
#[wasm_bindgen(method, js_name=getWebSockets)]
33+
pub fn get_websockets_with_tag(this: &DurableObjectState, tag: &str)
34+
-> Vec<web_sys::WebSocket>;
1835
}

0 commit comments

Comments
 (0)