Skip to content

Commit ae794d8

Browse files
author
Charles Bournhonesque
committed
Remove client/server features on send/receive
Change-Id: Id054116e37bb1513892ee87b03ba5a5e0e499831
1 parent 193864f commit ae794d8

4 files changed

Lines changed: 163 additions & 177 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -713,15 +713,13 @@ pub mod prelude {
713713
receive_markers::AppMarkerExt,
714714
registry::rule_fns::RuleFns,
715715
rules::{AppRuleExt, component::ReplicationMode},
716+
send::related_entities::SyncRelatedAppExt,
716717
signature::Signature,
717718
},
718719
replicon_tick::RepliconTick,
719720
},
720721
};
721722

722-
#[cfg(feature = "server")]
723-
pub use super::shared::replication::send::related_entities::SyncRelatedAppExt;
724-
725723
#[cfg(feature = "client")]
726724
pub use super::client::{
727725
ClientPlugin, ClientReplicationStats, ClientSystems, Remote, message::ClientMessagePlugin,

src/shared/message/server_message.rs

Lines changed: 162 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
1-
#[cfg(feature = "server")]
21
pub(crate) mod message_buffer;
32
mod message_queue;
4-
#[cfg(feature = "server")]
5-
mod send;
63

74
use core::any::TypeId;
85

6+
use bevy::ptr::Ptr;
97
use bevy::{
108
ecs::{component::ComponentId, entity::MapEntities},
119
prelude::*,
1210
ptr::PtrMut,
1311
};
1412
use bytes::Bytes;
1513
use log::{debug, error, warn};
14+
use postcard::experimental::max_size::MaxSize;
1615
use serde::{Serialize, de::DeserializeOwned};
1716

1817
use super::{
@@ -21,6 +20,7 @@ use super::{
2120
registry::RemoteMessageRegistry,
2221
};
2322
use crate::{postcard_utils, prelude::*};
23+
use message_buffer::{MessageBuffer, SerializedMessage};
2424
use message_queue::MessageQueue;
2525

2626
/// An extension trait for [`App`] for creating server messages.
@@ -232,8 +232,7 @@ pub(crate) struct ServerMessage {
232232
/// ID of `M`.
233233
type_id: TypeId,
234234

235-
#[cfg(feature = "server")]
236-
send_or_buffer: send::SendOrBufferFn,
235+
send_or_buffer: SendOrBufferFn,
237236
receive: ReceiveFn,
238237
send_locally: SendLocallyFn,
239238
reset: ResetFn,
@@ -266,7 +265,6 @@ impl ServerMessage {
266265
queue_id,
267266
channel_id,
268267
type_id: TypeId::of::<M>(),
269-
#[cfg(feature = "server")]
270268
send_or_buffer: Self::send_or_buffer_typed::<M, I>,
271269
receive: Self::receive_typed::<M, I>,
272270
send_locally: Self::send_locally_typed::<M>,
@@ -499,6 +497,154 @@ impl ServerMessage {
499497
}
500498
}
501499

500+
impl ServerMessage {
501+
/// Sends a message to client(s).
502+
///
503+
/// # Safety
504+
///
505+
/// The caller must ensure that `to_messages` is [`Messages<ToClients<M>>`]
506+
/// and this instance was created for `M`.
507+
pub(crate) unsafe fn send_or_buffer(
508+
&self,
509+
ctx: &mut ServerSendCtx,
510+
to_messages: &Ptr,
511+
server_messages: &mut ServerMessages,
512+
clients: &Query<Entity, With<ConnectedClient>>,
513+
message_buffer: &mut MessageBuffer,
514+
) {
515+
unsafe {
516+
(self.send_or_buffer)(
517+
self,
518+
ctx,
519+
to_messages,
520+
server_messages,
521+
clients,
522+
message_buffer,
523+
)
524+
}
525+
}
526+
527+
/// Typed version of [`Self::send_or_buffer`].
528+
///
529+
/// # Safety
530+
///
531+
/// The caller must ensure that `to_messages` is [`Messages<ToClients<M>>`]
532+
/// and this instance was created for `M` and `I`.
533+
pub(super) unsafe fn send_or_buffer_typed<M: Message, I: 'static>(
534+
&self,
535+
ctx: &mut ServerSendCtx,
536+
to_messages: &Ptr,
537+
server_messages: &mut ServerMessages,
538+
clients: &Query<Entity, With<ConnectedClient>>,
539+
message_buffer: &mut MessageBuffer,
540+
) {
541+
let to_messages: &Messages<ToClients<M>> = unsafe { to_messages.deref() };
542+
// For server messages we don't track read message because
543+
// all of them will always be drained in the local sending system.
544+
for ToClients { message, mode } in to_messages.get_cursor().read(to_messages) {
545+
debug!("sending message `{}` with `{mode:?}`", ShortName::of::<M>());
546+
547+
if self.independent {
548+
unsafe {
549+
self.send_independent_message::<M, I>(
550+
ctx,
551+
message,
552+
mode,
553+
server_messages,
554+
clients,
555+
)
556+
.expect("independent server message should be serializable");
557+
}
558+
} else {
559+
unsafe {
560+
self.buffer_message::<M, I>(ctx, message, *mode, message_buffer)
561+
.expect("server message should be serializable");
562+
}
563+
}
564+
}
565+
}
566+
567+
/// Sends independent remote message `M` based on a mode.
568+
///
569+
/// # Safety
570+
///
571+
/// The caller must ensure that this instance was created for `M` and `I`.
572+
///
573+
/// For regular messages see [`Self::buffer_message`].
574+
unsafe fn send_independent_message<M: Message, I: 'static>(
575+
&self,
576+
ctx: &mut ServerSendCtx,
577+
message: &M,
578+
mode: &SendMode,
579+
server_messages: &mut ServerMessages,
580+
clients: &Query<Entity, With<ConnectedClient>>,
581+
) -> Result<()> {
582+
let mut message_bytes = Vec::new();
583+
unsafe { self.serialize::<M, I>(ctx, message, &mut message_bytes)? }
584+
let message_bytes: Bytes = message_bytes.into();
585+
586+
match *mode {
587+
SendMode::Broadcast => {
588+
for client in clients {
589+
server_messages.send(client, self.channel_id, message_bytes.clone());
590+
}
591+
}
592+
SendMode::BroadcastExcept(ignored_id) => {
593+
for client in clients {
594+
if ignored_id != client.into() {
595+
server_messages.send(client, self.channel_id, message_bytes.clone());
596+
}
597+
}
598+
}
599+
SendMode::Direct(client_id) => {
600+
if let ClientId::Client(client) = client_id {
601+
server_messages.send(client, self.channel_id, message_bytes.clone());
602+
}
603+
}
604+
}
605+
606+
Ok(())
607+
}
608+
609+
/// Buffers message `M` based on mode.
610+
///
611+
/// # Safety
612+
///
613+
/// The caller must ensure that this instance was created for `M` and `I`.
614+
///
615+
/// For independent messages see [`Self::send_independent_message`].
616+
unsafe fn buffer_message<M: Message, I: 'static>(
617+
&self,
618+
ctx: &mut ServerSendCtx,
619+
message: &M,
620+
mode: SendMode,
621+
message_buffer: &mut MessageBuffer,
622+
) -> Result<()> {
623+
let message_bytes = unsafe { self.serialize_with_padding::<M, I>(ctx, message)? };
624+
message_buffer.insert(mode, self.channel_id, message_bytes);
625+
Ok(())
626+
}
627+
628+
/// Helper for serializing a server message.
629+
///
630+
/// Will prepend padding bytes for where the update tick will be inserted to the injected message.
631+
///
632+
/// # Safety
633+
///
634+
/// The caller must ensure that this instance was created for `M` and `I`.
635+
unsafe fn serialize_with_padding<M: Message, I: 'static>(
636+
&self,
637+
ctx: &mut ServerSendCtx,
638+
message: &M,
639+
) -> Result<SerializedMessage> {
640+
let mut message_bytes = vec![0; RepliconTick::POSTCARD_MAX_SIZE]; // Padding for the tick.
641+
unsafe { self.serialize::<M, I>(ctx, message, &mut message_bytes)? }
642+
let message = SerializedMessage::Raw(message_bytes);
643+
644+
Ok(message)
645+
}
646+
}
647+
502648
/// Signature of server message receiving functions.
503649
type ReceiveFn = unsafe fn(
504650
&ServerMessage,
@@ -509,6 +655,16 @@ type ReceiveFn = unsafe fn(
509655
RepliconTick,
510656
);
511657

658+
/// Signature of server message sending functions.
659+
type SendOrBufferFn = unsafe fn(
660+
&ServerMessage,
661+
&mut ServerSendCtx,
662+
&Ptr,
663+
&mut ServerMessages,
664+
&Query<Entity, With<ConnectedClient>>,
665+
&mut MessageBuffer,
666+
);
667+
512668
/// Signature of server message sending functions.
513669
type SendLocallyFn = unsafe fn(PtrMut, PtrMut);
514670

0 commit comments

Comments
 (0)