Skip to content

Commit ae0d6ce

Browse files
authored
feat(socketio/socket): use reserve rather than reserve_many and Iterator for Permtis
1 parent df5faee commit ae0d6ce

File tree

4 files changed

+34
-59
lines changed

4 files changed

+34
-59
lines changed

engineioxide/src/socket.rs

+3-26
Original file line numberDiff line numberDiff line change
@@ -146,29 +146,6 @@ impl Permit<'_> {
146146
}
147147
}
148148

149-
/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
150-
#[derive(Debug)]
151-
pub struct PermitIterator<'a> {
152-
inner: mpsc::PermitIterator<'a, PacketBuf>,
153-
}
154-
155-
impl<'a> Iterator for PermitIterator<'a> {
156-
type Item = Permit<'a>;
157-
158-
#[inline]
159-
fn next(&mut self) -> Option<Self::Item> {
160-
let inner = self.inner.next()?;
161-
Some(Permit { inner })
162-
}
163-
}
164-
impl ExactSizeIterator for PermitIterator<'_> {
165-
#[inline]
166-
fn len(&self) -> usize {
167-
self.inner.len()
168-
}
169-
}
170-
impl std::iter::FusedIterator for PermitIterator<'_> {}
171-
172149
/// Buffered packets to send to the client
173150
pub(crate) type PacketBuf = SmallVec<[Packet; 10]>;
174151
/// A [`Socket`] represents a client connection to the server.
@@ -417,9 +394,9 @@ where
417394
/// If the internal chan is full, the function will return a [`TrySendError::Full`] error.
418395
/// If the socket is closed, the function will return a [`TrySendError::Closed`] error.
419396
#[inline]
420-
pub fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, TrySendError<()>> {
421-
let inner = self.internal_tx.try_reserve_many(n)?;
422-
Ok(PermitIterator { inner })
397+
pub fn reserve(&self) -> Result<Permit<'_>, TrySendError<()>> {
398+
let permit = self.internal_tx.try_reserve()?;
399+
Ok(Permit { inner: permit })
423400
}
424401

425402
/// Emits a message to the client.

