Skip to content

Commit cc25268

Browse files
Morb0Shatur
andauthored
Add broadcast messages and events (#684)
Co-authored-by: Hennadii Chernyshchyk <genaloner@gmail.com>
1 parent 8d23769 commit cc25268

11 files changed

Lines changed: 1275 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Broadcast messages and events via `BroadcastMessageAppExt` and `BroadcastEventAppExt`. Emitted as `Broadcast<M>` on both the sender and the receiver to allow shared logic for client-side prediction.
1213
- `iter_received` and `iter_sent` methods on `ClientMessages` and `ServerMessages` to inspect inbound and outbound messages on a channel without consuming them.
1314

1415
### Changed

src/client/message.rs

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,37 @@ impl Plugin for ClientMessagePlugin {
5151
.build_state(app.world_mut())
5252
.build_system(send);
5353

54+
let broadcast_send_fn = (
55+
FilteredResourcesMutParamBuilder::new(|builder| {
56+
for message in registry.iter_all_broadcast() {
57+
builder.add_write_by_id(message.messages_id());
58+
}
59+
}),
60+
FilteredResourcesMutParamBuilder::new(|builder| {
61+
for message in registry.iter_all_broadcast() {
62+
builder.add_write_by_id(message.broadcast_id());
63+
}
64+
}),
65+
ParamBuilder,
66+
ParamBuilder,
67+
ParamBuilder,
68+
ParamBuilder,
69+
)
70+
.build_state(app.world_mut())
71+
.build_system(broadcast_send);
72+
73+
let broadcast_trigger_fn = (
74+
FilteredResourcesMutParamBuilder::new(|builder| {
75+
for event in registry.iter_broadcast_events() {
76+
builder.add_write_by_id(event.message().broadcast_id());
77+
}
78+
}),
79+
ParamBuilder,
80+
ParamBuilder,
81+
)
82+
.build_state(app.world_mut())
83+
.build_system(broadcast_trigger);
84+
5485
let receive_builder = (
5586
FilteredResourcesMutParamBuilder::new(|builder| {
5687
for message in registry.iter_all_server() {
@@ -113,6 +144,22 @@ impl Plugin for ClientMessagePlugin {
113144
.build_state(app.world_mut())
114145
.build_system(send_locally);
115146

147+
let broadcast_send_locally_fn = (
148+
FilteredResourcesMutParamBuilder::new(|builder| {
149+
for message in registry.iter_all_broadcast() {
150+
builder.add_write_by_id(message.broadcast_id());
151+
}
152+
}),
153+
FilteredResourcesMutParamBuilder::new(|builder| {
154+
for message in registry.iter_all_broadcast() {
155+
builder.add_write_by_id(message.messages_id());
156+
}
157+
}),
158+
ParamBuilder,
159+
)
160+
.build_state(app.world_mut())
161+
.build_system(broadcast_send_locally);
162+
116163
let reset_fn = (
117164
FilteredResourcesMutParamBuilder::new(|builder| {
118165
for message in registry.iter_all_client() {
@@ -124,6 +171,11 @@ impl Plugin for ClientMessagePlugin {
124171
builder.add_write_by_id(message.queue_id());
125172
}
126173
}),
174+
FilteredResourcesMutParamBuilder::new(|builder| {
175+
for message in registry.iter_all_broadcast() {
176+
builder.add_write_by_id(message.messages_id());
177+
}
178+
}),
127179
ParamBuilder,
128180
)
129181
.build_state(app.world_mut())
@@ -154,8 +206,10 @@ impl Plugin for ClientMessagePlugin {
154206
.add_systems(
155207
PostUpdate,
156208
(
157-
send_fn.run_if(in_state(ClientState::Connected)),
158-
send_locally_fn.run_if(in_state(ClientState::Disconnected)),
209+
(send_fn, broadcast_send_fn).run_if(in_state(ClientState::Connected)),
210+
(send_locally_fn, broadcast_send_locally_fn)
211+
.run_if(in_state(ClientState::Disconnected)),
212+
broadcast_trigger_fn,
159213
)
160214
.chain()
161215
.in_set(ClientSystems::Send),
@@ -197,6 +251,40 @@ fn send(
197251
}
198252
}
199253

254+
fn broadcast_send(
255+
mut messages: FilteredResourcesMut,
256+
mut broadcasts: FilteredResourcesMut,
257+
mut client_messages: ResMut<ClientMessages>,
258+
type_registry: Res<AppTypeRegistry>,
259+
entity_map: Res<ServerEntityMap>,
260+
registry: Res<RemoteMessageRegistry>,
261+
) {
262+
let mut ctx = ClientSendCtx {
263+
entity_map: &entity_map,
264+
type_registry: &type_registry,
265+
invalid_entities: Vec::new(),
266+
};
267+
268+
for message in registry.iter_all_broadcast() {
269+
let messages = messages
270+
.get_mut_by_id(message.messages_id())
271+
.expect("messages resource should be accessible");
272+
let broadcasts = broadcasts
273+
.get_mut_by_id(message.broadcast_id())
274+
.expect("broadcast messages resource should be accessible");
275+
276+
// SAFETY: passed pointers were obtained using this message data.
277+
unsafe {
278+
message.send(
279+
&mut ctx,
280+
messages.into_inner(),
281+
broadcasts.into_inner(),
282+
&mut client_messages,
283+
);
284+
}
285+
}
286+
}
287+
200288
fn receive(
201289
mut messages: FilteredResourcesMut,
202290
mut queues: FilteredResourcesMut,
@@ -246,6 +334,20 @@ fn trigger(
246334
}
247335
}
248336

337+
fn broadcast_trigger(
338+
mut broadcasts: FilteredResourcesMut,
339+
mut commands: Commands,
340+
registry: Res<RemoteMessageRegistry>,
341+
) {
342+
for event in registry.iter_broadcast_events() {
343+
let broadcasts = broadcasts
344+
.get_mut_by_id(event.message().broadcast_id())
345+
.expect("broadcast messages resource should be accessible");
346+
// SAFETY: passed pointer was obtained using this event data.
347+
unsafe { event.trigger(&mut commands, broadcasts.into_inner()) };
348+
}
349+
}
350+
249351
fn send_locally(
250352
mut from_messages: FilteredResourcesMut,
251353
mut messages: FilteredResourcesMut,
@@ -264,9 +366,28 @@ fn send_locally(
264366
}
265367
}
266368

369+
fn broadcast_send_locally(
370+
mut broadcasts: FilteredResourcesMut,
371+
mut messages: FilteredResourcesMut,
372+
registry: Res<RemoteMessageRegistry>,
373+
) {
374+
for message in registry.iter_all_broadcast() {
375+
let broadcasts = broadcasts
376+
.get_mut_by_id(message.broadcast_id())
377+
.expect("broadcast messages resource should be accessible");
378+
let messages = messages
379+
.get_mut_by_id(message.messages_id())
380+
.expect("messages resource should be accessible");
381+
382+
// SAFETY: passed pointers were obtained using this message data.
383+
unsafe { message.send_locally(broadcasts.into_inner(), messages.into_inner()) };
384+
}
385+
}
386+
267387
fn reset(
268388
mut messages: FilteredResourcesMut,
269389
mut queues: FilteredResourcesMut,
390+
mut broadcast_messages: FilteredResourcesMut,
270391
registry: Res<RemoteMessageRegistry>,
271392
) {
272393
for message in registry.iter_all_client() {
@@ -286,4 +407,13 @@ fn reset(
286407
// SAFETY: passed pointer was obtained using this message data.
287408
unsafe { messages.reset(queue.into_inner()) };
288409
}
410+
411+
for message in registry.iter_all_broadcast() {
412+
let messages = broadcast_messages
413+
.get_mut_by_id(message.messages_id())
414+
.expect("broadcast messages resource should be accessible");
415+
416+
// SAFETY: passed pointer was obtained using this message data.
417+
unsafe { message.reset(messages.into_inner()) };
418+
}
289419
}

src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,32 @@ fn receive(ping: On<FromClient<Ping>>) {
352352
For events with entities inside use [`ClientEventAppExt::add_mapped_client_event`].
353353
Similar to messages, serialization can also be customized with [`ClientEventAppExt::add_client_event_with`].
354354
355+
If you need to react to the same message on both the client and the server (for example,
356+
to share logic between client-side prediction and authoritative server processing), use
357+
[`BroadcastMessageAppExt::add_broadcast_message`] or
358+
[`BroadcastEventAppExt::add_broadcast_event`]. The message is emitted as [`Broadcast<M>`]
359+
on both sides, with the [`Broadcaster`] field indicating its origin.
360+
361+
```
362+
# use bevy::{prelude::*, state::app::StatesPlugin};
363+
# use bevy_replicon::prelude::*;
364+
# use serde::{Deserialize, Serialize};
365+
# let mut app = App::new();
366+
# app.add_plugins((StatesPlugin, RepliconPlugins));
367+
app.add_broadcast_event::<Attack>(Channel::Ordered)
368+
.add_observer(on_attack);
369+
370+
fn on_attack(on: On<Broadcast<Attack>>) {
371+
info!("attack from `{:?}`", on.broadcaster);
372+
}
373+
374+
#[derive(Event, Deserialize, Serialize)]
375+
struct Attack;
376+
```
377+
378+
Similar to regular client messages, we also provide [`BroadcastEventAppExt::add_mapped_broadcast_event`].
379+
and [`BroadcastEventAppExt::add_broadcast_event_with`].
380+
355381
### From server to client
356382
357383
A similar technique is used to send messages from server to clients. To do this,
@@ -705,6 +731,8 @@ pub mod prelude {
705731
},
706732
client_id::ClientId,
707733
message::{
734+
broadcast_event::{BroadcastEventAppExt, BroadcastTriggerExt},
735+
broadcast_message::{Broadcast, BroadcastMessageAppExt, Broadcaster},
708736
client_event::{ClientEventAppExt, ClientTriggerExt},
709737
client_message::{ClientMessageAppExt, FromClient},
710738
server_event::{ServerEventAppExt, ServerTriggerExt},

src/server/message.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,19 @@ impl Plugin for ServerMessagePlugin {
6161
.build_state(app.world_mut())
6262
.build_system(receive);
6363

64+
let broadcast_receive_fn = (
65+
FilteredResourcesMutParamBuilder::new(|builder| {
66+
for message in registry.iter_all_broadcast() {
67+
builder.add_write_by_id(message.broadcast_id());
68+
}
69+
}),
70+
ParamBuilder,
71+
ParamBuilder,
72+
ParamBuilder,
73+
)
74+
.build_state(app.world_mut())
75+
.build_system(broadcast_receive);
76+
6477
let trigger_fn = (
6578
FilteredResourcesMutParamBuilder::new(|builder| {
6679
for event in registry.iter_client_events() {
@@ -73,6 +86,18 @@ impl Plugin for ServerMessagePlugin {
7386
.build_state(app.world_mut())
7487
.build_system(trigger);
7588

89+
let broadcast_trigger_fn = (
90+
FilteredResourcesMutParamBuilder::new(|builder| {
91+
for event in registry.iter_broadcast_events() {
92+
builder.add_write_by_id(event.message().broadcast_id());
93+
}
94+
}),
95+
ParamBuilder,
96+
ParamBuilder,
97+
)
98+
.build_state(app.world_mut())
99+
.build_system(broadcast_trigger);
100+
76101
let send_locally_fn = (
77102
FilteredResourcesMutParamBuilder::new(|builder| {
78103
for message in registry.iter_all_server() {
@@ -93,8 +118,11 @@ impl Plugin for ServerMessagePlugin {
93118
.add_systems(
94119
PreUpdate,
95120
(
96-
receive_fn.run_if(in_state(ServerState::Running)),
97-
trigger_fn.run_if(in_state(ClientState::Disconnected)),
121+
(receive_fn, broadcast_receive_fn).run_if(in_state(ServerState::Running)),
122+
(
123+
trigger_fn.run_if(in_state(ClientState::Disconnected)),
124+
broadcast_trigger_fn,
125+
),
98126
)
99127
.chain()
100128
.in_set(ServerSystems::Receive),
@@ -176,6 +204,26 @@ fn receive(
176204
}
177205
}
178206

207+
fn broadcast_receive(
208+
mut broadcasts: FilteredResourcesMut,
209+
mut server_messages: ResMut<ServerMessages>,
210+
type_registry: Res<AppTypeRegistry>,
211+
message_registry: Res<RemoteMessageRegistry>,
212+
) {
213+
let mut ctx = ServerReceiveCtx {
214+
type_registry: &type_registry,
215+
};
216+
217+
for message in message_registry.iter_all_broadcast() {
218+
let broadcasts = broadcasts
219+
.get_mut_by_id(message.broadcast_id())
220+
.expect("broadcast messages resource should be accessible");
221+
222+
// SAFETY: passed pointer was obtained using this message data.
223+
unsafe { message.receive(&mut ctx, broadcasts.into_inner(), &mut server_messages) };
224+
}
225+
}
226+
179227
fn trigger(
180228
mut from_messages: FilteredResourcesMut,
181229
mut commands: Commands,
@@ -190,6 +238,20 @@ fn trigger(
190238
}
191239
}
192240

241+
fn broadcast_trigger(
242+
mut broadcasts: FilteredResourcesMut,
243+
mut commands: Commands,
244+
registry: Res<RemoteMessageRegistry>,
245+
) {
246+
for event in registry.iter_broadcast_events() {
247+
let broadcasts = broadcasts
248+
.get_mut_by_id(event.message().broadcast_id())
249+
.expect("broadcast messages resource should be accessible");
250+
// SAFETY: passed pointer was obtained using this event data.
251+
unsafe { event.trigger(&mut commands, broadcasts.into_inner()) };
252+
}
253+
}
254+
193255
fn send_locally(
194256
mut to_messages: FilteredResourcesMut,
195257
mut messages: FilteredResourcesMut,

src/shared/message.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod broadcast_event;
2+
pub mod broadcast_message;
13
pub mod client_event;
24
pub mod client_message;
35
pub mod ctx;

0 commit comments

Comments
 (0)