Skip to content

Commit 3f59f7a

Browse files
committed
wip: Have Bin implement FromMessageParts and Data implement FromMessage
1 parent f856599 commit 3f59f7a

File tree

17 files changed

+47
-41
lines changed

17 files changed

+47
-41
lines changed

e2e/socketioxide/socketioxide.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2727
// keep this handler async to test async message handlers
2828
socket.on(
2929
"message-with-ack",
30-
|Data::<PayloadValue>(data), ack: AckSender| async move {
30+
|ack: AckSender, Data::<PayloadValue>(data)| async move {
3131
info!("Received event: {:?}", data);
3232
ack.send(data).ok();
3333
},

examples/angular-todomvc/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4343

4444
s.on(
4545
"update-store",
46-
|s: SocketRef, Data::<Vec<Todo>>(new_todos), State(Todos(todos))| {
46+
|s: SocketRef, State(Todos(todos)), Data::<Vec<Todo>>(new_todos)| {
4747
info!("Received update-store event: {:?}", new_todos);
4848

4949
let mut todos = todos.lock().unwrap();

examples/axum-echo-tls/axum_echo-tls.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2323

2424
socket.on(
2525
"message-with-ack",
26-
|Data::<PayloadValue>(data), ack: AckSender| {
26+
|ack: AckSender, Data::<PayloadValue>(data)| {
2727
info!("Received event: {:?}", data);
2828
ack.send(data).ok();
2929
},

examples/axum-echo/axum_echo.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2020

2121
socket.on(
2222
"message-with-ack",
23-
|Data::<PayloadValue>(data), ack: AckSender| {
23+
|ack: AckSender, Data::<PayloadValue>(data)| {
2424
info!("Received event: {:?}", data);
2525
ack.send(data).ok();
2626
},

examples/basic-crud-application/src/handlers/todo.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl Todos {
4141
}
4242
}
4343

44-
pub fn create(s: SocketRef, Data(data): Data<PartialTodo>, ack: AckSender, todos: State<Todos>) {
44+
pub fn create(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(data): Data<PartialTodo>) {
4545
let id = Uuid::new_v4();
4646
let todo = Todo { id, inner: data };
4747

@@ -53,12 +53,12 @@ pub fn create(s: SocketRef, Data(data): Data<PartialTodo>, ack: AckSender, todos
5353
s.broadcast().emit("todo:created", todo).ok();
5454
}
5555

56-
pub async fn read(Data(id): Data<Uuid>, ack: AckSender, todos: State<Todos>) {
56+
pub async fn read(ack: AckSender, todos: State<Todos>, Data(id): Data<Uuid>) {
5757
let todo = todos.get(&id).ok_or(Error::NotFound);
5858
ack.send(todo).ok();
5959
}
6060

61-
pub async fn update(s: SocketRef, Data(data): Data<Todo>, ack: AckSender, todos: State<Todos>) {
61+
pub async fn update(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(data): Data<Todo>) {
6262
let res = todos
6363
.get_mut(&data.id)
6464
.ok_or(Error::NotFound)
@@ -70,7 +70,7 @@ pub async fn update(s: SocketRef, Data(data): Data<Todo>, ack: AckSender, todos:
7070
ack.send(res).ok();
7171
}
7272

73-
pub async fn delete(s: SocketRef, Data(id): Data<Uuid>, ack: AckSender, todos: State<Todos>) {
73+
pub async fn delete(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(id): Data<Uuid>) {
7474
let res = todos.remove(&id).ok_or(Error::NotFound).map(|_| {
7575
s.broadcast().emit("todo:deleted", id).ok();
7676
});

examples/chat/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7070

7171
s.on(
7272
"add user",
73-
|s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
73+
|s: SocketRef, user_cnt: State<UserCnt>, Data::<String>(username)| {
7474
if s.extensions.get::<Username>().is_some() {
7575
return;
7676
}

examples/hyper-echo/hyper_echo.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2424

2525
socket.on(
2626
"message-with-ack",
27-
|Data::<PayloadValue>(data), ack: AckSender| {
27+
|ack: AckSender, Data::<PayloadValue>(data)| {
2828
info!("Received event: {:?}", data);
2929
ack.send(data).ok();
3030
},

examples/loco-rooms-chat/src/channels/application.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub async fn on_connect(socket: SocketRef) {
1818

1919
socket.on(
2020
"join",
21-
|socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
21+
|socket: SocketRef, store: State<state::MessageStore>, Data::<String>(room)| async move {
2222
tracing::info!("Received join: {:?}", room);
2323
let _ = socket.leave_all();
2424
let _ = socket.join(room.clone());
@@ -29,7 +29,7 @@ pub async fn on_connect(socket: SocketRef) {
2929

3030
socket.on(
3131
"message",
32-
|socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
32+
|socket: SocketRef, store: State<state::MessageStore>, Data::<MessageIn>(data)| async move {
3333
tracing::info!("Received message: {:?}", data);
3434

3535
let response = state::Message {

examples/private-messaging/src/handlers.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub fn on_connection(
5353

5454
s.on(
5555
"private message",
56-
|s: SocketRef, Data(PrivateMessageReq { to, content }), State(Messages(msg))| {
56+
|s: SocketRef, State(Messages(msg)), Data(PrivateMessageReq { to, content })| {
5757
let user_id = s.extensions.get::<Session>().unwrap().user_id;
5858
let message = Message {
5959
from: user_id,

examples/react-rooms-chat/src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async fn on_connect(socket: SocketRef) {
2727

2828
socket.on(
2929
"join",
30-
|socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
30+
|socket: SocketRef, store: State<state::MessageStore>, Data::<String>(room)| async move {
3131
info!("Received join: {:?}", room);
3232
let _ = socket.leave_all();
3333
let _ = socket.join(room.clone());
@@ -38,7 +38,7 @@ async fn on_connect(socket: SocketRef) {
3838

3939
socket.on(
4040
"message",
41-
|socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
41+
|socket: SocketRef, store: State<state::MessageStore>, Data::<MessageIn>(data)| async move {
4242
info!("Received message: {:?}", data);
4343

4444
let response = state::Message {

examples/salvo-echo/salvo_echo.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2323

2424
socket.on(
2525
"message-with-ack",
26-
|Data::<PayloadValue>(data), ack: AckSender| {
26+
|ack: AckSender, Data::<PayloadValue>(data)| {
2727
info!("Received event: {:?}", data);
2828
ack.send(data).ok();
2929
},

examples/viz-echo/viz_echo.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
2020

2121
socket.on(
2222
"message-with-ack",
23-
|Data::<PayloadValue>(data), ack: AckSender| {
23+
|ack: AckSender, Data::<PayloadValue>(data)| {
2424
info!("Received event: {:?}", data);
2525
ack.send(data).ok();
2626
},

socketioxide/src/handler/extract.rs

+22-16
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
//! and [`DisconnectHandler`](super::DisconnectHandler).
33
//!
44
//! They can be used to extract data from the context of the handler and get specific params. Here are some examples of extractors:
5-
//! * [`Data`]: extracts and deserialize to json any data, if a deserialization error occurs the handler won't be called:
5+
//! * [`Data`]: extracts and deserialize to json any data. Because it consumes the event it should be the last argument. If a
6+
//! deserialization error occurs the handler won't be called:
67
//! - for [`ConnectHandler`](super::ConnectHandler): extracts and deserialize to json the auth data
78
//! - for [`MessageHandler`](super::MessageHandler): extracts and deserialize to json the message data
8-
//! * [`TryData`]: extracts and deserialize to json any data but with a `Result` type in case of error:
9+
//! * [`TryData`]: extracts and deserialize to json any data. Because it consumes the event it should be the last argument. In case of
10+
//! error, a `Result` type is returned;
911
//! - for [`ConnectHandler`](super::ConnectHandler): extracts and deserialize to json the auth data
1012
//! - for [`MessageHandler`](super::MessageHandler): extracts and deserialize to json the message data
1113
//! * [`SocketRef`]: extracts a reference to the [`Socket`]
12-
//! * [`Bin`]: extract a binary payload for a given message. Because it consumes the event it should be the last argument
14+
//! * [`Bin`]: extract a binary payload for a given message.
1315
//! * [`AckSender`]: Can be used to send an ack response to the current message event
1416
//! * [`ProtocolVersion`](crate::ProtocolVersion): extracts the protocol version
1517
//! * [`TransportType`](crate::TransportType): extracts the transport type
@@ -129,19 +131,19 @@ where
129131
.map(Data)
130132
}
131133
}
132-
impl<T, A> FromMessageParts<A> for Data<T>
134+
impl<T, A> FromMessage<A> for Data<T>
133135
where
134136
T: DeserializeOwned,
135137
A: Adapter,
136138
{
137139
type Error = serde_json::Error;
138-
fn from_message_parts(
139-
_: &Arc<Socket<A>>,
140-
v: &mut PayloadValue,
141-
_: &Option<i64>,
140+
fn from_message(
141+
_: Arc<Socket<A>>,
142+
mut v: PayloadValue,
143+
_: Option<i64>,
142144
) -> Result<Self, Self::Error> {
143-
upwrap_array(v);
144-
v.clone().into_data::<T>().map(Data)
145+
upwrap_array(&mut v);
146+
v.into_data::<T>().map(Data)
145147
}
146148
}
147149

@@ -237,14 +239,18 @@ impl<A: Adapter> SocketRef<A> {
237239
/// An Extractor that returns the binary data of the message.
238240
/// If there is no binary data, it will contain an empty vec.
239241
pub struct Bin(pub Vec<Vec<u8>>);
240-
impl<A: Adapter> FromMessage<A> for Bin {
242+
impl<A: Adapter> FromMessageParts<A> for Bin {
241243
type Error = Infallible;
242-
fn from_message(
243-
_: Arc<Socket<A>>,
244-
mut v: PayloadValue,
245-
_: Option<i64>,
244+
fn from_message_parts(
245+
_: &Arc<Socket<A>>,
246+
v: &mut PayloadValue,
247+
_: &Option<i64>,
246248
) -> Result<Self, Infallible> {
247-
Ok(Bin(v.extract_binary_payloads()))
249+
Ok(Bin(v
250+
.binary_payloads_ref()
251+
.into_iter()
252+
.map(Clone::clone)
253+
.collect()))
248254
}
249255
}
250256

socketioxide/src/handler/message.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
//! ack.send("ack data").ok();
3030
//! });
3131
//!
32-
//! // `Bin` extractor must be the last argument because it consumes the rest of the packet
33-
//! s.on("binary_event", |s: SocketRef, TryData::<String>(data), Bin(bin)| {
32+
//! // `Data` extractor must be the last argument because it consumes the rest of the packet
33+
//! s.on("binary_event", |s: SocketRef, Bin(bin), TryData::<String>(data)| {
3434
//! println!("Socket received event with data: {:?} and binary data: {:?}", data, bin);
3535
//! })
3636
//! });
@@ -47,8 +47,8 @@
4747
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
4848
//! println!("Socket received event with data: {}", data);
4949
//! });
50-
//! // `Bin` extractor must be the last argument because it consumes the rest of the packet
51-
//! s.on("/binary_event", move |s: SocketRef, TryData::<String>(data), Bin(bin)| async move {
50+
//! // `Data` extractor must be the last argument because it consumes the rest of the packet
51+
//! s.on("/binary_event", move |s: SocketRef, Bin(bin), TryData::<String>(data)| async move {
5252
//! println!("Socket received event with data: {:?} and binary data: {:?}", data, bin);
5353
//! })
5454
//! });
@@ -60,7 +60,7 @@
6060
//! # use serde_json::Error;
6161
//! # use socketioxide::extract::*;
6262
//! // async named event handler
63-
//! async fn on_event(s: SocketRef, Data(data): Data<PayloadValue>, ack: AckSender) {
63+
//! async fn on_event(s: SocketRef, ack: AckSender, Data(data): Data<PayloadValue>) {
6464
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
6565
//! ack.send("Here is my acknowledgment!").ok();
6666
//! }

socketioxide/src/io.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl<A: Adapter> SocketIo<A> {
295295
/// // Register an async handler for the "test" event and extract the data as a `MyData` struct
296296
/// // Extract the binary payload as a `Vec<Vec<u8>>` with the Bin extractor.
297297
/// // It should be the last extractor because it consumes the request
298-
/// socket.on("test", |socket: SocketRef, Data::<MyData>(data), ack: AckSender| async move {
298+
/// socket.on("test", |socket: SocketRef, ack: AckSender, Data::<MyData>(data)| async move {
299299
/// println!("Received a test message {:?}", data);
300300
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
301301
/// ack.send(data).ok(); // The data received is sent back to the client through the ack

socketioxide/src/operators.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ impl<A: Adapter> ConfOperators<'_, A> {
304304
/// # use socketioxide::{PayloadValue, SocketIo, extract::*};
305305
/// let (_, io) = SocketIo::new_svc();
306306
/// io.ns("/", |socket: SocketRef| {
307-
/// socket.on("test", |socket: SocketRef, Data::<PayloadValue>(data), Bin(bin)| async move {
307+
/// socket.on("test", |socket: SocketRef, Bin(bin), Data::<PayloadValue>(data)| async move {
308308
/// // Emit a test message to the client
309309
/// socket.emit("test", data).ok();
310310
///

socketioxide/src/socket.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ impl<A: Adapter> Socket<A> {
218218
/// // Register an async handler for the "test" event and extract the data as a `MyData` struct
219219
/// // Extract the binary payload as a `Vec<Vec<u8>>` with the Bin extractor.
220220
/// // It should be the last extractor because it consumes the request
221-
/// socket.on("test", |socket: SocketRef, Data::<MyData>(data), ack: AckSender| async move {
221+
/// socket.on("test", |socket: SocketRef, ack: AckSender, Data::<MyData>(data)| async move {
222222
/// println!("Received a test message {:?}", data);
223223
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
224224
/// ack.send(data).ok(); // The data received is sent back to the client through the ack

0 commit comments

Comments
 (0)