Skip to content

Commit 6af0d6d

Browse files
committed
Buffer insertions and removals
Originally, we used commands to insert or remove components during replication. This wasn't optimal because Bevy doesn't batch commands. However, since Bevy was considering adding support for batching, we decided to wait. With the introduction of triggers, this became a real issue: triggers would fire after each insertion, and inside the observer, the user might not be able to access all components (depending on the insertion order). This change replaces commands with insertion and removal methods on `DeferredEntity` that buffer all changes and apply them later using `EntityWorldMut::insert_by_ids` and `EntityWorldMut::remove_by_ids`. They are flushed after processing each entity. This not only makes triggers behave as expected, but also significantly improves performance by avoiding extra archetype moves and lookups. I also slightly changed the behavior of the `Replicated` component to simplify the implementation and unlock additional performance. `Replicated` is no longer automatically inserted into non-replicated entities spawned from replicated components. This change is consistent with server behavior, where such entities also do not have the `Replicated` component.
1 parent c029476 commit 6af0d6d

12 files changed

Lines changed: 441 additions & 112 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Changed
1111

12+
- Component removals and insertions for an entity are now buffered and applied as bundles to avoid triggering observers without all components being inserted or removed. This also significantly improves performance by avoiding extra archetype moves and lookups.
13+
- The `Replicated` component is no longer automatically inserted into non-replicated entities spawned from replicated components.
1214
- Replace `ServerEntityMap::get_by_*` and `ServerEntityMap::remove_by_*` with an entry-based API. Use `ServerEntityMap::server_entry` or `ServerEntityMap::client_entry` instead.
1315
- Print error instead of panic on mapping overwrite in `ServerEntityMap`.
1416

17+
### Removed
18+
19+
- `WriteCtx::commands`. You can now insert and remove components directly through `DeferredEntity`.
20+
1521
## [0.33.0] - 2025-04-27
1622

1723
### Changed

src/client.rs

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod diagnostics;
44
pub mod event;
55
pub mod server_mutate_ticks;
66

