Skip to content

Commit

Permalink
Update queue bindings (#335)
Browse files Browse the repository at this point in the history
* Update queues

* Update queue bindings

* Fix tests and lint

* Fix tests

* Add support for message batches and retry / message options

* Add queue example

* Address comments
  • Loading branch information
jdon authored Mar 29, 2024
1 parent fbdaca4 commit cd41d40
Show file tree
Hide file tree
Showing 13 changed files with 901 additions and 95 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,41 @@ pub async fn main(message_batch: MessageBatch<MyType>, env: Env, _ctx: Context)
// Log the message and meta data
console_log!(
"Got message {:?}, with id {} and timestamp: {}",
message.body,
message.id,
message.timestamp.to_string()
message.body(),
message.id(),
message.timestamp().to_string()
);

// Send the message body to the other queue
my_queue.send(&message.body).await?;
my_queue.send(message.body()).await?;

// Ack individual message
message.ack();

// Retry individual message
message.retry();
}

// Retry all messages
message_batch.retry_all();
// Ack all messages
message_batch.ack_all();
Ok(())
}
```
You'll need to ensure you have the correct bindings in your `wrangler.toml`:
```toml
# ...
[[queues.consumers]]
queue = "myqueueotherqueue"
max_batch_size = 10
max_batch_timeout = 30


