added persistence and networking

Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-12-09 22:21:58 +00:00
parent 93ab598db0
commit e25078ba44
18 changed files with 1787 additions and 33528 deletions

View File

@@ -3,10 +3,10 @@
//! This module handles incoming EntityDelta messages and applies them to the
//! local Bevy world using CRDT merge semantics.
use bevy::{
prelude::*,
reflect::TypeRegistry,
};
use std::collections::HashMap;
use bevy::prelude::*;
use uuid::Uuid;
use crate::{
networking::{
@@ -16,17 +16,52 @@ use crate::{
},
delta_generation::NodeVectorClock,
entity_map::NetworkEntityMap,
merge::compare_operations_lww,
messages::{
ComponentData,
EntityDelta,
SyncMessage,
},
operations::ComponentOp,
NetworkedEntity,
VectorClock,
},
persistence::reflection::deserialize_component,
persistence::reflection::deserialize_component_typed,
};
/// Resource to track the last vector clock and originating node for each component on each entity
///
/// This enables Last-Write-Wins conflict resolution by comparing incoming
/// operations' vector clocks with the current component's vector clock.
/// The node_id is used as a deterministic tiebreaker for concurrent operations.
#[derive(Resource, Default)]
pub struct ComponentVectorClocks {
/// Maps (entity_network_id, component_type) -> (vector_clock, originating_node_id)
clocks: HashMap<(Uuid, String), (VectorClock, Uuid)>,
}
impl ComponentVectorClocks {
pub fn new() -> Self {
Self {
clocks: HashMap::new(),
}
}
/// Get the current vector clock and node_id for a component
pub fn get(&self, entity_id: Uuid, component_type: &str) -> Option<&(VectorClock, Uuid)> {
self.clocks.get(&(entity_id, component_type.to_string()))
}
/// Update the vector clock and node_id for a component
pub fn set(&mut self, entity_id: Uuid, component_type: String, clock: VectorClock, node_id: Uuid) {
self.clocks.insert((entity_id, component_type), (clock, node_id));
}
/// Remove all clocks for an entity (when entity is deleted)
pub fn remove_entity(&mut self, entity_id: Uuid) {
self.clocks.retain(|(eid, _), _| *eid != entity_id);
}
}
/// Apply an EntityDelta message to the local world
///
/// This function:
@@ -38,39 +73,33 @@ use crate::{
/// # Parameters
///
/// - `delta`: The EntityDelta to apply
/// - `commands`: Bevy Commands for spawning/modifying entities
/// - `entity_map`: Map from network_id to Entity
/// - `type_registry`: Bevy's type registry for deserialization
/// - `node_clock`: Our node's vector clock (for causality tracking)
/// - `blob_store`: Optional blob store for resolving large component references
/// - `tombstone_registry`: Optional tombstone registry for deletion tracking
/// - `world`: The Bevy world to apply changes to
pub fn apply_entity_delta(
delta: &EntityDelta,
commands: &mut Commands,
entity_map: &mut NetworkEntityMap,
type_registry: &TypeRegistry,
node_clock: &mut NodeVectorClock,
blob_store: Option<&BlobStore>,
mut tombstone_registry: Option<&mut crate::networking::TombstoneRegistry>,
world: &mut World,
) {
// Validate and merge the remote vector clock
// Check for clock regression (shouldn't happen in correct implementations)
if delta.vector_clock.happened_before(&node_clock.clock) {
warn!(
"Received operation with clock from the past for entity {:?}. \
Remote clock happened before our clock. This may indicate clock issues.",
delta.entity_id
);
}
{
let mut node_clock = world.resource_mut::<NodeVectorClock>();
// Merge the remote vector clock into ours
node_clock.clock.merge(&delta.vector_clock);
// Check for clock regression (shouldn't happen in correct implementations)
if delta.vector_clock.happened_before(&node_clock.clock) {
warn!(
"Received operation with clock from the past for entity {:?}. \
Remote clock happened before our clock. This may indicate clock issues.",
delta.entity_id
);
}
// Merge the remote vector clock into ours
node_clock.clock.merge(&delta.vector_clock);
}
// Check if any operations are Delete operations
for op in &delta.operations {
if let crate::networking::ComponentOp::Delete { vector_clock } = op {
// Record tombstone
if let Some(ref mut registry) = tombstone_registry {
if let Some(mut registry) = world.get_resource_mut::<crate::networking::TombstoneRegistry>() {
registry.record_deletion(
delta.entity_id,
delta.node_id,
@@ -78,8 +107,13 @@ pub fn apply_entity_delta(
);
// Despawn the entity if it exists locally
if let Some(entity) = entity_map.get_entity(delta.entity_id) {
commands.entity(entity).despawn();
let entity_to_despawn = {
let entity_map = world.resource::<NetworkEntityMap>();
entity_map.get_entity(delta.entity_id)
};
if let Some(entity) = entity_to_despawn {
world.despawn(entity);
let mut entity_map = world.resource_mut::<NetworkEntityMap>();
entity_map.remove_by_network_id(delta.entity_id);
info!("Despawned entity {:?} due to Delete operation", delta.entity_id);
}
@@ -91,7 +125,7 @@ pub fn apply_entity_delta(
}
// Check if we should ignore this delta due to deletion
if let Some(ref registry) = tombstone_registry {
if let Some(registry) = world.get_resource::<crate::networking::TombstoneRegistry>() {
if registry.should_ignore_operation(delta.entity_id, &delta.vector_clock) {
debug!(
"Ignoring delta for deleted entity {:?}",
@@ -101,29 +135,30 @@ pub fn apply_entity_delta(
}
}
// Look up or create the entity
let entity = match entity_map.get_entity(delta.entity_id) {
Some(entity) => entity,
None => {
// Spawn new entity with NetworkedEntity component
let entity = commands
.spawn(NetworkedEntity::with_id(delta.entity_id, delta.node_id))
.id();
entity_map.insert(delta.entity_id, entity);
info!(
"Spawned new networked entity {:?} from node {}",
delta.entity_id, delta.node_id
);
let entity = {
let entity_map = world.resource::<NetworkEntityMap>();
if let Some(entity) = entity_map.get_entity(delta.entity_id) {
entity
} else {
// Use shared helper to spawn networked entity with persistence
crate::networking::spawn_networked_entity(world, delta.entity_id, delta.node_id)
}
};
// Apply each operation (skip Delete operations - handled above)
for op in &delta.operations {
if !op.is_delete() {
apply_component_op(entity, op, commands, type_registry, blob_store);
apply_component_op(entity, op, delta.node_id, world);
}
}
// Trigger persistence by marking Persisted as changed
// This ensures remote entities are persisted after sync
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
if let Some(mut persisted) = entity_mut.get_mut::<crate::persistence::Persisted>() {
// Accessing &mut triggers Bevy's change detection
let _ = &mut *persisted;
debug!("Triggered persistence for synced entity {:?}", delta.entity_id);
}
}
}
@@ -135,17 +170,16 @@ pub fn apply_entity_delta(
fn apply_component_op(
entity: Entity,
op: &ComponentOp,
commands: &mut Commands,
type_registry: &TypeRegistry,
blob_store: Option<&BlobStore>,
incoming_node_id: Uuid,
world: &mut World,
) {
match op {
| ComponentOp::Set {
component_type,
data,
vector_clock: _,
vector_clock,
} => {
apply_set_operation(entity, component_type, data, commands, type_registry, blob_store);
apply_set_operation_with_lww(entity, component_type, data, vector_clock, incoming_node_id, world);
}
| ComponentOp::SetAdd { component_type, .. } => {
// OR-Set add - Phase 10 provides OrSet<T> type
@@ -174,6 +208,120 @@ fn apply_component_op(
}
}
/// Apply a Set operation with Last-Write-Wins conflict resolution
///
/// Compares the incoming vector clock with the stored clock for this component.
/// Only applies the operation if the incoming clock wins the LWW comparison.
/// Uses node_id as a deterministic tiebreaker for concurrent operations.
fn apply_set_operation_with_lww(
entity: Entity,
component_type: &str,
data: &ComponentData,
incoming_clock: &VectorClock,
incoming_node_id: Uuid,
world: &mut World,
) {
// Get the network ID for this entity
let entity_network_id = {
if let Ok(entity_ref) = world.get_entity(entity) {
if let Some(networked) = entity_ref.get::<crate::networking::NetworkedEntity>() {
networked.network_id
} else {
warn!("Entity {:?} has no NetworkedEntity component", entity);
return;
}
} else {
warn!("Entity {:?} not found", entity);
return;
}
};
// Check if we should apply this operation based on LWW
let should_apply = {
if let Some(component_clocks) = world.get_resource::<ComponentVectorClocks>() {
if let Some((current_clock, current_node_id)) = component_clocks.get(entity_network_id, component_type) {
// We have a current clock - do LWW comparison with real node IDs
let decision = compare_operations_lww(
current_clock,
*current_node_id,
incoming_clock,
incoming_node_id,
);
match decision {
crate::networking::merge::MergeDecision::ApplyRemote => {
debug!(
"Applying remote Set for {} (remote is newer)",
component_type
);
true
}
crate::networking::merge::MergeDecision::KeepLocal => {
debug!(
"Ignoring remote Set for {} (local is newer)",
component_type
);
false
}
crate::networking::merge::MergeDecision::Concurrent => {
// For concurrent operations, use node_id comparison as deterministic tiebreaker
// This ensures all nodes make the same decision for concurrent updates
if incoming_node_id > *current_node_id {
debug!(
"Applying remote Set for {} (concurrent, remote node_id {:?} > local {:?})",
component_type, incoming_node_id, current_node_id
);
true
} else {
debug!(
"Ignoring remote Set for {} (concurrent, local node_id {:?} >= remote {:?})",
component_type, current_node_id, incoming_node_id
);
false
}
}
crate::networking::merge::MergeDecision::Equal => {
debug!("Ignoring remote Set for {} (clocks equal)", component_type);
false
}
}
} else {
// No current clock - this is the first time we're setting this component
debug!(
"Applying remote Set for {} (no current clock)",
component_type
);
true
}
} else {
// No ComponentVectorClocks resource - apply unconditionally
warn!("ComponentVectorClocks resource not found - applying Set without LWW check");
true
}
};
if !should_apply {
return;
}
// Apply the operation
apply_set_operation(entity, component_type, data, world);
// Update the stored vector clock with node_id
if let Some(mut component_clocks) = world.get_resource_mut::<ComponentVectorClocks>() {
component_clocks.set(
entity_network_id,
component_type.to_string(),
incoming_clock.clone(),
incoming_node_id,
);
debug!(
"Updated vector clock for {} on entity {:?} (node_id: {:?})",
component_type, entity_network_id, incoming_node_id
);
}
}
/// Apply a Set operation (Last-Write-Wins)
///
/// Deserializes the component and inserts/updates it on the entity.
@@ -182,10 +330,13 @@ fn apply_set_operation(
entity: Entity,
component_type: &str,
data: &ComponentData,
commands: &mut Commands,
type_registry: &TypeRegistry,
blob_store: Option<&BlobStore>,
world: &mut World,
) {
let type_registry = {
let registry_resource = world.resource::<AppTypeRegistry>();
registry_resource.read()
};
let blob_store = world.get_resource::<BlobStore>();
// Get the actual data (resolve blob if needed)
let data_bytes = match data {
| ComponentData::Inline(bytes) => bytes.clone(),
@@ -211,19 +362,14 @@ fn apply_set_operation(
}
};
// Deserialize the component
let reflected = match deserialize_component(&data_bytes, type_registry) {
let reflected = match deserialize_component_typed(&data_bytes, component_type, &type_registry) {
Ok(reflected) => reflected,
Err(e) => {
error!(
"Failed to deserialize component {}: {}",
component_type, e
);
error!("Failed to deserialize component {}: {}", component_type, e);
return;
}
};
// Get the type registration
let registration = match type_registry.get_with_type_path(component_type) {
Some(reg) => reg,
None => {
@@ -232,40 +378,36 @@ fn apply_set_operation(
}
};
// Get ReflectComponent data
let reflect_component = match registration.data::<ReflectComponent>() {
Some(rc) => rc.clone(),
None => {
error!(
"Component type {} does not have ReflectComponent data",
component_type
);
error!("Component type {} does not have ReflectComponent data", component_type);
return;
}
};
// Clone what we need to avoid lifetime issues
let component_type_owned = component_type.to_string();
drop(type_registry);
// Insert or update the component
commands.queue(move |world: &mut World| {
// Get the type registry from the world and clone it
let type_registry_arc = {
let Some(type_registry_res) = world.get_resource::<AppTypeRegistry>() else {
error!("AppTypeRegistry not found in world");
return;
};
type_registry_res.clone()
};
let type_registry_arc = world.resource::<AppTypeRegistry>().clone();
let type_registry_guard = type_registry_arc.read();
// Now we can safely get mutable access to the world
let type_registry = type_registry_arc.read();
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
reflect_component.insert(&mut entity_mut, &*reflected, &type_registry_guard);
debug!("Applied Set operation for {}", component_type);
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
reflect_component.insert(&mut entity_mut, &*reflected, &type_registry);
debug!("Applied Set operation for {}", component_type_owned);
// If we just inserted a Transform component, also add NetworkedTransform
// This ensures remote entities can have their Transform changes detected
if component_type == "bevy_transform::components::transform::Transform" {
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
if entity_mut.get::<crate::networking::NetworkedTransform>().is_none() {
entity_mut.insert(crate::networking::NetworkedTransform::default());
debug!("Added NetworkedTransform to entity with Transform");
}
}
}
});
} else {
error!("Entity {:?} not found when applying component {}", entity, component_type);
}
}
/// System to receive and apply incoming EntityDelta messages
@@ -282,21 +424,14 @@ fn apply_set_operation(
/// App::new()
/// .add_systems(Update, receive_and_apply_deltas_system);
/// ```
pub fn receive_and_apply_deltas_system(
mut commands: Commands,
bridge: Option<Res<crate::networking::GossipBridge>>,
mut entity_map: ResMut<NetworkEntityMap>,
type_registry: Res<AppTypeRegistry>,
mut node_clock: ResMut<NodeVectorClock>,
blob_store: Option<Res<BlobStore>>,
mut tombstone_registry: Option<ResMut<crate::networking::TombstoneRegistry>>,
) {
let Some(bridge) = bridge else {
pub fn receive_and_apply_deltas_system(world: &mut World) {
// Check if bridge exists
if world.get_resource::<crate::networking::GossipBridge>().is_none() {
return;
};
}
let registry = type_registry.read();
let blob_store_ref = blob_store.as_deref();
// Clone the bridge to avoid borrowing issues
let bridge = world.resource::<crate::networking::GossipBridge>().clone();
// Poll for incoming messages
while let Some(message) = bridge.try_recv() {
@@ -320,15 +455,7 @@ pub fn receive_and_apply_deltas_system(
delta.operations.len()
);
apply_entity_delta(
&delta,
&mut commands,
&mut entity_map,
&registry,
&mut node_clock,
blob_store_ref,
tombstone_registry.as_deref_mut(),
);
apply_entity_delta(&delta, world);
}
| SyncMessage::JoinRequest { .. } => {
// Handled by handle_join_requests_system

View File

@@ -28,17 +28,28 @@ use crate::networking::{
/// ```
pub fn auto_detect_transform_changes_system(
mut query: Query<
&mut NetworkedEntity,
(Entity, &mut NetworkedEntity, &Transform),
(
With<NetworkedTransform>,
Or<(Changed<Transform>, Changed<GlobalTransform>)>,
),
>,
) {
// Count how many changed entities we found
let count = query.iter().count();
if count > 0 {
debug!("auto_detect_transform_changes_system: Found {} entities with changed Transform", count);
}
// Simply accessing &mut NetworkedEntity triggers Bevy's change detection
for mut _networked in query.iter_mut() {
for (_entity, mut networked, transform) in query.iter_mut() {
debug!(
"Marking NetworkedEntity {:?} as changed due to Transform change (pos: {:?})",
networked.network_id, transform.translation
);
// No-op - the mutable access itself marks NetworkedEntity as changed
// This will trigger the delta generation system
let _ = &mut *networked;
}
}

View File

@@ -7,7 +7,6 @@ use bevy::prelude::*;
use crate::networking::{
change_detection::LastSyncVersions,
entity_map::NetworkEntityMap,
gossip_bridge::GossipBridge,
messages::{
EntityDelta,
@@ -67,82 +66,133 @@ impl NodeVectorClock {
/// App::new()
/// .add_systems(Update, generate_delta_system);
/// ```
pub fn generate_delta_system(
query: Query<(Entity, &NetworkedEntity), Changed<NetworkedEntity>>,
world: &World,
type_registry: Res<AppTypeRegistry>,
mut node_clock: ResMut<NodeVectorClock>,
mut last_versions: ResMut<LastSyncVersions>,
bridge: Option<Res<GossipBridge>>,
_entity_map: Res<NetworkEntityMap>,
mut operation_log: Option<ResMut<crate::networking::OperationLog>>,
) {
// Early return if no gossip bridge
let Some(bridge) = bridge else {
pub fn generate_delta_system(world: &mut World) {
// Check if bridge exists
if world.get_resource::<GossipBridge>().is_none() {
return;
}
let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = {
let mut query = world.query_filtered::<(Entity, &NetworkedEntity), Changed<NetworkedEntity>>();
query.iter(world)
.map(|(entity, networked)| (entity, networked.network_id, networked.owner_node_id))
.collect()
};
let registry = type_registry.read();
if changed_entities.is_empty() {
return;
}
for (entity, networked) in query.iter() {
// Check if we should sync this entity
let current_seq = node_clock.sequence();
if !last_versions.should_sync(networked.network_id, current_seq) {
continue;
}
debug!(
"generate_delta_system: Processing {} changed entities",
changed_entities.len()
);
// Increment our vector clock
node_clock.tick();
// Process each entity separately to avoid borrow conflicts
for (entity, network_id, _owner_node_id) in changed_entities {
// Phase 1: Check and update clocks, collect data
let mut system_state: bevy::ecs::system::SystemState<(
Res<GossipBridge>,
Res<AppTypeRegistry>,
ResMut<NodeVectorClock>,
ResMut<LastSyncVersions>,
Option<ResMut<crate::networking::OperationLog>>,
)> = bevy::ecs::system::SystemState::new(world);
// Build operations for all components
// TODO: Add BlobStore support in future phases
let operations = build_entity_operations(
entity,
world,
node_clock.node_id,
node_clock.clock.clone(),
&registry,
None, // blob_store - will be added in later phases
);
let (node_id, vector_clock, current_seq) = {
let (_, _, mut node_clock, last_versions, _) = system_state.get_mut(world);
// Check if we should sync this entity
let current_seq = node_clock.sequence();
if !last_versions.should_sync(network_id, current_seq) {
drop(last_versions);
drop(node_clock);
system_state.apply(world);
continue;
}
// Increment our vector clock
node_clock.tick();
(node_clock.node_id, node_clock.clock.clone(), current_seq)
};
// Phase 2: Build operations (needs world access without holding other borrows)
let operations = {
let type_registry = world.resource::<AppTypeRegistry>().read();
let ops = build_entity_operations(
entity,
world,
node_id,
vector_clock.clone(),
&type_registry,
None, // blob_store - will be added in later phases
);
drop(type_registry);
ops
};
if operations.is_empty() {
system_state.apply(world);
continue;
}
// Create EntityDelta
let delta = EntityDelta::new(
networked.network_id,
node_clock.node_id,
node_clock.clock.clone(),
operations,
);
// Phase 3: Record, broadcast, and update
let delta = {
let (bridge, _, _, mut last_versions, mut operation_log) = system_state.get_mut(world);
// Record in operation log for anti-entropy
if let Some(ref mut log) = operation_log {
log.record_operation(delta.clone());
}
// Wrap in VersionedMessage
let message = VersionedMessage::new(SyncMessage::EntityDelta {
entity_id: delta.entity_id,
node_id: delta.node_id,
vector_clock: delta.vector_clock.clone(),
operations: delta.operations.clone(),
});
// Broadcast
if let Err(e) = bridge.send(message) {
error!("Failed to broadcast EntityDelta: {}", e);
} else {
debug!(
"Broadcast EntityDelta for entity {:?} with {} operations",
networked.network_id,
delta.operations.len()
// Create EntityDelta
let delta = EntityDelta::new(
network_id,
node_id,
vector_clock.clone(),
operations,
);
// Update last sync version
last_versions.update(networked.network_id, current_seq);
// Record in operation log for anti-entropy
if let Some(ref mut log) = operation_log {
log.record_operation(delta.clone());
}
// Wrap in VersionedMessage
let message = VersionedMessage::new(SyncMessage::EntityDelta {
entity_id: delta.entity_id,
node_id: delta.node_id,
vector_clock: delta.vector_clock.clone(),
operations: delta.operations.clone(),
});
// Broadcast
if let Err(e) = bridge.send(message) {
error!("Failed to broadcast EntityDelta: {}", e);
} else {
debug!(
"Broadcast EntityDelta for entity {:?} with {} operations",
network_id,
delta.operations.len()
);
last_versions.update(network_id, current_seq);
}
delta
};
// Phase 4: Update component vector clocks for local modifications
{
if let Some(mut component_clocks) = world.get_resource_mut::<crate::networking::ComponentVectorClocks>() {
for op in &delta.operations {
if let crate::networking::ComponentOp::Set { component_type, vector_clock: op_clock, .. } = op {
component_clocks.set(network_id, component_type.clone(), op_clock.clone(), node_id);
debug!(
"Updated local vector clock for {} on entity {:?} (node_id: {:?})",
component_type, network_id, node_id
);
}
}
}
}
system_state.apply(world);
}
}

View File

@@ -62,11 +62,37 @@ impl GossipBridge {
Ok(())
}
/// Try to receive a message from the gossip network
/// Try to receive a message from the gossip network (from incoming queue)
pub fn try_recv(&self) -> Option<VersionedMessage> {
self.incoming.lock().ok()?.pop_front()
}
/// Drain all pending messages from the incoming queue atomically
///
/// This acquires the lock once and drains all messages, preventing race conditions
/// where messages could arrive between individual try_recv() calls.
pub fn drain_incoming(&self) -> Vec<VersionedMessage> {
self.incoming
.lock()
.ok()
.map(|mut queue| queue.drain(..).collect())
.unwrap_or_default()
}
/// Try to get a message from the outgoing queue to send to gossip
pub fn try_recv_outgoing(&self) -> Option<VersionedMessage> {
self.outgoing.lock().ok()?.pop_front()
}
/// Push a message to the incoming queue (for testing/integration)
pub fn push_incoming(&self, message: VersionedMessage) -> Result<()> {
self.incoming
.lock()
.map_err(|e| NetworkingError::Gossip(format!("Failed to lock incoming queue: {}", e)))?
.push_back(message);
Ok(())
}
/// Get our node ID
pub fn node_id(&self) -> NodeId {
self.node_id

View File

@@ -199,11 +199,15 @@ pub fn apply_full_state(
continue;
}
// Spawn entity with NetworkedEntity component
// Spawn entity with NetworkedEntity and Persisted components
// This ensures entities received via FullState are persisted locally
let entity = commands
.spawn(NetworkedEntity::with_id(
entity_state.entity_id,
entity_state.owner_node_id,
.spawn((
NetworkedEntity::with_id(
entity_state.entity_id,
entity_state.owner_node_id,
),
crate::persistence::Persisted::with_id(entity_state.entity_id),
))
.id();

View File

@@ -4,13 +4,15 @@
//! multiple systems each polling the same message queue. Instead, a single
//! dispatcher system polls once and routes messages to appropriate handlers.
use bevy::prelude::*;
use bevy::{
ecs::system::SystemState,
prelude::*,
};
use crate::networking::{
apply_entity_delta,
apply_full_state,
blob_support::BlobStore,
build_full_state,
build_missing_deltas,
delta_generation::NodeVectorClock,
entity_map::NetworkEntityMap,
@@ -47,93 +49,122 @@ use crate::networking::{
/// App::new()
/// .add_systems(Update, message_dispatcher_system);
/// ```
pub fn message_dispatcher_system(
world: &World,
mut commands: Commands,
bridge: Option<Res<GossipBridge>>,
mut entity_map: ResMut<NetworkEntityMap>,
type_registry: Res<AppTypeRegistry>,
mut node_clock: ResMut<NodeVectorClock>,
blob_store: Option<Res<BlobStore>>,
mut tombstone_registry: Option<ResMut<TombstoneRegistry>>,
operation_log: Option<Res<OperationLog>>,
networked_entities: Query<(Entity, &NetworkedEntity)>,
) {
let Some(bridge) = bridge else {
pub fn message_dispatcher_system(world: &mut World) {
// This is an exclusive system to avoid parameter conflicts with world access
// Check if bridge exists
if world.get_resource::<GossipBridge>().is_none() {
return;
}
// Atomically drain all pending messages from the incoming queue
// This prevents race conditions where messages could arrive between individual try_recv() calls
let messages: Vec<crate::networking::VersionedMessage> = {
let bridge = world.resource::<GossipBridge>();
bridge.drain_incoming()
};
let registry = type_registry.read();
let blob_store_ref = blob_store.as_deref();
// Dispatch each message (bridge is no longer borrowed)
for message in messages {
dispatch_message(world, message);
}
// Poll messages once and route to appropriate handlers
while let Some(message) = bridge.try_recv() {
match message.message {
// EntityDelta - apply remote operations
| SyncMessage::EntityDelta {
// Flush all queued commands to ensure components are inserted immediately
world.flush();
}
/// Helper function to dispatch a single message
/// This is separate to allow proper borrowing of world resources
fn dispatch_message(
world: &mut World,
message: crate::networking::VersionedMessage,
) {
match message.message {
// EntityDelta - apply remote operations
| SyncMessage::EntityDelta {
entity_id,
node_id,
vector_clock,
operations,
} => {
let delta = crate::networking::EntityDelta {
entity_id,
node_id,
vector_clock,
operations,
} => {
let delta = crate::networking::EntityDelta {
entity_id,
node_id,
vector_clock,
operations,
};
};
debug!(
"Received EntityDelta for entity {:?} with {} operations",
delta.entity_id,
delta.operations.len()
);
debug!(
"Received EntityDelta for entity {:?} with {} operations",
delta.entity_id,
delta.operations.len()
);
apply_entity_delta(
&delta,
&mut commands,
&mut entity_map,
&registry,
&mut node_clock,
blob_store_ref,
tombstone_registry.as_deref_mut(),
);
apply_entity_delta(&delta, world);
}
// JoinRequest - new peer joining
| SyncMessage::JoinRequest {
node_id,
session_secret,
} => {
info!("Received JoinRequest from node {}", node_id);
// TODO: Validate session_secret in Phase 13
if let Some(_secret) = session_secret {
debug!("Session secret validation not yet implemented");
}
// JoinRequest - new peer joining
| SyncMessage::JoinRequest {
node_id,
session_secret,
} => {
info!("Received JoinRequest from node {}", node_id);
// Build and send full state
// We need to collect data in separate steps to avoid borrow conflicts
let networked_entities = {
let mut query = world.query::<(Entity, &NetworkedEntity)>();
query.iter(world).collect::<Vec<_>>()
};
// TODO: Validate session_secret in Phase 13
if let Some(_secret) = session_secret {
debug!("Session secret validation not yet implemented");
}
let full_state = {
let type_registry = world.resource::<AppTypeRegistry>().read();
let node_clock = world.resource::<NodeVectorClock>();
let blob_store = world.get_resource::<BlobStore>();
// Build and send full state
let full_state = build_full_state(
build_full_state_from_data(
world,
&networked_entities,
&registry,
&type_registry,
&node_clock,
blob_store_ref,
);
blob_store.map(|b| b as &BlobStore),
)
};
// Get bridge to send response
if let Some(bridge) = world.get_resource::<GossipBridge>() {
if let Err(e) = bridge.send(full_state) {
error!("Failed to send FullState: {}", e);
} else {
info!("Sent FullState to node {}", node_id);
}
}
}
// FullState - receiving world state after join
| SyncMessage::FullState {
entities,
vector_clock,
} => {
info!("Received FullState with {} entities", entities.len());
// FullState - receiving world state after join
| SyncMessage::FullState {
entities,
vector_clock,
} => {
info!("Received FullState with {} entities", entities.len());
// Use SystemState to properly borrow multiple resources
let mut system_state: SystemState<(
Commands,
ResMut<NetworkEntityMap>,
Res<AppTypeRegistry>,
ResMut<NodeVectorClock>,
Option<Res<BlobStore>>,
Option<ResMut<TombstoneRegistry>>,
)> = SystemState::new(world);
{
let (mut commands, mut entity_map, type_registry, mut node_clock, blob_store, mut tombstone_registry) = system_state.get_mut(world);
let registry = type_registry.read();
apply_full_state(
entities,
@@ -142,68 +173,163 @@ pub fn message_dispatcher_system(
&mut entity_map,
&registry,
&mut node_clock,
blob_store_ref,
blob_store.as_deref(),
tombstone_registry.as_deref_mut(),
);
// registry is dropped here
}
// SyncRequest - peer requesting missing operations
| SyncMessage::SyncRequest {
node_id: requesting_node,
vector_clock: their_clock,
} => {
debug!("Received SyncRequest from node {}", requesting_node);
system_state.apply(world);
}
if let Some(ref op_log) = operation_log {
// Find operations they're missing
let missing_deltas = op_log.get_all_operations_newer_than(&their_clock);
// SyncRequest - peer requesting missing operations
| SyncMessage::SyncRequest {
node_id: requesting_node,
vector_clock: their_clock,
} => {
debug!("Received SyncRequest from node {}", requesting_node);
if !missing_deltas.is_empty() {
info!(
"Sending {} missing deltas to node {}",
missing_deltas.len(),
requesting_node
);
if let Some(op_log) = world.get_resource::<OperationLog>() {
// Find operations they're missing
let missing_deltas = op_log.get_all_operations_newer_than(&their_clock);
// Send MissingDeltas response
let response = build_missing_deltas(missing_deltas);
if !missing_deltas.is_empty() {
info!(
"Sending {} missing deltas to node {}",
missing_deltas.len(),
requesting_node
);
// Send MissingDeltas response
let response = build_missing_deltas(missing_deltas);
if let Some(bridge) = world.get_resource::<GossipBridge>() {
if let Err(e) = bridge.send(response) {
error!("Failed to send MissingDeltas: {}", e);
}
} else {
debug!("No missing deltas for node {}", requesting_node);
}
} else {
warn!("Received SyncRequest but OperationLog resource not available");
debug!("No missing deltas for node {}", requesting_node);
}
} else {
warn!("Received SyncRequest but OperationLog resource not available");
}
}
// MissingDeltas - receiving operations we requested
| SyncMessage::MissingDeltas { deltas } => {
info!("Received MissingDeltas with {} operations", deltas.len());
// MissingDeltas - receiving operations we requested
| SyncMessage::MissingDeltas { deltas } => {
info!("Received MissingDeltas with {} operations", deltas.len());
// Apply each delta
for delta in deltas {
debug!(
"Applying missing delta for entity {:?}",
delta.entity_id
);
// Apply each delta
for delta in deltas {
debug!(
"Applying missing delta for entity {:?}",
delta.entity_id
);
apply_entity_delta(
&delta,
&mut commands,
&mut entity_map,
&registry,
&mut node_clock,
blob_store_ref,
tombstone_registry.as_deref_mut(),
);
}
apply_entity_delta(&delta, world);
}
}
}
}
/// Helper to build full state from collected data
fn build_full_state_from_data(
world: &World,
networked_entities: &[(Entity, &NetworkedEntity)],
type_registry: &bevy::reflect::TypeRegistry,
node_clock: &NodeVectorClock,
blob_store: Option<&BlobStore>,
) -> crate::networking::VersionedMessage {
use crate::{
networking::{
blob_support::create_component_data,
messages::{
ComponentState,
EntityState,
},
},
persistence::reflection::serialize_component,
};
// Get tombstone registry to filter out deleted entities
let tombstone_registry = world.get_resource::<crate::networking::TombstoneRegistry>();
let mut entities = Vec::new();
for (entity, networked) in networked_entities {
// Skip tombstoned entities to prevent resurrection on joining nodes
if let Some(registry) = &tombstone_registry {
if registry.is_deleted(networked.network_id) {
debug!(
"Skipping tombstoned entity {:?} in full state build",
networked.network_id
);
continue;
}
}
let entity_ref = world.entity(*entity);
let mut components = Vec::new();
// Iterate over all type registrations to find components
for registration in type_registry.iter() {
// Skip if no ReflectComponent data
let Some(reflect_component) = registration.data::<ReflectComponent>() else {
continue;
};
let type_path = registration.type_info().type_path();
// Skip networked wrapper components
if type_path.ends_with("::NetworkedEntity")
|| type_path.ends_with("::NetworkedTransform")
|| type_path.ends_with("::NetworkedSelection")
|| type_path.ends_with("::NetworkedDrawingPath")
{
continue;
}
// Try to reflect this component from the entity
if let Some(reflected) = reflect_component.reflect(entity_ref) {
// Serialize the component
if let Ok(serialized) = serialize_component(reflected, type_registry) {
// Create component data (inline or blob)
let data = if let Some(store) = blob_store {
match create_component_data(serialized, store) {
Ok(d) => d,
Err(_) => continue,
}
} else {
crate::networking::ComponentData::Inline(serialized)
};
components.push(ComponentState {
component_type: type_path.to_string(),
data,
});
}
}
}
entities.push(EntityState {
entity_id: networked.network_id,
owner_node_id: networked.owner_node_id,
vector_clock: node_clock.clock.clone(),
components,
is_deleted: false,
});
}
info!(
"Built FullState with {} entities for new peer",
entities.len()
);
crate::networking::VersionedMessage::new(SyncMessage::FullState {
entities,
vector_clock: node_clock.clock.clone(),
})
}
#[cfg(test)]
mod tests {
#[test]

View File

@@ -71,3 +71,56 @@ pub use rga::*;
pub use sync_component::*;
pub use tombstones::*;
pub use vector_clock::*;
/// Spawn a networked entity with persistence enabled
///
/// Creates an entity with both NetworkedEntity and Persisted components,
/// registers it in the NetworkEntityMap, and returns the entity ID.
/// This is the single source of truth for creating networked entities
/// that need to be synchronized and persisted across the network.
///
/// # Parameters
/// - `world`: Bevy world to spawn entity in
/// - `entity_id`: Network ID for the entity (UUID)
/// - `node_id`: ID of the node that owns this entity
///
/// # Returns
/// The spawned Bevy entity's ID
///
/// # Example
/// ```no_run
/// use bevy::prelude::*;
/// use lib::networking::spawn_networked_entity;
/// use uuid::Uuid;
///
/// fn my_system(world: &mut World) {
/// let entity_id = Uuid::new_v4();
/// let node_id = Uuid::new_v4();
/// let entity = spawn_networked_entity(world, entity_id, node_id);
/// // Entity is now registered and ready for sync/persistence
/// }
/// ```
pub fn spawn_networked_entity(
world: &mut bevy::prelude::World,
entity_id: uuid::Uuid,
node_id: uuid::Uuid,
) -> bevy::prelude::Entity {
use bevy::prelude::*;
// Spawn with both NetworkedEntity and Persisted components
let entity = world.spawn((
NetworkedEntity::with_id(entity_id, node_id),
crate::persistence::Persisted::with_id(entity_id),
)).id();
// Register in entity map
let mut entity_map = world.resource_mut::<NetworkEntityMap>();
entity_map.insert(entity_id, entity);
info!(
"Spawned new networked entity {:?} from node {}",
entity_id, node_id
);
entity
}

View File

@@ -25,7 +25,7 @@ use crate::{
VectorClock,
},
},
persistence::reflection::serialize_component,
persistence::reflection::serialize_component_typed,
};
/// Build a Set operation (LWW) from a component
@@ -55,7 +55,7 @@ pub fn build_set_operation(
blob_store: Option<&BlobStore>,
) -> Result<ComponentOp> {
// Serialize the component
let serialized = serialize_component(component, type_registry)?;
let serialized = serialize_component_typed(component, type_registry)?;
// Create component data (inline or blob)
let data = if let Some(store) = blob_store {
@@ -97,6 +97,8 @@ pub fn build_entity_operations(
let mut operations = Vec::new();
let entity_ref = world.entity(entity);
debug!("build_entity_operations: Building operations for entity {:?}", entity);
// Iterate over all type registrations
for registration in type_registry.iter() {
// Skip if no ReflectComponent data
@@ -119,7 +121,7 @@ pub fn build_entity_operations(
// Try to reflect this component from the entity
if let Some(reflected) = reflect_component.reflect(entity_ref) {
// Serialize the component
if let Ok(serialized) = serialize_component(reflected, type_registry) {
if let Ok(serialized) = serialize_component_typed(reflected, type_registry) {
// Create component data (inline or blob)
let data = if let Some(store) = blob_store {
if let Ok(component_data) = create_component_data(serialized, store) {
@@ -138,12 +140,19 @@ pub fn build_entity_operations(
operations.push(ComponentOp::Set {
component_type: type_path.to_string(),
data,
vector_clock: clock,
vector_clock: clock.clone(),
});
debug!(" ✓ Added Set operation for {}", type_path);
}
}
}
debug!(
"build_entity_operations: Built {} operations for entity {:?}",
operations.len(),
entity
);
operations
}
@@ -173,7 +182,7 @@ pub fn build_transform_operation(
blob_store: Option<&BlobStore>,
) -> Result<ComponentOp> {
// Use reflection to serialize Transform
let serialized = serialize_component(transform.as_reflect(), type_registry)?;
let serialized = serialize_component_typed(transform.as_reflect(), type_registry)?;
// Create component data (inline or blob)
let data = if let Some(store) = blob_store {

View File

@@ -307,21 +307,14 @@ pub fn handle_sync_requests_system(
/// System to handle MissingDeltas messages
///
/// When we receive MissingDeltas (in response to our SyncRequest), apply them.
pub fn handle_missing_deltas_system(
mut commands: Commands,
bridge: Option<Res<GossipBridge>>,
mut entity_map: ResMut<crate::networking::NetworkEntityMap>,
type_registry: Res<AppTypeRegistry>,
mut node_clock: ResMut<NodeVectorClock>,
blob_store: Option<Res<crate::networking::BlobStore>>,
mut tombstone_registry: Option<ResMut<crate::networking::TombstoneRegistry>>,
) {
let Some(bridge) = bridge else {
pub fn handle_missing_deltas_system(world: &mut World) {
// Check if bridge exists
if world.get_resource::<GossipBridge>().is_none() {
return;
};
}
let registry = type_registry.read();
let blob_store_ref = blob_store.as_deref();
// Clone the bridge to avoid borrowing issues
let bridge = world.resource::<GossipBridge>().clone();
// Poll for MissingDeltas messages
while let Some(message) = bridge.try_recv() {
@@ -336,15 +329,7 @@ pub fn handle_missing_deltas_system(
delta.entity_id
);
crate::networking::apply_entity_delta(
&delta,
&mut commands,
&mut entity_map,
&registry,
&mut node_clock,
blob_store_ref,
tombstone_registry.as_deref_mut(),
);
crate::networking::apply_entity_delta(&delta, world);
}
}
| _ => {

View File

@@ -27,7 +27,10 @@
use bevy::prelude::*;
use crate::networking::{
change_detection::LastSyncVersions,
change_detection::{
auto_detect_transform_changes_system,
LastSyncVersions,
},
delta_generation::{
generate_delta_system,
NodeVectorClock,
@@ -158,7 +161,8 @@ impl Plugin for NetworkingPlugin {
.insert_resource(NetworkEntityMap::new())
.insert_resource(LastSyncVersions::default())
.insert_resource(OperationLog::new())
.insert_resource(TombstoneRegistry::new());
.insert_resource(TombstoneRegistry::new())
.insert_resource(crate::networking::ComponentVectorClocks::new());
// PreUpdate systems - handle incoming messages first
app.add_systems(
@@ -178,6 +182,8 @@ impl Plugin for NetworkingPlugin {
app.add_systems(
Update,
(
// Track Transform changes and mark NetworkedTransform as changed
auto_detect_transform_changes_system,
// Handle local entity deletions
handle_local_deletions_system,
),

View File

@@ -167,29 +167,61 @@ fn persistence_startup_system(db: Res<PersistenceDb>, mut metrics: ResMut<Persis
/// For automatic tracking without manual `mark_dirty()` calls, use the
/// `auto_track_component_changes_system` which automatically detects changes
/// to common components like Transform, GlobalTransform, etc.
fn collect_dirty_entities_bevy_system(
mut dirty: ResMut<DirtyEntitiesResource>,
mut write_buffer: ResMut<WriteBufferResource>,
query: Query<(Entity, &Persisted), Changed<Persisted>>,
world: &World,
type_registry: Res<AppTypeRegistry>,
) {
let registry = type_registry.read();
fn collect_dirty_entities_bevy_system(world: &mut World) {
// Collect changed entities first
let changed_entities: Vec<(Entity, uuid::Uuid)> = {
let mut query = world.query_filtered::<(Entity, &Persisted), Changed<Persisted>>();
query.iter(world)
.map(|(entity, persisted)| (entity, persisted.network_id))
.collect()
};
if changed_entities.is_empty() {
return;
}
// Serialize components for each entity
for (entity, network_id) in changed_entities {
// First, ensure the entity exists in the database
{
let now = chrono::Utc::now();
let mut write_buffer = world.resource_mut::<WriteBufferResource>();
write_buffer.add(PersistenceOp::UpsertEntity {
id: network_id,
data: EntityData {
id: network_id,
created_at: now,
updated_at: now,
entity_type: "NetworkedEntity".to_string(),
},
});
}
// Track changed entities and serialize all their components
for (entity, persisted) in query.iter() {
// Serialize all components on this entity (generic tracking)
let components = serialize_all_components_from_entity(entity, world, &registry);
let components = {
let type_registry = world.resource::<AppTypeRegistry>().read();
let comps = serialize_all_components_from_entity(entity, world, &type_registry);
drop(type_registry);
comps
};
// Add operations for each component
for (component_type, data) in components {
dirty.mark_dirty(persisted.network_id, &component_type);
// Get mutable access to dirty and mark it
{
let mut dirty = world.resource_mut::<DirtyEntitiesResource>();
dirty.mark_dirty(network_id, &component_type);
}
write_buffer.add(PersistenceOp::UpsertComponent {
entity_id: persisted.network_id,
component_type,
data,
});
// Get mutable access to write_buffer and add the operation
{
let mut write_buffer = world.resource_mut::<WriteBufferResource>();
write_buffer.add(PersistenceOp::UpsertComponent {
entity_id: network_id,
component_type,
data,
});
}
}
}
}

View File

@@ -9,12 +9,14 @@ use bevy::{
reflect::{
TypeRegistry,
serde::{
ReflectDeserializer,
ReflectSerializer,
TypedReflectDeserializer,
TypedReflectSerializer,
},
},
};
use bincode::Options as _;
use serde::de::DeserializeSeed;
use crate::persistence::error::{
PersistenceError,
Result,
@@ -101,7 +103,21 @@ pub fn serialize_component(
type_registry: &TypeRegistry,
) -> Result<Vec<u8>> {
let serializer = ReflectSerializer::new(component, type_registry);
bincode::serialize(&serializer).map_err(PersistenceError::from)
bincode::options().serialize(&serializer)
.map_err(PersistenceError::from)
}
/// Serialize a component when the type is known (more efficient for bincode)
///
/// This uses `TypedReflectSerializer` which doesn't include type path information,
/// making it compatible with `TypedReflectDeserializer` for binary formats.
pub fn serialize_component_typed(
component: &dyn Reflect,
type_registry: &TypeRegistry,
) -> Result<Vec<u8>> {
let serializer = TypedReflectSerializer::new(component, type_registry);
bincode::options().serialize(&serializer)
.map_err(PersistenceError::from)
}
/// Deserialize a component using Bevy's reflection system
@@ -134,9 +150,30 @@ pub fn deserialize_component(
type_registry: &TypeRegistry,
) -> Result<Box<dyn PartialReflect>> {
let mut deserializer = bincode::Deserializer::from_slice(bytes, bincode::options());
let reflect_deserializer = ReflectDeserializer::new(type_registry);
let reflect_deserializer = bevy::reflect::serde::ReflectDeserializer::new(type_registry);
reflect_deserializer
.deserialize(&mut deserializer)
.map_err(|e| PersistenceError::Deserialization(e.to_string()))
}
/// Deserialize a component when the type is known
///
/// Uses `TypedReflectDeserializer` which is more efficient for binary formats like bincode
/// when the component type is known at deserialization time.
pub fn deserialize_component_typed(
bytes: &[u8],
component_type: &str,
type_registry: &TypeRegistry,
) -> Result<Box<dyn PartialReflect>> {
let registration = type_registry.get_with_type_path(component_type)
.ok_or_else(|| PersistenceError::Deserialization(
format!("Type {} not registered", component_type)
))?;
let mut deserializer = bincode::Deserializer::from_slice(bytes, bincode::options());
let reflect_deserializer = TypedReflectDeserializer::new(registration, type_registry);
use serde::de::DeserializeSeed;
reflect_deserializer
.deserialize(&mut deserializer)
.map_err(|e| PersistenceError::Deserialization(e.to_string()))
@@ -235,8 +272,9 @@ pub fn serialize_all_components_from_entity(
// Try to reflect this component from the entity
if let Some(reflected) = reflect_component.reflect(entity_ref) {
// Serialize the component
if let Ok(data) = serialize_component(reflected, type_registry) {
// Serialize the component using typed serialization for consistency
// This matches the format expected by deserialize_component_typed
if let Ok(data) = serialize_component_typed(reflected, type_registry) {
components.push((type_path.to_string(), data));
}
}