diff --git a/Cargo.lock b/Cargo.lock index e94e0ba6..9ca937e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1287,6 +1287,16 @@ dependencies = [ "unescape", ] +[[package]] +name = "queue-on-workers" +version = "0.1.0" +dependencies = [ + "js-sys", + "serde", + "wasm-bindgen", + "worker", +] + [[package]] name = "quote" version = "1.0.35" diff --git a/README.md b/README.md index ed744ee8..b05c9f74 100644 --- a/README.md +++ b/README.md @@ -308,20 +308,41 @@ pub async fn main(message_batch: MessageBatch, env: Env, _ctx: Context) // Log the message and meta data console_log!( "Got message {:?}, with id {} and timestamp: {}", - message.body, - message.id, - message.timestamp.to_string() + message.body(), + message.id(), + message.timestamp().to_string() ); // Send the message body to the other queue - my_queue.send(&message.body).await?; + my_queue.send(message.body()).await?; + + // Ack individual message + message.ack(); + + // Retry individual message + message.retry(); } // Retry all messages message_batch.retry_all(); + // Ack all messages + message_batch.ack_all(); Ok(()) } ``` +You'll need to ensure you have the correct bindings in your `wrangler.toml`: +```toml +# ... +[[queues.consumers]] +queue = "myqueueotherqueue" +max_batch_size = 10 +max_batch_timeout = 30 + + +[[queues.producers]] +queue = "myqueue" +binding = "my_queue" +``` ## Testing with Miniflare diff --git a/examples/queue/Cargo.toml b/examples/queue/Cargo.toml new file mode 100644 index 00000000..ca6f06c9 --- /dev/null +++ b/examples/queue/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "queue-on-workers" +version = "0.1.0" +edition = "2021" + +[package.metadata.release] +release = false + +# https://github.com/rustwasm/wasm-pack/issues/1247 +[package.metadata.wasm-pack.profile.release] +wasm-opt = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +serde = "1" +worker = { workspace = true, features = ["queue"] } +wasm-bindgen = { workspace = true } +js-sys = { workspace = true } diff --git a/examples/queue/README.md b/examples/queue/README.md new file mode 100644 index 00000000..926b3859 --- /dev/null +++ b/examples/queue/README.md @@ -0,0 +1,3 @@ +# Using Queues on Cloudflare Workers + +Demonstration of using `worker::Queue` diff --git a/examples/queue/package.json b/examples/queue/package.json new file mode 100644 index 00000000..381eaa1d --- /dev/null +++ b/examples/queue/package.json @@ -0,0 +1,12 @@ +{ + "name": "queue-on-workers", + "version": "0.0.0", + "private": true, + "scripts": { + "deploy": "wrangler deploy", + "dev": "wrangler dev --local" + }, + "devDependencies": { + "wrangler": "^3" + } +} \ No newline at end of file diff --git a/examples/queue/src/lib.rs b/examples/queue/src/lib.rs new file mode 100644 index 00000000..8e6e0112 --- /dev/null +++ b/examples/queue/src/lib.rs @@ -0,0 +1,149 @@ +use serde::{Deserialize, Serialize}; +use wasm_bindgen::JsValue; +use worker::*; + +const MY_MESSAGES_BINDING_NAME: &str = "my_messages"; +const MY_MESSAGES_QUEUE_NAME: &str = "mymessages"; + +const RAW_MESSAGES_BINDING_NAME: &str = "raw_messages"; +const RAW_MESSAGES_QUEUE_NAME: &str = "rawmessages"; + +#[derive(Serialize, Debug, Clone, Deserialize)] +pub struct MyType { + foo: String, + bar: u32, +} + +#[event(fetch)] +async fn main(_req: Request, env: Env, _: worker::Context) -> Result { + let my_messages_queue = env.queue(MY_MESSAGES_BINDING_NAME)?; + let raw_messages_queue = env.queue(RAW_MESSAGES_BINDING_NAME)?; + + // Send a message with using a serializable struct + my_messages_queue + .send(MyType { + foo: "Hello world".into(), + bar: 1, + }) + .await?; + + // Send a batch of messages using some sort of iterator + my_messages_queue + .send_batch([ + // Use the MessageBuilder to set additional options + MessageBuilder::new(MyType { + foo: "Hello world".into(), + bar: 2, + }) + .delay_seconds(20) + .build(), + // Send a message with using a serializable struct + MyType { + foo: "Hello world".into(), + bar: 4, + } + .into(), + ]) + .await?; + + // Send a batch of messages using the BatchMessageBuilder + my_messages_queue + .send_batch( + BatchMessageBuilder::new() + .message(MyType { + foo: "Hello world".into(), + bar: 4, + }) + .messages(vec![ + MyType { + foo: "Hello world".into(), + bar: 5, + }, + MyType { + foo: "Hello world".into(), + bar: 6, + }, + ]) + .delay_seconds(10) + .build(), + ) + .await?; + + // Send a raw JSValue + raw_messages_queue + .send_raw( + // RawMessageBuilder has to be used as we should set content type of these raw messages + RawMessageBuilder::new(JsValue::from_str("7")) + .delay_seconds(30) + .build_with_content_type(QueueContentType::Json), + ) + .await?; + + // Send a batch of raw JSValues using the BatchMessageBuilder + raw_messages_queue + .send_raw_batch( + BatchMessageBuilder::new() + .message( + RawMessageBuilder::new(js_sys::Date::new_0().into()) + .build_with_content_type(QueueContentType::V8), + ) + .message( + RawMessageBuilder::new(JsValue::from_str("8")) + .build_with_content_type(QueueContentType::Json), + ) + .delay_seconds(10) + .build(), + ) + .await?; + + // Send a batch of raw JsValues using some sort of iterator + raw_messages_queue + .send_raw_batch(vec![RawMessageBuilder::new(JsValue::from_str("9")) + .delay_seconds(20) + .build_with_content_type(QueueContentType::Text)]) + .await?; + + Response::empty() +} + +// Consumes messages from `my_messages` queue and `raw_messages` queue +#[event(queue)] +pub async fn main(message_batch: MessageBatch, _: Env, _: Context) -> Result<()> { + match message_batch.queue().as_str() { + MY_MESSAGES_QUEUE_NAME => { + for message in message_batch.messages()? { + console_log!( + "Got message {:?}, with id {} and timestamp: {}", + message.body(), + message.id(), + message.timestamp().to_string(), + ); + if message.body().bar == 1 { + message.retry_with_options( + &QueueRetryOptionsBuilder::new() + .with_delay_seconds(10) + .build(), + ); + } else { + message.ack(); + } + } + } + RAW_MESSAGES_QUEUE_NAME => { + for message in message_batch.raw_iter() { + console_log!( + "Got raw message {:?}, with id {} and timestamp: {}", + message.body(), + message.id(), + message.timestamp().to_string(), + ); + } + message_batch.ack_all(); + } + _ => { + console_error!("Unknown queue: {}", message_batch.queue()); + } + } + + Ok(()) +} diff --git a/examples/queue/wrangler.toml b/examples/queue/wrangler.toml new file mode 100644 index 00000000..40f487f1 --- /dev/null +++ b/examples/queue/wrangler.toml @@ -0,0 +1,23 @@ +name = "queue-on-workers" +main = "build/worker/shim.mjs" +compatibility_date = "2024-03-26" + + +[build] +command = "cargo install --path ../../worker-build && worker-build --release" + +[[queues.consumers]] +queue = "mymessages" +dead_letter_queue = "mymessagesdlq" + +[[queues.consumers]] +queue = "rawmessages" + + +[[queues.producers]] +queue = "mymessages" +binding = "my_messages" + +[[queues.producers]] +queue = "rawmessages" +binding = "raw_messages" diff --git a/worker-macros/src/event.rs b/worker-macros/src/event.rs index eb92188e..cb4e1c64 100644 --- a/worker-macros/src/event.rs +++ b/worker-macros/src/event.rs @@ -159,7 +159,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream, http: bool) -> TokenSt pub async fn #wrapper_fn_ident(event: ::worker::worker_sys::MessageBatch, env: ::worker::Env, ctx: ::worker::worker_sys::Context) { // call the original fn let ctx = worker::Context::new(ctx); - match #input_fn_ident(::worker::MessageBatch::new(event), env, ctx).await { + match #input_fn_ident(::worker::MessageBatch::from(event), env, ctx).await { Ok(()) => {}, Err(e) => { ::worker::console_log!("{}", &e); diff --git a/worker-sandbox/src/queue.rs b/worker-sandbox/src/queue.rs index 896b6a7d..7e7bfc9e 100644 --- a/worker-sandbox/src/queue.rs +++ b/worker-sandbox/src/queue.rs @@ -2,7 +2,9 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{SomeSharedData, GLOBAL_QUEUE_STATE}; -use worker::{console_log, event, Context, Env, MessageBatch, Request, Response, Result}; +use worker::{ + console_log, event, Context, Env, MessageBatch, MessageExt, Request, Response, Result, +}; #[derive(Serialize, Debug, Clone, Deserialize)] pub struct QueueBody { pub id: Uuid, @@ -15,11 +17,11 @@ pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Cont for message in message_batch.messages()? { console_log!( "Received queue message {:?}, with id {} and timestamp: {}", - message.body, - message.id, - message.timestamp.to_string() + message.body(), + message.id(), + message.timestamp().to_string() ); - guard.push(message.body); + guard.push(message.into_body()); } Ok(()) } @@ -54,6 +56,29 @@ pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> } } +#[worker::send] +pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result { + let messages: Vec = match req.json().await { + Ok(messages) => messages, + Err(err) => { + return Response::error(format!("Failed to parse request body: {err:?}"), 400); + } + }; + + let my_queue = match env.queue("my_queue") { + Ok(queue) => queue, + Err(err) => return Response::error(format!("Failed to get queue: {err:?}"), 500), + }; + + match my_queue.send_batch(messages).await { + Ok(()) => Response::ok("Message sent"), + Err(err) => Response::error( + format!("Failed to batch send message to queue: {err:?}"), + 500, + ), + } +} + pub async fn handle_queue(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let guard = GLOBAL_QUEUE_STATE.lock().unwrap(); let messages: Vec = guard.clone(); diff --git a/worker-sandbox/src/router.rs b/worker-sandbox/src/router.rs index 31c57356..d6a1141a 100644 --- a/worker-sandbox/src/router.rs +++ b/worker-sandbox/src/router.rs @@ -181,6 +181,10 @@ pub fn make_router(data: SomeSharedData, env: Env) -> axum::Router { get(handler!(service::handle_remote_by_path)), ) .route("/queue/send/:id", post(handler!(queue::handle_queue_send))) + .route( + "/queue/send_batch", + post(handler!(queue::handle_batch_send)), + ) .route("/queue", get(handler!(queue::handle_queue))) .route("/d1/prepared", get(handler!(d1::prepared_statement))) .route("/d1/batch", get(handler!(d1::batch))) @@ -301,6 +305,7 @@ pub fn make_router<'a>(data: SomeSharedData) -> Router<'a, SomeSharedData> { ) .get_async("/remote-by-path", handler!(service::handle_remote_by_path)) .post_async("/queue/send/:id", handler!(queue::handle_queue_send)) + .post_async("/queue/send_batch", handler!(queue::handle_batch_send)) .get_async("/queue", handler!(queue::handle_queue)) .get_async("/d1/prepared", handler!(d1::prepared_statement)) .get_async("/d1/batch", handler!(d1::batch)) diff --git a/worker-sandbox/tests/queue.spec.ts b/worker-sandbox/tests/queue.spec.ts index 317bdc6d..ac364d34 100644 --- a/worker-sandbox/tests/queue.spec.ts +++ b/worker-sandbox/tests/queue.spec.ts @@ -27,4 +27,31 @@ describe("queue", () => { const message = messages.find((msg) => msg.id === id.toString()); expect(message).toMatchObject({ id: id.toString() }); }); + + test("batch send message to queue", async () => { + const id_1 = uuid.v4(); + const id_2 = uuid.v4(); + + let resp = await mf.dispatchFetch(`https://fake.host/queue/send_batch`, { + method: "POST", + body: JSON.stringify([{ id: id_1, id_string: id_1 }, { id: id_2, id_string: id_2 }]) + }); + + expect(resp.status).toBe(200); + + await new Promise((resolve) => setTimeout(resolve, 1200)); + + resp = await mf.dispatchFetch("https://fake.host/queue"); + expect(resp.status).toBe(200); + + let body = await resp.json(); + + const messages = body as { id: string }[]; + + const message_1 = messages.find((msg) => msg.id === id_1.toString()); + expect(message_1).toMatchObject({ id: id_1.toString() }); + + const message_2 = messages.find((msg) => msg.id === id_2.toString()); + expect(message_2).toMatchObject({ id: id_2.toString() }); + }); }); diff --git a/worker-sys/src/types/queue.rs b/worker-sys/src/types/queue.rs index 423a53d9..a9d6f2f1 100644 --- a/worker-sys/src/types/queue.rs +++ b/worker-sys/src/types/queue.rs @@ -7,13 +7,38 @@ extern "C" { pub type MessageBatch; #[wasm_bindgen(method, getter)] - pub fn queue(this: &MessageBatch) -> String; + pub fn queue(this: &MessageBatch) -> js_sys::JsString; #[wasm_bindgen(method, getter)] pub fn messages(this: &MessageBatch) -> js_sys::Array; #[wasm_bindgen(method, js_name=retryAll)] - pub fn retry_all(this: &MessageBatch); + pub fn retry_all(this: &MessageBatch, options: JsValue); + + #[wasm_bindgen(method, js_name=ackAll)] + pub fn ack_all(this: &MessageBatch); +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Debug, Clone, PartialEq, Eq)] + pub type Message; + + #[wasm_bindgen(method, getter)] + pub fn id(this: &Message) -> js_sys::JsString; + + #[wasm_bindgen(method, getter)] + pub fn timestamp(this: &Message) -> js_sys::Date; + + #[wasm_bindgen(method, getter)] + pub fn body(this: &Message) -> JsValue; + + #[wasm_bindgen(method)] + pub fn retry(this: &Message, options: JsValue); + + #[wasm_bindgen(method)] + pub fn ack(this: &Message); } #[wasm_bindgen] @@ -23,5 +48,8 @@ extern "C" { pub type Queue; #[wasm_bindgen(method)] - pub fn send(this: &Queue, message: JsValue) -> js_sys::Promise; + pub fn send(this: &Queue, message: JsValue, options: JsValue) -> js_sys::Promise; + + #[wasm_bindgen(method, js_name=sendBatch)] + pub fn send_batch(this: &Queue, messages: js_sys::Array, options: JsValue) -> js_sys::Promise; } diff --git a/worker/src/queue.rs b/worker/src/queue.rs index dfb8243b..48fff682 100644 --- a/worker/src/queue.rs +++ b/worker/src/queue.rs @@ -1,113 +1,239 @@ -use std::marker::PhantomData; +use std::{ + convert::{TryFrom, TryInto}, + marker::PhantomData, +}; use crate::{env::EnvBinding, Date, Error, Result}; use js_sys::Array; use serde::{de::DeserializeOwned, Serialize}; use wasm_bindgen::{prelude::*, JsCast}; use wasm_bindgen_futures::JsFuture; -use worker_sys::{MessageBatch as MessageBatchSys, Queue as EdgeQueue}; - -static BODY_KEY_STR: &str = "body"; -static ID_KEY_STR: &str = "id"; -static TIMESTAMP_KEY_STR: &str = "timestamp"; +use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue}; +/// A batch of messages that are sent to a consumer Worker. pub struct MessageBatch { inner: MessageBatchSys, - messages: Array, - data: PhantomData, - timestamp_key: JsValue, - body_key: JsValue, - id_key: JsValue, -} - -impl MessageBatch { - pub fn new(message_batch_sys: MessageBatchSys) -> Self { - let timestamp_key = JsValue::from_str(TIMESTAMP_KEY_STR); - let body_key = JsValue::from_str(BODY_KEY_STR); - let id_key = JsValue::from_str(ID_KEY_STR); - Self { - messages: message_batch_sys.messages(), - inner: message_batch_sys, - data: PhantomData, - timestamp_key, - body_key, - id_key, - } - } -} - -pub struct Message { - pub body: T, - pub timestamp: Date, - pub id: String, + phantom: PhantomData, } impl MessageBatch { /// The name of the Queue that belongs to this batch. pub fn queue(&self) -> String { - self.inner.queue() + self.inner.queue().into() } /// Marks every message to be retried in the next batch. pub fn retry_all(&self) { - self.inner.retry_all(); + self.inner.retry_all(JsValue::null()); } - /// Iterator that deserializes messages in the message batch. Ordering of messages is not guaranteed. - pub fn iter(&self) -> MessageIter<'_, T> - where - T: DeserializeOwned, - { - MessageIter { - range: 0..self.messages.length(), - array: &self.messages, - timestamp_key: &self.timestamp_key, - body_key: &self.body_key, - id_key: &self.id_key, - data: PhantomData, + /// Marks every message to be retried in the next batch with options. + pub fn retry_all_with_options(&self, queue_retry_options: &QueueRetryOptions) { + self.inner + // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value. + .retry_all(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap()); + } + + /// Marks every message acknowledged in the batch. + pub fn ack_all(&self) { + self.inner.ack_all(); + } + + /// Iterator for raw messages in the message batch. Ordering of messages is not guaranteed. + pub fn raw_iter(&self) -> RawMessageIter { + let messages = self.inner.messages(); + RawMessageIter { + range: 0..messages.length(), + array: messages, } } +} +impl MessageBatch { /// An array of messages in the batch. Ordering of messages is not guaranteed. - pub fn messages(&self) -> Result>> - where - T: DeserializeOwned, - { + pub fn messages(&self) -> Result>> { self.iter().collect() } + + /// Iterator for messages in the message batch. Ordering of messages is not guaranteed. + pub fn iter(&self) -> MessageIter { + let messages = self.inner.messages(); + MessageIter { + range: 0..messages.length(), + array: messages, + marker: PhantomData, + } + } } -pub struct MessageIter<'a, T> { - range: std::ops::Range, - array: &'a Array, - timestamp_key: &'a JsValue, - body_key: &'a JsValue, - id_key: &'a JsValue, - data: PhantomData, +impl From for MessageBatch { + fn from(value: MessageBatchSys) -> Self { + Self { + inner: value, + phantom: PhantomData, + } + } } -impl MessageIter<'_, T> +/// A message that is sent to a consumer Worker. +pub struct Message { + inner: MessageSys, + body: T, +} + +impl Message { + /// The body of the message. + pub fn body(&self) -> &T { + &self.body + } + + /// The body of the message. + pub fn into_body(self) -> T { + self.body + } + + /// The raw body of the message. + pub fn raw_body(&self) -> JsValue { + self.inner().body() + } +} + +impl TryFrom for Message where T: DeserializeOwned, { - fn parse_message(&self, message: &JsValue) -> Result> { - let js_date = js_sys::Date::from(js_sys::Reflect::get(message, self.timestamp_key)?); - let id = js_sys::Reflect::get(message, self.id_key)? - .as_string() - .ok_or(Error::JsError( - "Invalid message batch. Failed to get id from message.".to_string(), - ))?; - let body = serde_wasm_bindgen::from_value(js_sys::Reflect::get(message, self.body_key)?)?; - - Ok(Message { - id, + type Error = Error; + + fn try_from(value: RawMessage) -> std::result::Result { + let body = serde_wasm_bindgen::from_value(value.body())?; + Ok(Self { + inner: value.inner, body, - timestamp: Date::from(js_date), }) } } -impl std::iter::Iterator for MessageIter<'_, T> +/// A message that is sent to a consumer Worker. +pub struct RawMessage { + inner: MessageSys, +} + +impl RawMessage { + /// The body of the message. + pub fn body(&self) -> JsValue { + self.inner.body() + } +} + +impl From for RawMessage { + fn from(value: MessageSys) -> Self { + Self { inner: value } + } +} + +trait MessageSysInner { + fn inner(&self) -> &MessageSys; +} + +impl MessageSysInner for RawMessage { + fn inner(&self) -> &MessageSys { + &self.inner + } +} + +impl MessageSysInner for Message { + fn inner(&self) -> &MessageSys { + &self.inner + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// Optional configuration when marking a message or a batch of messages for retry. +pub struct QueueRetryOptions { + delay_seconds: Option, +} + +pub struct QueueRetryOptionsBuilder { + delay_seconds: Option, +} + +impl QueueRetryOptionsBuilder { + /// Creates a new retry options builder. + pub fn new() -> Self { + Self { + delay_seconds: None, + } + } + + #[must_use] + /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer + pub fn with_delay_seconds(mut self, delay_seconds: u32) -> Self { + self.delay_seconds = Some(delay_seconds); + self + } + + /// Build the retry options. + pub fn build(self) -> QueueRetryOptions { + QueueRetryOptions { + delay_seconds: self.delay_seconds, + } + } +} + +pub trait MessageExt { + /// A unique, system-generated ID for the message. + fn id(&self) -> String; + + /// A timestamp when the message was sent. + fn timestamp(&self) -> Date; + + /// Marks message to be retried. + fn retry(&self); + + /// Marks message to be retried with options. + fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions); + + /// Marks message acknowledged. + fn ack(&self); +} + +impl MessageExt for T { + /// A unique, system-generated ID for the message. + fn id(&self) -> String { + self.inner().id().into() + } + + /// A timestamp when the message was sent. + fn timestamp(&self) -> Date { + Date::from(self.inner().timestamp()) + } + + /// Marks message to be retried. + fn retry(&self) { + self.inner().retry(JsValue::null()); + } + + /// Marks message to be retried with options. + fn retry_with_options(&self, queue_retry_options: &QueueRetryOptions) { + self.inner() + // SAFETY: QueueRetryOptions is controlled by this module and all data in it is serializable to a js value. + .retry(serde_wasm_bindgen::to_value(&queue_retry_options).unwrap()); + } + + /// Marks message acknowledged. + fn ack(&self) { + self.inner().ack(); + } +} + +pub struct MessageIter { + range: std::ops::Range, + array: Array, + marker: PhantomData, +} + +impl std::iter::Iterator for MessageIter where T: DeserializeOwned, { @@ -115,10 +241,9 @@ where fn next(&mut self) -> Option { let index = self.range.next()?; - let value = self.array.get(index); - - Some(self.parse_message(&value)) + let raw_message = RawMessage::from(MessageSys::from(value)); + Some(raw_message.try_into()) } #[inline] @@ -127,21 +252,53 @@ where } } -impl std::iter::DoubleEndedIterator for MessageIter<'_, T> +impl std::iter::DoubleEndedIterator for MessageIter where T: DeserializeOwned, { fn next_back(&mut self) -> Option { let index = self.range.next_back()?; let value = self.array.get(index); + let raw_message = RawMessage::from(MessageSys::from(value)); + Some(raw_message.try_into()) + } +} + +impl std::iter::FusedIterator for MessageIter where T: DeserializeOwned {} + +impl std::iter::ExactSizeIterator for MessageIter where T: DeserializeOwned {} + +pub struct RawMessageIter { + range: std::ops::Range, + array: Array, +} + +impl std::iter::Iterator for RawMessageIter { + type Item = RawMessage; + + fn next(&mut self) -> Option { + let index = self.range.next()?; + let value = self.array.get(index); + Some(MessageSys::from(value).into()) + } - Some(self.parse_message(&value)) + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.range.size_hint() } } -impl<'a, T> std::iter::FusedIterator for MessageIter<'a, T> where T: DeserializeOwned {} +impl std::iter::DoubleEndedIterator for RawMessageIter { + fn next_back(&mut self) -> Option { + let index = self.range.next_back()?; + let value = self.array.get(index); + Some(MessageSys::from(value).into()) + } +} + +impl std::iter::FusedIterator for RawMessageIter {} -impl<'a, T> std::iter::ExactSizeIterator for MessageIter<'a, T> where T: DeserializeOwned {} +impl std::iter::ExactSizeIterator for RawMessageIter {} #[derive(Clone)] pub struct Queue(EdgeQueue); @@ -179,15 +336,341 @@ impl AsRef for Queue { } } +#[derive(Clone, Copy, Debug)] +pub enum QueueContentType { + /// Send a JavaScript object that can be JSON-serialized. This content type can be previewed from the Cloudflare dashboard. + Json, + /// Send a String. This content type can be previewed with the List messages from the dashboard feature. + Text, + /// Send a JavaScript object that cannot be JSON-serialized but is supported by structured clone (for example Date and Map). This content type cannot be previewed from the Cloudflare dashboard and will display as Base64-encoded. + V8, +} + +impl Serialize for QueueContentType { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.serialize_str(match self { + Self::Json => "json", + Self::Text => "text", + Self::V8 => "v8", + }) + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct QueueSendOptions { + content_type: Option, + delay_seconds: Option, +} + +pub struct MessageBuilder { + message: T, + delay_seconds: Option, + content_type: QueueContentType, +} + +impl MessageBuilder { + /// Creates a new message builder. The message must be `serializable`. + pub fn new(message: T) -> Self { + Self { + message, + delay_seconds: None, + content_type: QueueContentType::Json, + } + } + + #[must_use] + /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer + pub fn delay_seconds(mut self, delay_seconds: u32) -> Self { + self.delay_seconds = Some(delay_seconds); + self + } + + #[must_use] + /// The content type of the message. + /// Default is `QueueContentType::Json`. + pub fn content_type(mut self, content_type: QueueContentType) -> Self { + self.content_type = content_type; + self + } + + /// Build the message. + pub fn build(self) -> SendMessage { + SendMessage { + message: self.message, + options: Some(QueueSendOptions { + content_type: Some(self.content_type), + delay_seconds: self.delay_seconds, + }), + } + } +} + +pub struct RawMessageBuilder { + message: JsValue, + delay_seconds: Option, +} + +impl RawMessageBuilder { + /// Creates a new raw message builder. The message must be a `JsValue`. + pub fn new(message: JsValue) -> Self { + Self { + message, + delay_seconds: None, + } + } + + #[must_use] + /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer + pub fn delay_seconds(mut self, delay_seconds: u32) -> Self { + self.delay_seconds = Some(delay_seconds); + self + } + + /// Build the message with a content type. + pub fn build_with_content_type(self, content_type: QueueContentType) -> SendMessage { + SendMessage { + message: self.message, + options: Some(QueueSendOptions { + content_type: Some(content_type), + delay_seconds: self.delay_seconds, + }), + } + } +} + +/// A wrapper type used for sending message. +/// +/// This type can't be constructed directly. +/// +/// It should be constructed using the `MessageBuilder`, `RawMessageBuilder` or by calling `.into()` on a struct that is `serializable`. +pub struct SendMessage { + /// The body of the message. + /// + /// Can be either a serializable struct or a `JsValue`. + message: T, + + /// Options to apply to the current message, including content type and message delay settings. + options: Option, +} + +impl SendMessage { + fn into_raw_send_message(self) -> Result> { + Ok(SendMessage { + message: serde_wasm_bindgen::to_value(&self.message)?, + options: self.options, + }) + } +} + +impl From for SendMessage { + fn from(message: T) -> Self { + Self { + message, + options: Some(QueueSendOptions { + content_type: Some(QueueContentType::Json), + delay_seconds: None, + }), + } + } +} + +pub struct BatchSendMessage { + body: Vec>, + options: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct QueueSendBatchOptions { + delay_seconds: Option, +} + +pub struct BatchMessageBuilder { + messages: Vec>, + delay_seconds: Option, +} + +impl BatchMessageBuilder { + /// Creates a new batch message builder. + pub fn new() -> Self { + Self { + messages: Vec::new(), + delay_seconds: None, + } + } + + #[must_use] + /// Adds a message to the batch. + pub fn message>>(mut self, message: U) -> Self { + self.messages.push(message.into()); + self + } + + #[must_use] + /// Adds messages to the batch. + pub fn messages(mut self, messages: U) -> Self + where + U: IntoIterator, + V: Into>, + { + self.messages + .extend(messages.into_iter().map(std::convert::Into::into)); + self + } + + #[must_use] + /// The number of seconds to delay a message for within the queue, before it can be delivered to a consumer + pub fn delay_seconds(mut self, delay_seconds: u32) -> Self { + self.delay_seconds = Some(delay_seconds); + self + } + + pub fn build(self) -> BatchSendMessage { + BatchSendMessage { + body: self.messages, + options: self + .delay_seconds + .map(|delay_seconds| QueueSendBatchOptions { + delay_seconds: Some(delay_seconds), + }), + } + } +} + +impl From for BatchSendMessage +where + U: IntoIterator, + V: Into>, +{ + fn from(value: U) -> Self { + Self { + body: value.into_iter().map(std::convert::Into::into).collect(), + options: None, + } + } +} + +impl BatchSendMessage { + fn into_raw_batch_send_message(self) -> Result> { + Ok(BatchSendMessage { + body: self + .body + .into_iter() + .map(SendMessage::into_raw_send_message) + .collect::>()?, + options: self.options, + }) + } +} + impl Queue { /// Sends a message to the Queue. - pub async fn send(&self, message: &T) -> Result<()> + /// + /// Accepts a struct that is `serializable`. + /// + /// If message options are needed use the `MessageBuilder` to create the message. + /// + /// ## Example + /// ```no_run + /// #[derive(Serialize)] + /// pub struct MyMessage { + /// my_data: u32, + /// } + /// + /// queue.send(MyMessage{ my_data: 1}).await?; + /// ``` + pub async fn send>>(&self, message: U) -> Result<()> where T: Serialize, { - let js_value = serde_wasm_bindgen::to_value(message)?; - let fut: JsFuture = self.0.send(js_value).into(); + let message: SendMessage = message.into(); + let serialized_message = message.into_raw_send_message()?; + self.send_raw(serialized_message).await + } + + /// Sends a raw `JsValue` to the Queue. + /// + /// Use the `RawMessageBuilder` to create the message. + pub async fn send_raw>>(&self, message: T) -> Result<()> { + let message: SendMessage = message.into(); + let options = match message.options { + Some(options) => serde_wasm_bindgen::to_value(&options)?, + None => JsValue::null(), + }; + + let fut: JsFuture = self.0.send(message.message, options).into(); + fut.await.map_err(Error::from)?; + Ok(()) + } + + /// Sends a batch of messages to the Queue. + /// + /// Accepts an iterator that produces structs that are `serializable`. + /// + /// If message options are needed use the `BatchMessageBuilder` to create the batch. + /// + /// ## Example + /// ```no_run + /// #[derive(Serialize)] + /// pub struct MyMessage { + /// my_data: u32, + /// } + /// + /// queue.send_batch(vec![MyMessage{ my_data: 1}]).await?; + /// ``` + pub async fn send_batch>>( + &self, + messages: U, + ) -> Result<()> { + let messages: BatchSendMessage = messages.into(); + let serialized_messages = messages.into_raw_batch_send_message()?; + self.send_raw_batch(serialized_messages).await + } + + /// Sends a batch of raw messages to the Queue. + /// + /// Accepts an iterator that produces structs that are `serializable`. + /// + /// If message options are needed use the `BatchMessageBuilder` to create the batch. + pub async fn send_raw_batch>>( + &self, + messages: T, + ) -> Result<()> { + let messages: BatchSendMessage = messages.into(); + let batch_send_options = serde_wasm_bindgen::to_value(&messages.options)?; + + let messages = messages + .body + .into_iter() + .map(|message: SendMessage| { + let body = message.message; + let message_send_request = js_sys::Object::new(); + + js_sys::Reflect::set(&message_send_request, &"body".into(), &body)?; + js_sys::Reflect::set( + &message_send_request, + &"contentType".into(), + &serde_wasm_bindgen::to_value( + &message.options.as_ref().map(|o| o.content_type), + )?, + )?; + js_sys::Reflect::set( + &message_send_request, + &"delaySeconds".into(), + &serde_wasm_bindgen::to_value( + &message.options.as_ref().map(|o| o.delay_seconds), + )?, + )?; + + Ok::(message_send_request.into()) + }) + .collect::>()?; + let fut: JsFuture = self.0.send_batch(messages, batch_send_options).into(); fut.await.map_err(Error::from)?; Ok(()) }