7-
use bevy::{ecs::world::CommandQueue, prelude::*, reflect::TypeRegistry};
7+
use bevy::{prelude::*, reflect::TypeRegistry};
88
use bytes::{Buf, Bytes};
99
use log::{debug, trace};
1010
use postcard::experimental::max_size::MaxSize;
@@ -19,7 +19,7 @@ use crate::shared::{
1919
replication::{
2020
Replicated,
2121
command_markers::{CommandMarkers, EntityMarkers},
22-
deferred_entity::DeferredEntity,
22+
deferred_entity::{DeferredChanges, DeferredEntity},
2323
mutate_index::MutateIndex,
2424
replication_registry::{
2525
ReplicationRegistry,
@@ -29,7 +29,7 @@ use crate::shared::{
2929
update_message_flags::UpdateMessageFlags,
3030
},
3131
replicon_tick::RepliconTick,
32-
server_entity_map::ServerEntityMap,
32+
server_entity_map::{EntityEntry, ServerEntityMap},
3333
};
3434
use confirm_history::{ConfirmHistory, EntityReplicated};
3535
use server_mutate_ticks::{MutateTickReceived, ServerMutateTicks};
@@ -103,7 +103,7 @@ fn setup_channels(mut client: ResMut<RepliconClient>, channels: Res<RepliconChan
103103
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages).
104104
pub(super) fn receive_replication(
105105
world: &mut World,
106-
mut queue: Local<CommandQueue>,
106+
mut changes: Local<DeferredChanges>,
107107
mut entity_markers: Local<EntityMarkers>,
108108
) -> Result<()> {
109109
world.resource_scope(|world, mut client: Mut<RepliconClient>| {
@@ -119,7 +119,7 @@ pub(super) fn receive_replication(
119119
let mut mutate_ticks =
120120
world.remove_resource::<ServerMutateTicks>();
121121
let mut params = ReceiveParams {
122-
queue: &mut queue,
122+
changes: &mut changes,
123123
entity_markers: &mut entity_markers,
124124
entity_map: &mut entity_map,
125125
replicated_events: &mut replicated_events,
@@ -405,29 +405,31 @@ fn apply_removals(
405405
) -> Result<()> {
406406
let server_entity = entity_serde::deserialize_entity(message)?;
407407

408-
let client_entity = params
409-
.entity_map
410-
.server_entry(server_entity)
411-
.or_insert_with(|| world.spawn(Replicated).id());
408+
let mut client_entity = match params.entity_map.server_entry(server_entity) {
409+
EntityEntry::Occupied(entry) => {
410+
DeferredEntity::new(world.entity_mut(entry.get()), params.changes)
411+
}
412+
EntityEntry::Vacant(entry) => {
413+
// It's possible to receive a removal when an entity is spawned and has a component removed in the same tick.
414+
// We could serialize the size of the removals instead of the total number of removals and just advance the cursor,
415+
// but it's a very rare case and not worth optimizing for.
416+
let mut client_entity = DeferredEntity::new(world.spawn_empty(), params.changes);
417+
client_entity.insert(Replicated);
418+
entry.insert(client_entity.id());
419+
client_entity
420+
}
421+
};
412422

413-
let mut client_entity = DeferredEntity::new(world, client_entity);
414-
let mut commands = client_entity.commands(params.queue);
415423
params
416424
.entity_markers
417425
.read(params.command_markers, &*client_entity);
418426

419-
confirm_tick(
420-
&mut commands,
421-
&mut client_entity,
422-
params.replicated_events,
423-
message_tick,
424-
);
427+
confirm_tick(&mut client_entity, params.replicated_events, message_tick);
425428

426429
let len = apply_array(ArrayKind::Sized, message, |message| {
427430
let fns_id = postcard_utils::from_buf(message)?;
428431
let (component_id, component_fns, _) = params.registry.get(fns_id);
429432
let mut ctx = RemoveCtx {
430-
commands: &mut commands,
431433
message_tick,
432434
component_id,
433435
};
@@ -440,7 +442,7 @@ fn apply_removals(
440442
stats.components_changed += len;
441443
}
442444

443-
params.queue.apply(world);
445+
client_entity.flush();
444446

445447
Ok(())
446448
}
@@ -454,33 +456,39 @@ fn apply_changes(
454456
) -> Result<()> {
455457
let server_entity = entity_serde::deserialize_entity(message)?;
456458

457-
let client_entity = params
458-
.entity_map
459-
.server_entry(server_entity)
460-
.or_insert_with(|| world.spawn(Replicated).id());
459+
let world_cell = world.as_unsafe_world_cell();
460+
let entities = world_cell.entities();
461+
// SAFETY: split into `Entities` and `DeferredEntity`.
462+
// The latter won't apply any structural changes until `flush`, and `Entities` won't be used afterward.
463+
let world = unsafe { world_cell.world_mut() };
464+
465+
let mut client_entity = match params.entity_map.server_entry(server_entity) {
466+
EntityEntry::Occupied(entry) => {
467+
DeferredEntity::new(world.entity_mut(entry.get()), params.changes)
468+
}
469+
EntityEntry::Vacant(entry) => {
470+
let mut client_entity = DeferredEntity::new(world.spawn_empty(), params.changes);
471+
client_entity.insert(Replicated);
472+
entry.insert(client_entity.id());
473+
client_entity
474+
}
475+
};
461476

462-
let mut client_entity = DeferredEntity::new(world, client_entity);
463-
let mut commands = client_entity.commands(params.queue);
464477
params
465478
.entity_markers
466479
.read(params.command_markers, &*client_entity);
467480

468-
confirm_tick(
469-
&mut commands,
470-
&mut client_entity,
471-
params.replicated_events,
472-
message_tick,
473-
);
481+
confirm_tick(&mut client_entity, params.replicated_events, message_tick);
474482

475483
let len = apply_array(ArrayKind::Sized, message, |message| {
476484
let fns_id = postcard_utils::from_buf(message)?;
477485
let (component_id, component_fns, rule_fns) = params.registry.get(fns_id);
478486
let mut ctx = WriteCtx {
479-
commands: &mut commands,
480487
entity_map: params.entity_map,
481488
type_registry: params.type_registry,
482489
component_id,
483490
message_tick,
491+
entities,
484492
ignore_mapping: false,
485493
};
486494

@@ -502,7 +510,7 @@ fn apply_changes(
502510
stats.components_changed += len;
503511
}
504512

505-
params.queue.apply(world);
513+
client_entity.flush();
506514

507515
Ok(())
508516
}
@@ -543,17 +551,14 @@ enum ArrayKind {
543551
}
544552

545553
fn confirm_tick(
546-
commands: &mut Commands,
547554
entity: &mut DeferredEntity,
548555
replicated_events: &mut Events<EntityReplicated>,
549556
tick: RepliconTick,
550557
) {
551558
if let Some(mut history) = entity.get_mut::<ConfirmHistory>() {
552559
history.set_last_tick(tick);
553560
} else {
554-
commands
555-
.entity(entity.id())
556-
.insert(ConfirmHistory::new(tick));
561+
entity.insert(ConfirmHistory::new(tick));
557562
}
558563
replicated_events.send(EntityReplicated {
559564
entity: entity.id(),
@@ -580,8 +585,13 @@ fn apply_mutations(
580585
return Ok(());
581586
};
582587

583-
let mut client_entity = DeferredEntity::new(world, client_entity);
584-
let mut commands = client_entity.commands(params.queue);
588+
let world_cell = world.as_unsafe_world_cell();
589+
let entities = world_cell.entities();
590+
// SAFETY: split into `Entities` and `DeferredEntity`.
591+
// The latter won't apply any structural changes until `flush`, and `Entities` won't be used afterward.
592+
let world = unsafe { world_cell.world_mut() };
593+
594+
let mut client_entity = DeferredEntity::new(world.entity_mut(client_entity), params.changes);
585595
params
586596
.entity_markers
587597
.read(params.command_markers, &*client_entity);
@@ -625,11 +635,11 @@ fn apply_mutations(
625635
let fns_id = postcard_utils::from_buf(&mut data)?;
626636
let (component_id, component_fns, rule_fns) = params.registry.get(fns_id);
627637
let mut ctx = WriteCtx {
628-
commands: &mut commands,
629638
entity_map: params.entity_map,
630639
type_registry: params.type_registry,
631640
component_id,
632641
message_tick,
642+
entities,
633643
ignore_mapping: false,
634644
};
635645

@@ -662,7 +672,7 @@ fn apply_mutations(
662672
stats.components_changed += components_count;
663673
}
664674

665-
params.queue.apply(world);
675+
client_entity.flush();
666676

667677
Ok(())
668678
}
@@ -671,7 +681,7 @@ fn apply_mutations(
671681
///
672682
/// To avoid passing a lot of arguments into all receive functions.
673683
struct ReceiveParams<'a> {
674-
queue: &'a mut CommandQueue,
684+
changes: &'a mut DeferredChanges,
675685
entity_markers: &'a mut EntityMarkers,
676686
entity_map: &'a mut ServerEntityMap,
677687
replicated_events: &'a mut Events<EntityReplicated>,

src/shared/replication/command_markers.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,15 @@ pub trait AppMarkerExt {
8484
if let Some(mut history) = entity.get_mut::<History<C>>() {
8585
history.insert(ctx.message_tick, component);
8686
} else {
87-
ctx.commands
88-
.entity(entity.id())
89-
.insert(History([(ctx.message_tick, component)].into()));
87+
entity.insert(History([(ctx.message_tick, component)].into()));
9088
}
9189
9290
Ok(())
9391
}
9492
9593
/// Removes component `C` and its history.
96-
fn remove_history<C: Component>(ctx: &mut RemoveCtx, entity: &mut DeferredEntity) {
97-
ctx.commands.entity(entity.id()).remove::<History<C>>().remove::<C>();
94+
fn remove_history<C: Component>(_ctx: &mut RemoveCtx, entity: &mut DeferredEntity) {
95+
entity.remove::<History<C>>().remove::<C>();
9896
}
9997
10098
/// If this marker is present on an entity, registered components will be stored in [`History<T>`].

0 commit comments

Comments
 (0)