socketioxide/src/handler/extract.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -298,10 +298,10 @@ impl<A: Adapter> AckSender<A> {
298298

299299
/// Send the ack response to the client.
300300
pub fn send<T: Serialize>(self, data: T) -> Result<(), SendError<T>> {
301-
use crate::socket::PermitIteratorExt;
301+
use crate::socket::PermitExt;
302302
if let Some(ack_id) = self.ack_id {
303-
let permits = match self.socket.reserve(1) {
304-
Ok(permits) => permits,
303+
let permit = match self.socket.reserve() {
304+
Ok(permit) => permit,
305305
Err(e) => {
306306
#[cfg(feature = "tracing")]
307307
tracing::debug!("sending error during emit message: {e:?}");
@@ -315,7 +315,7 @@ impl<A: Adapter> AckSender<A> {
315315
} else {
316316
Packet::bin_ack(ns, data, self.binary, ack_id)
317317
};
318-
permits.emit(packet);
318+
permit.send(packet);
319319
Ok(())
320320
} else {
321321
Ok(())

socketioxide/src/operators.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -346,20 +346,20 @@ impl<A: Adapter> ConfOperators<'_, A> {
346346
data: T,
347347
) -> Result<(), SendError<T>> {
348348
use crate::errors::SocketError;
349-
use crate::socket::PermitIteratorExt;
349+
use crate::socket::PermitExt;
350350
if !self.socket.connected() {
351351
return Err(SendError::Socket(SocketError::Closed(data)));
352352
}
353-
let permits = match self.socket.reserve(1) {
354-
Ok(permits) => permits,
353+
let permit = match self.socket.reserve() {
354+
Ok(permit) => permit,
355355
Err(e) => {
356356
#[cfg(feature = "tracing")]
357357
tracing::debug!("sending error during emit message: {e:?}");
358358
return Err(e.with_value(data).into());
359359
}
360360
};
361361
let packet = self.get_packet(event, data)?;
362-
permits.emit(packet);
362+
permit.send(packet);
363363

364364
Ok(())
365365
}
@@ -424,8 +424,8 @@ impl<A: Adapter> ConfOperators<'_, A> {
424424
if !self.socket.connected() {
425425
return Err(SendError::Socket(SocketError::Closed(data)));
426426
}
427-
let permits = match self.socket.reserve(1) {
428-
Ok(permits) => permits,
427+
let permit = match self.socket.reserve() {
428+
Ok(permit) => permit,
429429
Err(e) => {
430430
#[cfg(feature = "tracing")]
431431
tracing::debug!("sending error during emit message: {e:?}");
@@ -434,7 +434,7 @@ impl<A: Adapter> ConfOperators<'_, A> {
434434
};
435435
let timeout = self.timeout.unwrap_or(self.socket.config.ack_timeout);
436436
let packet = self.get_packet(event, data)?;
437-
let rx = self.socket.send_with_ack_permit(packet, permits);
437+
let rx = self.socket.send_with_ack_permit(packet, permit);
438438
let stream = AckInnerStream::send(rx, timeout, self.socket.id);
439439
Ok(AckStream::<V>::from(stream))
440440
}

socketioxide/src/socket.rs

+20-22
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
time::Duration,
1313
};
1414

15-
use engineioxide::socket::{DisconnectReason as EIoDisconnectReason, Permit, PermitIterator};
15+
use engineioxide::socket::{DisconnectReason as EIoDisconnectReason, Permit};
1616
use serde::{de::DeserializeOwned, Serialize};
1717
use serde_json::Value;
1818
use tokio::sync::oneshot::{self, Receiver};
@@ -101,12 +101,11 @@ impl From<EIoDisconnectReason> for DisconnectReason {
101101
}
102102
}
103103

104-
pub(crate) trait PermitIteratorExt<'a>:
105-
ExactSizeIterator<Item = Permit<'a>> + Sized
106-
{
107-
fn emit(mut self, mut packet: Packet<'_>) {
108-
debug_assert!(self.len() > 0, "No permits available to send the message");
109-
104+
pub(crate) trait PermitExt<'a> {
105+
fn send(self, packet: Packet<'_>);
106+
}
107+
impl<'a> PermitExt<'a> for Permit<'a> {
108+
fn send(self, mut packet: Packet<'_>) {
110109
let bin_payloads = match packet.inner {
111110
PacketData::BinaryEvent(_, ref mut bin, _) | PacketData::BinaryAck(ref mut bin, _) => {
112111
Some(std::mem::take(&mut bin.bin))
@@ -117,13 +116,12 @@ pub(crate) trait PermitIteratorExt<'a>:
117116
let msg = packet.into();
118117

119118
if let Some(bin_payloads) = bin_payloads {
120-
self.next().unwrap().emit_many(msg, bin_payloads);
119+
self.emit_many(msg, bin_payloads);
121120
} else {
122-
self.next().unwrap().emit(msg);
121+
self.emit(msg);
123122
}
124123
}
125124
}
126-
impl<'a> PermitIteratorExt<'a> for PermitIterator<'a> {}
127125

128126
/// A Socket represents a client connected to a namespace.
129127
/// It is used to send and receive messages from the client, join and leave rooms, etc.
@@ -314,8 +312,8 @@ impl<A: Adapter> Socket<A> {
314312
return Err(SendError::Socket(SocketError::Closed(data)));
315313
}
316314

317-
let permits = match self.reserve(1) {
318-
Ok(permits) => permits,
315+
let permit = match self.reserve() {
316+
Ok(permit) => permit,
319317
Err(e) => {
320318
#[cfg(feature = "tracing")]
321319
tracing::debug!("sending error during emit message: {e:?}");
@@ -325,7 +323,7 @@ impl<A: Adapter> Socket<A> {
325323

326324
let ns = self.ns();
327325
let data = serde_json::to_value(data)?;
328-
permits.emit(Packet::event(ns, event.into(), data));
326+
permit.send(Packet::event(ns, event.into(), data));
329327
Ok(())
330328
}
331329

@@ -388,8 +386,8 @@ impl<A: Adapter> Socket<A> {
388386
if !self.connected() {
389387
return Err(SendError::Socket(SocketError::Closed(data)));
390388
}
391-
let permits = match self.reserve(1) {
392-
Ok(permits) => permits,
389+
let permit = match self.reserve() {
390+
Ok(permit) => permit,
393391
Err(e) => {
394392
#[cfg(feature = "tracing")]
395393
tracing::debug!("sending error during emit message: {e:?}");
@@ -398,7 +396,7 @@ impl<A: Adapter> Socket<A> {
398396
};
399397
let data = serde_json::to_value(data)?;
400398
let packet = Packet::event(self.ns(), event.into(), data);
401-
let rx = self.send_with_ack_permit(packet, permits);
399+
let rx = self.send_with_ack_permit(packet, permit);
402400
let stream = AckInnerStream::send(rx, self.config.ack_timeout, self.id);
403401
Ok(AckStream::<V>::from(stream))
404402
}
@@ -649,26 +647,26 @@ impl<A: Adapter> Socket<A> {
649647
&self.ns.path
650648
}
651649

652-
pub(crate) fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, SocketError<()>> {
653-
Ok(self.esocket.reserve(n)?)
650+
pub(crate) fn reserve(&self) -> Result<Permit<'_>, SocketError<()>> {
651+
Ok(self.esocket.reserve()?)
654652
}
655653

656654
pub(crate) fn send(&self, packet: Packet<'_>) -> Result<(), SocketError<()>> {
657-
let permits = self.reserve(1)?;
658-
permits.emit(packet);
655+
let permit = self.reserve()?;
656+
permit.send(packet);
659657
Ok(())
660658
}
661659

662660
pub(crate) fn send_with_ack_permit(
663661
&self,
664662
mut packet: Packet<'_>,
665-
permits: PermitIterator<'_>,
663+
permit: Permit<'_>,
666664
) -> Receiver<AckResult<Value>> {
667665
let (tx, rx) = oneshot::channel();
668666

669667
let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1;
670668
packet.inner.set_ack_id(ack);
671-
permits.emit(packet);
669+
permit.send(packet);
672670
self.ack_message.lock().unwrap().insert(ack, tx);
673671
rx
674672
}

0 commit comments

Comments
 (0)