Skip to content

Commit 644546d

Browse files
committed
add broadcast
1 parent 1657ab4 commit 644546d

11 files changed

Lines changed: 1260 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- `iter_received` and `iter_sent` methods on `ClientMessages` and `ServerMessages` to inspect inbound and outbound messages on a channel without consuming them.
1313

14+
### Added
15+
16+
- Broadcast messages and events via `BroadcastMessageAppExt` and `BroadcastEventAppExt`. Emitted as `Broadcast<M>` on both the sender and the receiver to allow shared logic for client-side prediction.
17+
1418
### Changed
1519

1620
- `ServerMessages::retain_sent` is now public, allowing users to filter outbound messages before the backend drains them.

src/client/message.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,37 @@ impl Plugin for ClientMessagePlugin {
5151
.build_state(app.world_mut())
5252
.build_system(send);
5353

54+
let broadcast_send_fn = (
55+
FilteredResourcesMutParamBuilder::new(|builder| {
56+
for message in registry.iter_all_broadcast() {
57+
builder.add_write_by_id(message.messages_id());
58+
}
59+
}),
60+
FilteredResourcesMutParamBuilder::new(|builder| {
61+
for message in registry.iter_all_broadcast() {
62+
builder.add_write_by_id(message.broadcast_id());
63+
}
64+
}),
65+
ParamBuilder,
66+
ParamBuilder,
67+
ParamBuilder,
68+
ParamBuilder,
69+
)
70+
.build_state(app.world_mut())
71+
.build_system(broadcast_send);
72+
73+
let broadcast_trigger_fn = (
74+
FilteredResourcesMutParamBuilder::new(|builder| {
75+
for event in registry.iter_broadcast_events() {
76+
builder.add_write_by_id(event.message().broadcast_id());
77+
}
78+
}),
79+
ParamBuilder,
80+
ParamBuilder,
81+
)
82+
.build_state(app.world_mut())
83+
.build_system(broadcast_trigger);
84+
5485
let receive_builder = (
5586
FilteredResourcesMutParamBuilder::new(|builder| {
5687
for message in registry.iter_all_server() {
@@ -113,6 +144,22 @@ impl Plugin for ClientMessagePlugin {
113144
.build_state(app.world_mut())
114145
.build_system(send_locally);
115146

147+
let broadcast_send_locally_fn = (
148+
FilteredResourcesMutParamBuilder::new(|builder| {
149+
for message in registry.iter_all_broadcast() {
150+
builder.add_write_by_id(message.broadcast_id());
151+
}
152+
}),
153+
FilteredResourcesMutParamBuilder::new(|builder| {
154+
for message in registry.iter_all_broadcast() {
155+
builder.add_write_by_id(message.messages_id());
156+
}
157+
}),
158+
ParamBuilder,
159+
)
160+
.build_state(app.world_mut())
161+
.build_system(broadcast_send_locally);
162+
116163
let reset_fn = (
117164
FilteredResourcesMutParamBuilder::new(|builder| {
118165
for message in registry.iter_all_client() {
@@ -124,6 +171,11 @@ impl Plugin for ClientMessagePlugin {
124171
builder.add_write_by_id(message.queue_id());
125172
}
126173
}),
174+
FilteredResourcesMutParamBuilder::new(|builder| {
175+
for message in registry.iter_all_broadcast() {
176+
builder.add_write_by_id(message.messages_id());
177+
}
178+
}),
127179
ParamBuilder,
128180
)
129181
.build_state(app.world_mut())
@@ -155,7 +207,10 @@ impl Plugin for ClientMessagePlugin {
155207
PostUpdate,
156208
(
157209
send_fn.run_if(in_state(ClientState::Connected)),
210+
broadcast_send_fn.run_if(in_state(ClientState::Connected)),
158211
send_locally_fn.run_if(in_state(ClientState::Disconnected)),
212+
broadcast_send_locally_fn.run_if(in_state(ClientState::Disconnected)),
213+
broadcast_trigger_fn,
159214
)
160215
.chain()
161216
.in_set(ClientSystems::Send),
@@ -197,6 +252,40 @@ fn send(
197252
}
198253
}
199254

255+
fn broadcast_send(
256+
mut messages: FilteredResourcesMut,
257+
mut broadcasts: FilteredResourcesMut,
258+
mut client_messages: ResMut<ClientMessages>,
259+
type_registry: Res<AppTypeRegistry>,
260+
entity_map: Res<ServerEntityMap>,
261+
registry: Res<RemoteMessageRegistry>,
262+
) {
263+
let mut ctx = ClientSendCtx {
264+
entity_map: &entity_map,
265+
type_registry: &type_registry,
266+
invalid_entities: Vec::new(),
267+
};
268+
269+
for message in registry.iter_all_broadcast() {
270+
let messages = messages
271+
.get_mut_by_id(message.messages_id())
272+
.expect("messages resource should be accessible");
273+
let broadcasts = broadcasts
274+
.get_mut_by_id(message.broadcast_id())
275+
.expect("broadcast messages resource should be accessible");
276+
277+
// SAFETY: passed pointers were obtained using this message data.
278+
unsafe {
279+
message.send(
280+
&mut ctx,
281+
messages.into_inner(),
282+
broadcasts.into_inner(),
283+
&mut client_messages,
284+
);
285+
}
286+
}
287+
}
288+
200289
fn receive(
201290
mut messages: FilteredResourcesMut,
202291
mut queues: FilteredResourcesMut,
@@ -246,6 +335,20 @@ fn trigger(
246335
}
247336
}
248337

338+
fn broadcast_trigger(
339+
mut broadcasts: FilteredResourcesMut,
340+
mut commands: Commands,
341+
registry: Res<RemoteMessageRegistry>,
342+
) {
343+
for event in registry.iter_broadcast_events() {
344+
let broadcasts = broadcasts
345+
.get_mut_by_id(event.message().broadcast_id())
346+
.expect("broadcast messages resource should be accessible");
347+
// SAFETY: passed pointer was obtained using this event data.
348+
unsafe { event.trigger(&mut commands, broadcasts.into_inner()) };
349+
}
350+
}
351+
249352
fn send_locally(
250353
mut from_messages: FilteredResourcesMut,
251354
mut messages: FilteredResourcesMut,
@@ -264,9 +367,28 @@ fn send_locally(
264367
}
265368
}
266369

370+
fn broadcast_send_locally(
371+
mut broadcasts: FilteredResourcesMut,
372+
mut messages: FilteredResourcesMut,
373+
registry: Res<RemoteMessageRegistry>,
374+
) {
375+
for message in registry.iter_all_broadcast() {
376+
let broadcasts = broadcasts
377+
.get_mut_by_id(message.broadcast_id())
378+
.expect("broadcast messages resource should be accessible");
379+
let messages = messages
380+
.get_mut_by_id(message.messages_id())
381+
.expect("messages resource should be accessible");
382+
383+
// SAFETY: passed pointers were obtained using this message data.
384+
unsafe { message.send_locally(broadcasts.into_inner(), messages.into_inner()) };
385+
}
386+
}
387+
267388
fn reset(
268389
mut messages: FilteredResourcesMut,
269390
mut queues: FilteredResourcesMut,
391+
mut broadcast_messages: FilteredResourcesMut,
270392
registry: Res<RemoteMessageRegistry>,
271393
) {
272394
for message in registry.iter_all_client() {
@@ -286,4 +408,13 @@ fn reset(
286408
// SAFETY: passed pointer was obtained using this message data.
287409
unsafe { messages.reset(queue.into_inner()) };
288410
}
411+
412+
for message in registry.iter_all_broadcast() {
413+
let messages = broadcast_messages
414+
.get_mut_by_id(message.messages_id())
415+
.expect("broadcast messages resource should be accessible");
416+
417+
// SAFETY: passed pointer was obtained using this message data.
418+
unsafe { message.reset(messages.into_inner()) };
419+
}
289420
}

src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,32 @@ fn receive(ping: On<FromClient<Ping>>) {
352352
For events with entities inside use [`ClientEventAppExt::add_mapped_client_event`].
353353
Similar to messages, serialization can also be customized with [`ClientEventAppExt::add_client_event_with`].
354354
355+
If you need to react to the same message on both the client and the server (for example,
356+
to share logic between client-side prediction and authoritative server processing), use
357+
[`BroadcastMessageAppExt::add_broadcast_message`] or
358+
[`BroadcastEventAppExt::add_broadcast_event`]. The message is emitted as [`Broadcast<M>`]
359+
on both sides, with the [`Broadcaster`] field indicating its origin.
360+
361+
```
362+
# use bevy::{prelude::*, state::app::StatesPlugin};
363+
# use bevy_replicon::prelude::*;
364+
# use serde::{Deserialize, Serialize};
365+
# let mut app = App::new();
366+
# app.add_plugins((StatesPlugin, RepliconPlugins));
367+
app.add_broadcast_event::<Attack>(Channel::Ordered)
368+
.add_observer(on_attack);
369+
370+
fn on_attack(on: On<Broadcast<Attack>>) {
371+
info!("attack from `{:?}`", on.broadcaster);
372+
}
373+
374+
#[derive(Event, Deserialize, Serialize)]
375+
struct Attack;
376+
```
377+
378+
For events with entities inside use [`BroadcastEventAppExt::add_mapped_broadcast_event`].
379+
Serialization can also be customized with [`BroadcastEventAppExt::add_broadcast_event_with`].
380+
355381
### From server to client
356382
357383
A similar technique is used to send messages from server to clients. To do this,
@@ -705,6 +731,8 @@ pub mod prelude {
705731
},
706732
client_id::ClientId,
707733
message::{
734+
broadcast_event::{BroadcastEventAppExt, BroadcastTriggerExt},
735+
broadcast_message::{Broadcast, BroadcastMessageAppExt, Broadcaster},
708736
client_event::{ClientEventAppExt, ClientTriggerExt},
709737
client_message::{ClientMessageAppExt, FromClient},
710738
server_event::{ServerEventAppExt, ServerTriggerExt},

src/server/message.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,19 @@ impl Plugin for ServerMessagePlugin {
6161
.build_state(app.world_mut())
6262
.build_system(receive);
6363

64+
let broadcast_receive_fn = (
65+
FilteredResourcesMutParamBuilder::new(|builder| {
66+
for message in registry.iter_all_broadcast() {
67+
builder.add_write_by_id(message.broadcast_id());
68+
}
69+
}),
70+
ParamBuilder,
71+
ParamBuilder,
72+
ParamBuilder,
73+
)
74+
.build_state(app.world_mut())
75+
.build_system(broadcast_receive);
76+
6477
let trigger_fn = (
6578
FilteredResourcesMutParamBuilder::new(|builder| {
6679
for event in registry.iter_client_events() {
@@ -73,6 +86,18 @@ impl Plugin for ServerMessagePlugin {
7386
.build_state(app.world_mut())
7487
.build_system(trigger);
7588

89+
let broadcast_trigger_fn = (
90+
FilteredResourcesMutParamBuilder::new(|builder| {
91+
for event in registry.iter_broadcast_events() {
92+
builder.add_write_by_id(event.message().broadcast_id());
93+
}
94+
}),
95+
ParamBuilder,
96+
ParamBuilder,
97+
)
98+
.build_state(app.world_mut())
99+
.build_system(broadcast_trigger);
100+
76101
let send_locally_fn = (
77102
FilteredResourcesMutParamBuilder::new(|builder| {
78103
for message in registry.iter_all_server() {
@@ -93,8 +118,9 @@ impl Plugin for ServerMessagePlugin {
93118
.add_systems(
94119
PreUpdate,
95120
(
96-
receive_fn.run_if(in_state(ServerState::Running)),
121+
(receive_fn, broadcast_receive_fn).run_if(in_state(ServerState::Running)),
97122
trigger_fn.run_if(in_state(ClientState::Disconnected)),
123+
broadcast_trigger_fn,
98124
)
99125
.chain()
100126
.in_set(ServerSystems::Receive),
@@ -176,6 +202,26 @@ fn receive(
176202
}
177203
}
178204

205+
fn broadcast_receive(
206+
mut broadcasts: FilteredResourcesMut,
207+
mut server_messages: ResMut<ServerMessages>,
208+
type_registry: Res<AppTypeRegistry>,
209+
message_registry: Res<RemoteMessageRegistry>,
210+
) {
211+
let mut ctx = ServerReceiveCtx {
212+
type_registry: &type_registry,
213+
};
214+
215+
for message in message_registry.iter_all_broadcast() {
216+
let broadcasts = broadcasts
217+
.get_mut_by_id(message.broadcast_id())
218+
.expect("broadcast messages resource should be accessible");
219+
220+
// SAFETY: passed pointer was obtained using this message data.
221+
unsafe { message.receive(&mut ctx, broadcasts.into_inner(), &mut server_messages) };
222+
}
223+
}
224+
179225
fn trigger(
180226
mut from_messages: FilteredResourcesMut,
181227
mut commands: Commands,
@@ -190,6 +236,20 @@ fn trigger(
190236
}
191237
}
192238

239+
fn broadcast_trigger(
240+
mut broadcasts: FilteredResourcesMut,
241+
mut commands: Commands,
242+
registry: Res<RemoteMessageRegistry>,
243+
) {
244+
for event in registry.iter_broadcast_events() {
245+
let broadcasts = broadcasts
246+
.get_mut_by_id(event.message().broadcast_id())
247+
.expect("broadcast messages resource should be accessible");
248+
// SAFETY: passed pointer was obtained using this event data.
249+
unsafe { event.trigger(&mut commands, broadcasts.into_inner()) };
250+
}
251+
}
252+
193253
fn send_locally(
194254
mut to_messages: FilteredResourcesMut,
195255
mut messages: FilteredResourcesMut,

src/shared/message.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod broadcast_event;
2+
pub mod broadcast_message;
13
pub mod client_event;
24
pub mod client_message;
35
pub mod ctx;

0 commit comments

Comments
 (0)