Skip to content

Commit 3500a93

Browse files
authored
Buffered insertions (#484)
Originally, we used commands to insert or remove components during replication. This wasn't optimal because Bevy doesn't batch commands. However, since Bevy was considering adding support for batching, we decided to wait. With the introduction of triggers, this became a real issue: triggers would fire after each insertion, and inside the observer, the user might not be able to access all components (depending on the insertion order). This change replaces commands with insertion and removal methods on `DeferredEntity` that buffer all changes and apply them later using `EntityWorldMut::insert_by_ids` and `EntityWorldMut::remove_by_ids`. They are flushed after processing each entity. This not only makes triggers behave as expected, but also significantly improves performance by avoiding extra archetype moves and lookups. I also slightly changed the behavior of the `Replicated` component to simplify the implementation and unlock additional performance. `Replicated` is no longer automatically inserted into non-replicated entities spawned from replicated components. This change is consistent with server behavior, where such entities also do not have the `Replicated` component. The commit entry-based API for `ServerEntityMap` that is required for this change.
1 parent bee3fd6 commit 3500a93

13 files changed

Lines changed: 618 additions & 172 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2121
- `ReplicationRule` now stores `Vec<ComponentRule>` instead of `Vec<(ComponentId, FnsId)>`
2222
- `RuleFns` now available from prelude.
2323
- Rules created with the same priority now evaluated in their creation order.
24+
- Component removals and insertions for an entity are now buffered and applied as bundles to avoid triggering observers without all components being inserted or removed. This also significantly improves performance by avoiding extra archetype moves and lookups.
25+
- The `Replicated` component is no longer automatically inserted into non-replicated entities spawned from replicated components.
26+
- Replace `ServerEntityMap::get_by_*` and `ServerEntityMap::remove_by_*` with an entry-based API. Use `ServerEntityMap::server_entry` or `ServerEntityMap::client_entry` instead.
27+
- Print error instead of panic on mapping overwrite in `ServerEntityMap`.
2428

2529
### Removed
2630

31+
- `WriteCtx::commands`. You can now insert and remove components directly through `DeferredEntity`.
2732
- Deprecated methods.
2833

2934
## [0.33.0] - 2025-04-27

src/client.rs

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod diagnostics;
44
pub mod event;
55
pub mod server_mutate_ticks;
66

7-
use bevy::{ecs::world::CommandQueue, prelude::*, reflect::TypeRegistry};
7+
use bevy::{prelude::*, reflect::TypeRegistry};
88
use bytes::{Buf, Bytes};
99
use log::{debug, trace};
1010
use postcard::experimental::max_size::MaxSize;
@@ -19,7 +19,7 @@ use crate::shared::{
1919
replication::{
2020
Replicated,
2121
command_markers::{CommandMarkers, EntityMarkers},
22-
deferred_entity::DeferredEntity,
22+
deferred_entity::{DeferredChanges, DeferredEntity},
2323
mutate_index::MutateIndex,
2424
replication_registry::{
2525
ReplicationRegistry,
@@ -29,7 +29,7 @@ use crate::shared::{
2929
update_message_flags::UpdateMessageFlags,
3030
},
3131
replicon_tick::RepliconTick,
32-
server_entity_map::ServerEntityMap,
32+
server_entity_map::{EntityEntry, ServerEntityMap},
3333
};
3434
use confirm_history::{ConfirmHistory, EntityReplicated};
3535
use server_mutate_ticks::{MutateTickReceived, ServerMutateTicks};
@@ -103,7 +103,7 @@ fn setup_channels(mut client: ResMut<RepliconClient>, channels: Res<RepliconChan
103103
/// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages).
104104
pub(super) fn receive_replication(
105105
world: &mut World,
106-
mut queue: Local<CommandQueue>,
106+
mut changes: Local<DeferredChanges>,
107107
mut entity_markers: Local<EntityMarkers>,
108108
) -> Result<()> {
109109
world.resource_scope(|world, mut client: Mut<RepliconClient>| {
@@ -119,7 +119,7 @@ pub(super) fn receive_replication(
119119
let mut mutate_ticks =
120120
world.remove_resource::<ServerMutateTicks>();
121121
let mut params = ReceiveParams {
122-
queue: &mut queue,
122+
changes: &mut changes,
123123
entity_markers: &mut entity_markers,
124124
entity_map: &mut entity_map,
125125
replicated_events: &mut replicated_events,
@@ -385,7 +385,8 @@ fn apply_despawn(
385385
let server_entity = entity_serde::deserialize_entity(message)?;
386386
if let Some(client_entity) = params
387387
.entity_map
388-
.remove_by_server(server_entity)
388+
.server_entry(server_entity)
389+
.remove()
389390
.and_then(|entity| world.get_entity_mut(entity).ok())
390391
{
391392
let ctx = DespawnCtx { message_tick };
@@ -404,28 +405,31 @@ fn apply_removals(
404405
) -> Result<()> {
405406
let server_entity = entity_serde::deserialize_entity(message)?;
406407

407-
let client_entity = params
408-
.entity_map
409-
.get_by_server_or_insert(server_entity, || world.spawn(Replicated).id());
408+
let mut client_entity = match params.entity_map.server_entry(server_entity) {
409+
EntityEntry::Occupied(entry) => {
410+
DeferredEntity::new(world.entity_mut(entry.get()), params.changes)
411+
}
412+
EntityEntry::Vacant(entry) => {
413+
// It's possible to receive a removal when an entity is spawned and has a component removed in the same tick.
414+
// We could serialize the size of the removals instead of the total number of removals and just advance the cursor,
415+
// but it's a very rare case and not worth optimizing for.
416+
let mut client_entity = DeferredEntity::new(world.spawn_empty(), params.changes);
417+
client_entity.insert(Replicated);
418+
entry.insert(client_entity.id());
419+
client_entity
420+
}
421+
};
410422

411-
let mut client_entity = DeferredEntity::new(world, client_entity);
412-
let mut commands = client_entity.commands(params.queue);
413423
params
414424
.entity_markers
415425
.read(params.command_markers, &*client_entity);
416426

417-
confirm_tick(
418-
&mut commands,
419-
&mut client_entity,
420-
params.replicated_events,
421-
message_tick,
422-
);
427+
confirm_tick(&mut client_entity, params.replicated_events, message_tick);
423428

424429
let len = apply_array(ArrayKind::Sized, message, |message| {
425430
let fns_id = postcard_utils::from_buf(message)?;
426431
let (component_id, component_fns, _) = params.registry.get(fns_id);
427432
let mut ctx = RemoveCtx {
428-
commands: &mut commands,
429433
message_tick,
430434
component_id,
431435
};
@@ -438,7 +442,7 @@ fn apply_removals(
438442
stats.components_changed += len;
439443
}
440444

441-
params.queue.apply(world);
445+
client_entity.flush();
442446

443447
Ok(())
444448
}
@@ -452,32 +456,39 @@ fn apply_changes(
452456
) -> Result<()> {
453457
let server_entity = entity_serde::deserialize_entity(message)?;
454458

455-
let client_entity = params
456-
.entity_map
457-
.get_by_server_or_insert(server_entity, || world.spawn(Replicated).id());
459+
let world_cell = world.as_unsafe_world_cell();
460+
let entities = world_cell.entities();
461+
// SAFETY: split into `Entities` and `DeferredEntity`.
462+
// The latter won't apply any structural changes until `flush`, and `Entities` won't be used afterward.
463+
let world = unsafe { world_cell.world_mut() };
464+
465+
let mut client_entity = match params.entity_map.server_entry(server_entity) {
466+
EntityEntry::Occupied(entry) => {
467+
DeferredEntity::new(world.entity_mut(entry.get()), params.changes)
468+
}
469+
EntityEntry::Vacant(entry) => {
470+
let mut client_entity = DeferredEntity::new(world.spawn_empty(), params.changes);
471+
client_entity.insert(Replicated);
472+
entry.insert(client_entity.id());
473+
client_entity
474+
}
475+
};
458476

459-
let mut client_entity = DeferredEntity::new(world, client_entity);
460-
let mut commands = client_entity.commands(params.queue);
461477
params
462478
.entity_markers
463479
.read(params.command_markers, &*client_entity);
464480

465-
confirm_tick(
466-
&mut commands,
467-
&mut client_entity,
468-
params.replicated_events,
469-
message_tick,
470-
);
481+
confirm_tick(&mut client_entity, params.replicated_events, message_tick);
471482

472483
let len = apply_array(ArrayKind::Sized, message, |message| {
473484
let fns_id = postcard_utils::from_buf(message)?;
474485
let (component_id, component_fns, rule_fns) = params.registry.get(fns_id);
475486
let mut ctx = WriteCtx {
476-
commands: &mut commands,
477487
entity_map: params.entity_map,
478488
type_registry: params.type_registry,
479489
component_id,
480490
message_tick,
491+
entities,
481492
ignore_mapping: false,
482493
};
483494

@@ -499,7 +510,7 @@ fn apply_changes(
499510
stats.components_changed += len;
500511
}
501512

502-
params.queue.apply(world);
513+
client_entity.flush();
503514

504515
Ok(())
505516
}
@@ -540,17 +551,14 @@ enum ArrayKind {
540551
}
541552

542553
fn confirm_tick(
543-
commands: &mut Commands,
544554
entity: &mut DeferredEntity,
545555
replicated_events: &mut Events<EntityReplicated>,
546556
tick: RepliconTick,
547557
) {
548558
if let Some(mut history) = entity.get_mut::<ConfirmHistory>() {
549559
history.set_last_tick(tick);
550560
} else {
551-
commands
552-
.entity(entity.id())
553-
.insert(ConfirmHistory::new(tick));
561+
entity.insert(ConfirmHistory::new(tick));
554562
}
555563
replicated_events.send(EntityReplicated {
556564
entity: entity.id(),
@@ -570,15 +578,20 @@ fn apply_mutations(
570578
let server_entity = entity_serde::deserialize_entity(message)?;
571579
let data_size: usize = postcard_utils::from_buf(message)?;
572580

573-
let Some(client_entity) = params.entity_map.get_by_server(server_entity) else {
581+
let Some(&client_entity) = params.entity_map.to_client().get(&server_entity) else {
574582
// Mutation could arrive after a despawn from update message.
575583
debug!("ignoring mutations received for unknown server's {server_entity:?}");
576584
message.advance(data_size);
577585
return Ok(());
578586
};
579587

580-
let mut client_entity = DeferredEntity::new(world, client_entity);
581-
let mut commands = client_entity.commands(params.queue);
588+
let world_cell = world.as_unsafe_world_cell();
589+
let entities = world_cell.entities();
590+
// SAFETY: split into `Entities` and `DeferredEntity`.
591+
// The latter won't apply any structural changes until `flush`, and `Entities` won't be used afterward.
592+
let world = unsafe { world_cell.world_mut() };
593+
594+
let mut client_entity = DeferredEntity::new(world.entity_mut(client_entity), params.changes);
582595
params
583596
.entity_markers
584597
.read(params.command_markers, &*client_entity);
@@ -622,11 +635,11 @@ fn apply_mutations(
622635
let fns_id = postcard_utils::from_buf(&mut data)?;
623636
let (component_id, component_fns, rule_fns) = params.registry.get(fns_id);
624637
let mut ctx = WriteCtx {
625-
commands: &mut commands,
626638
entity_map: params.entity_map,
627639
type_registry: params.type_registry,
628640
component_id,
629641
message_tick,
642+
entities,
630643
ignore_mapping: false,
631644
};
632645

@@ -659,7 +672,7 @@ fn apply_mutations(
659672
stats.components_changed += components_count;
660673
}
661674

662-
params.queue.apply(world);
675+
client_entity.flush();
663676

664677
Ok(())
665678
}
@@ -668,7 +681,7 @@ fn apply_mutations(
668681
///
669682
/// To avoid passing a lot of arguments into all receive functions.
670683
struct ReceiveParams<'a> {
671-
queue: &'a mut CommandQueue,
684+
changes: &'a mut DeferredChanges,
672685
entity_markers: &'a mut EntityMarkers,
673686
entity_map: &'a mut ServerEntityMap,
674687
replicated_events: &'a mut Events<EntityReplicated>,

src/shared/replication/command_markers.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,15 @@ pub trait AppMarkerExt {
8484
if let Some(mut history) = entity.get_mut::<History<C>>() {
8585
history.insert(ctx.message_tick, component);
8686
} else {
87-
ctx.commands
88-
.entity(entity.id())
89-
.insert(History([(ctx.message_tick, component)].into()));
87+
entity.insert(History([(ctx.message_tick, component)].into()));
9088
}
9189
9290
Ok(())
9391
}
9492
9593
/// Removes component `C` and its history.
96-
fn remove_history<C: Component>(ctx: &mut RemoveCtx, entity: &mut DeferredEntity) {
97-
ctx.commands.entity(entity.id()).remove::<History<C>>().remove::<C>();
94+
fn remove_history<C: Component>(_ctx: &mut RemoveCtx, entity: &mut DeferredEntity) {
95+
entity.remove::<History<C>>().remove::<C>();
9896
}
9997
10098
/// If this marker is present on an entity, registered components will be stored in [`History<T>`].

0 commit comments

Comments
 (0)