[[queues.producers]]
queue = "myqueue"
binding = "my_queue"
```

## Testing with Miniflare

Expand Down
20 changes: 20 additions & 0 deletions examples/queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "queue-on-workers"
version = "0.1.0"
edition = "2021"

[package.metadata.release]
release = false

# https://github.com/rustwasm/wasm-pack/issues/1247
[package.metadata.wasm-pack.profile.release]
wasm-opt = false

[lib]
crate-type = ["cdylib"]

[dependencies]
serde = "1"
worker = { workspace = true, features = ["queue"] }
wasm-bindgen = { workspace = true }
js-sys = { workspace = true }
3 changes: 3 additions & 0 deletions examples/queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Using Queues on Cloudflare Workers

Demonstration of using `worker::Queue`
12 changes: 12 additions & 0 deletions examples/queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name": "queue-on-workers",
"version": "0.0.0",
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"dev": "wrangler dev --local"
},
"devDependencies": {
"wrangler": "^3"
}
}
149 changes: 149 additions & 0 deletions examples/queue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use serde::{Deserialize, Serialize};
use wasm_bindgen::JsValue;
use worker::*;

const MY_MESSAGES_BINDING_NAME: &str = "my_messages";
const MY_MESSAGES_QUEUE_NAME: &str = "mymessages";

const RAW_MESSAGES_BINDING_NAME: &str = "raw_messages";
const RAW_MESSAGES_QUEUE_NAME: &str = "rawmessages";

#[derive(Serialize, Debug, Clone, Deserialize)]
pub struct MyType {
foo: String,
bar: u32,
}

#[event(fetch)]
async fn main(_req: Request, env: Env, _: worker::Context) -> Result<Response> {
let my_messages_queue = env.queue(MY_MESSAGES_BINDING_NAME)?;
let raw_messages_queue = env.queue(RAW_MESSAGES_BINDING_NAME)?;

// Send a message with using a serializable struct
my_messages_queue
.send(MyType {
foo: "Hello world".into(),
bar: 1,
})
.await?;

// Send a batch of messages using some sort of iterator
my_messages_queue
.send_batch([
// Use the MessageBuilder to set additional options
MessageBuilder::new(MyType {
foo: "Hello world".into(),
bar: 2,
})
.delay_seconds(20)
.build(),
// Send a message with using a serializable struct
MyType {
foo: "Hello world".into(),
bar: 4,
}
.into(),
])
.await?;

// Send a batch of messages using the BatchMessageBuilder
my_messages_queue
.send_batch(
BatchMessageBuilder::new()
.message(MyType {
foo: "Hello world".into(),
bar: 4,
})
.messages(vec![
MyType {
foo: "Hello world".into(),
bar: 5,
},
MyType {
foo: "Hello world".into(),
bar: 6,
},
])
.delay_seconds(10)
.build(),
)
.await?;

// Send a raw JSValue
raw_messages_queue
.send_raw(
// RawMessageBuilder has to be used as we should set content type of these raw messages
RawMessageBuilder::new(JsValue::from_str("7"))
.delay_seconds(30)
.build_with_content_type(QueueContentType::Json),
)
.await?;

// Send a batch of raw JSValues using the BatchMessageBuilder
raw_messages_queue
.send_raw_batch(
BatchMessageBuilder::new()
.message(
RawMessageBuilder::new(js_sys::Date::new_0().into())
.build_with_content_type(QueueContentType::V8),
)
.message(
RawMessageBuilder::new(JsValue::from_str("8"))
.build_with_content_type(QueueContentType::Json),
)
.delay_seconds(10)
.build(),
)
.await?;

// Send a batch of raw JsValues using some sort of iterator
raw_messages_queue
.send_raw_batch(vec![RawMessageBuilder::new(JsValue::from_str("9"))
.delay_seconds(20)
.build_with_content_type(QueueContentType::Text)])
.await?;

Response::empty()
}

// Consumes messages from `my_messages` queue and `raw_messages` queue
#[event(queue)]
pub async fn main(message_batch: MessageBatch<MyType>, _: Env, _: Context) -> Result<()> {
match message_batch.queue().as_str() {
MY_MESSAGES_QUEUE_NAME => {
for message in message_batch.messages()? {
console_log!(
"Got message {:?}, with id {} and timestamp: {}",
message.body(),
message.id(),
message.timestamp().to_string(),
);
if message.body().bar == 1 {
message.retry_with_options(
&QueueRetryOptionsBuilder::new()
.with_delay_seconds(10)
.build(),
);
} else {
message.ack();
}
}
}
RAW_MESSAGES_QUEUE_NAME => {
for message in message_batch.raw_iter() {
console_log!(
"Got raw message {:?}, with id {} and timestamp: {}",
message.body(),
message.id(),
message.timestamp().to_string(),
);
}
message_batch.ack_all();
}
_ => {
console_error!("Unknown queue: {}", message_batch.queue());
}
}

Ok(())
}
23 changes: 23 additions & 0 deletions examples/queue/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name = "queue-on-workers"
main = "build/worker/shim.mjs"
compatibility_date = "2024-03-26"


[build]
command = "cargo install --path ../../worker-build && worker-build --release"

[[queues.consumers]]
queue = "mymessages"
dead_letter_queue = "mymessagesdlq"

[[queues.consumers]]
queue = "rawmessages"


[[queues.producers]]
queue = "mymessages"
binding = "my_messages"

[[queues.producers]]
queue = "rawmessages"
binding = "raw_messages"
2 changes: 1 addition & 1 deletion worker-macros/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream, http: bool) -> TokenSt
pub async fn #wrapper_fn_ident(event: ::worker::worker_sys::MessageBatch, env: ::worker::Env, ctx: ::worker::worker_sys::Context) {
// call the original fn
let ctx = worker::Context::new(ctx);
match #input_fn_ident(::worker::MessageBatch::new(event), env, ctx).await {
match #input_fn_ident(::worker::MessageBatch::from(event), env, ctx).await {
Ok(()) => {},
Err(e) => {
::worker::console_log!("{}", &e);
Expand Down
35 changes: 30 additions & 5 deletions worker-sandbox/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::{SomeSharedData, GLOBAL_QUEUE_STATE};
use worker::{console_log, event, Context, Env, MessageBatch, Request, Response, Result};
use worker::{
console_log, event, Context, Env, MessageBatch, MessageExt, Request, Response, Result,
};
#[derive(Serialize, Debug, Clone, Deserialize)]
pub struct QueueBody {
pub id: Uuid,
Expand All @@ -15,11 +17,11 @@ pub async fn queue(message_batch: MessageBatch<QueueBody>, _env: Env, _ctx: Cont
for message in message_batch.messages()? {
console_log!(
"Received queue message {:?}, with id {} and timestamp: {}",
message.body,
message.id,
message.timestamp.to_string()
message.body(),
message.id(),
message.timestamp().to_string()
);
guard.push(message.body);
guard.push(message.into_body());
}
Ok(())
}
Expand Down Expand Up @@ -54,6 +56,29 @@ pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) ->
}
}

#[worker::send]
pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result<Response> {
let messages: Vec<QueueBody> = match req.json().await {
Ok(messages) => messages,
Err(err) => {
return Response::error(format!("Failed to parse request body: {err:?}"), 400);
}
};

let my_queue = match env.queue("my_queue") {
Ok(queue) => queue,
Err(err) => return Response::error(format!("Failed to get queue: {err:?}"), 500),
};

match my_queue.send_batch(messages).await {
Ok(()) => Response::ok("Message sent"),
Err(err) => Response::error(
format!("Failed to batch send message to queue: {err:?}"),
500,
),
}
}

pub async fn handle_queue(_req: Request, _env: Env, _data: SomeSharedData) -> Result<Response> {
let guard = GLOBAL_QUEUE_STATE.lock().unwrap();
let messages: Vec<QueueBody> = guard.clone();
Expand Down
5 changes: 5 additions & 0 deletions worker-sandbox/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ pub fn make_router(data: SomeSharedData, env: Env) -> axum::Router {
get(handler!(service::handle_remote_by_path)),
)
.route("/queue/send/:id", post(handler!(queue::handle_queue_send)))
.route(
"/queue/send_batch",
post(handler!(queue::handle_batch_send)),
)
.route("/queue", get(handler!(queue::handle_queue)))
.route("/d1/prepared", get(handler!(d1::prepared_statement)))
.route("/d1/batch", get(handler!(d1::batch)))
Expand Down Expand Up @@ -301,6 +305,7 @@ pub fn make_router<'a>(data: SomeSharedData) -> Router<'a, SomeSharedData> {
)
.get_async("/remote-by-path", handler!(service::handle_remote_by_path))
.post_async("/queue/send/:id", handler!(queue::handle_queue_send))
.post_async("/queue/send_batch", handler!(queue::handle_batch_send))
.get_async("/queue", handler!(queue::handle_queue))
.get_async("/d1/prepared", handler!(d1::prepared_statement))
.get_async("/d1/batch", handler!(d1::batch))
Expand Down
27 changes: 27 additions & 0 deletions worker-sandbox/tests/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,31 @@ describe("queue", () => {
const message = messages.find((msg) => msg.id === id.toString());
expect(message).toMatchObject({ id: id.toString() });
});

test("batch send message to queue", async () => {
const id_1 = uuid.v4();
const id_2 = uuid.v4();

let resp = await mf.dispatchFetch(`https://fake.host/queue/send_batch`, {
method: "POST",
body: JSON.stringify([{ id: id_1, id_string: id_1 }, { id: id_2, id_string: id_2 }])
});

expect(resp.status).toBe(200);

await new Promise((resolve) => setTimeout(resolve, 1200));

resp = await mf.dispatchFetch("https://fake.host/queue");
expect(resp.status).toBe(200);

let body = await resp.json();

const messages = body as { id: string }[];

const message_1 = messages.find((msg) => msg.id === id_1.toString());
expect(message_1).toMatchObject({ id: id_1.toString() });

const message_2 = messages.find((msg) => msg.id === id_2.toString());
expect(message_2).toMatchObject({ id: id_2.toString() });
});
});
Loading

0 comments on commit cd41d40

Please sign in to comment.