chore: checkpoint for the demo (work in progress — nearly complete)

Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2026-01-05 19:41:38 +00:00
parent d1d3aec8aa
commit d2fc967f1a
29 changed files with 3389 additions and 454 deletions

View File

@@ -71,11 +71,22 @@ impl EngineCore {
self.stop_networking().await;
}
EngineCommand::SaveSession => {
// TODO: Save current session state
tracing::debug!("SaveSession command received (stub)");
// Session state is auto-saved by save_session_on_shutdown_system in Bevy
// This command is a no-op, as persistence is handled by Bevy systems
tracing::debug!("SaveSession command received (session auto-save handled by Bevy)");
}
EngineCommand::LoadSession { session_id } => {
tracing::debug!("LoadSession command received for {} (stub)", session_id.to_code());
// Loading a session means switching to a different session
// This requires restarting networking with the new session
tracing::info!("LoadSession command received for {}", session_id.to_code());
// Stop current networking if any
if self.networking_task.is_some() {
self.stop_networking().await;
}
// Start networking with the new session
self.start_networking(session_id).await;
}
EngineCommand::TickClock => {
self.tick_clock();

View File

@@ -240,11 +240,43 @@ impl NetworkingManager {
Event::Received(msg) => {
// Deserialize and forward to GossipBridge for Bevy systems
if let Ok(versioned) = rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content) {
// Diagnostic logging: track message type and nonce
let msg_type = match &versioned.message {
SyncMessage::EntityDelta { entity_id, .. } => {
format!("EntityDelta({})", entity_id)
}
SyncMessage::JoinRequest { node_id, .. } => {
format!("JoinRequest({})", node_id)
}
SyncMessage::FullState { entities, .. } => {
format!("FullState({} entities)", entities.len())
}
SyncMessage::SyncRequest { node_id, .. } => {
format!("SyncRequest({})", node_id)
}
SyncMessage::MissingDeltas { deltas } => {
format!("MissingDeltas({} ops)", deltas.len())
}
SyncMessage::Lock(lock_msg) => {
format!("Lock({:?})", lock_msg)
}
};
tracing::debug!(
"[NetworkingManager::receive] Node {} received from iroh-gossip: {} (nonce: {})",
self.node_id, msg_type, versioned.nonce
);
if let Err(e) = self.bridge.push_incoming(versioned) {
tracing::error!("Failed to push message to GossipBridge: {}", e);
tracing::error!("Failed to forward {} to GossipBridge: {}", msg_type, e);
} else {
tracing::debug!("Forwarded message to Bevy via GossipBridge");
tracing::debug!(
"[NetworkingManager::receive] ✓ Forwarded {} to Bevy GossipBridge",
msg_type
);
}
} else {
tracing::warn!("Failed to deserialize message from iroh-gossip");
}
}
Event::NeighborUp(peer) => {
@@ -288,15 +320,54 @@ impl NetworkingManager {
// Poll GossipBridge for outgoing messages and broadcast via iroh
_ = bridge_poll_interval.tick() => {
let mut sent_count = 0;
while let Some(msg) = self.bridge.try_recv_outgoing() {
// Diagnostic logging: track message type and nonce
let msg_type = match &msg.message {
SyncMessage::EntityDelta { entity_id, .. } => {
format!("EntityDelta({})", entity_id)
}
SyncMessage::JoinRequest { node_id, .. } => {
format!("JoinRequest({})", node_id)
}
SyncMessage::FullState { entities, .. } => {
format!("FullState({} entities)", entities.len())
}
SyncMessage::SyncRequest { node_id, .. } => {
format!("SyncRequest({})", node_id)
}
SyncMessage::MissingDeltas { deltas } => {
format!("MissingDeltas({} ops)", deltas.len())
}
SyncMessage::Lock(lock_msg) => {
format!("Lock({:?})", lock_msg)
}
};
tracing::debug!(
"[NetworkingManager::broadcast] Node {} broadcasting: {} (nonce: {})",
self.node_id, msg_type, msg.nonce
);
if let Ok(bytes) = rkyv::to_bytes::<rkyv::rancor::Failure>(&msg).map(|b| b.to_vec()) {
if let Err(e) = self.sender.broadcast(Bytes::from(bytes)).await {
tracing::error!("Failed to broadcast message: {}", e);
tracing::error!("Failed to broadcast {} to iroh-gossip: {}", msg_type, e);
} else {
tracing::debug!("Broadcast message from Bevy via iroh-gossip");
sent_count += 1;
tracing::debug!(
"[NetworkingManager::broadcast] ✓ Sent {} to iroh-gossip network",
msg_type
);
}
}
}
if sent_count > 0 {
tracing::info!(
"[NetworkingManager::broadcast] Node {} sent {} messages to iroh-gossip network",
self.node_id, sent_count
);
}
}
// Periodic tasks: heartbeats and lock cleanup

View File

@@ -165,6 +165,24 @@ pub fn apply_entity_delta(delta: &EntityDelta, world: &mut World) {
);
}
}
// CRITICAL: Add marker to prevent feedback loop
//
// When we apply remote operations, insert_fn() triggers Bevy's change detection.
// This causes auto_detect_transform_changes_system to mark NetworkedEntity as changed,
// which would normally trigger generate_delta_system to broadcast it back, creating
// an infinite feedback loop.
//
// By adding SkipNextDeltaGeneration marker, we tell generate_delta_system to skip
// this entity for one frame. A cleanup system removes the marker after delta
// generation runs, allowing future local changes to be broadcast normally.
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
entity_mut.insert(crate::networking::SkipNextDeltaGeneration);
debug!(
"Added SkipNextDeltaGeneration marker to entity {:?} to prevent feedback loop",
delta.entity_id
);
}
}
/// Apply a single ComponentOp to an entity
@@ -235,14 +253,18 @@ fn apply_set_operation_with_lww(
) {
// Get component type name for logging and clock tracking
let type_registry = {
let registry_resource = world.resource::<crate::persistence::ComponentTypeRegistryResource>();
let registry_resource =
world.resource::<crate::persistence::ComponentTypeRegistryResource>();
registry_resource.0
};
let component_type_name = match type_registry.get_type_name(discriminant) {
| Some(name) => name,
| None => {
error!("Unknown discriminant {} - component not registered", discriminant);
error!(
"Unknown discriminant {} - component not registered",
discriminant
);
return;
},
};
@@ -310,7 +332,10 @@ fn apply_set_operation_with_lww(
}
},
| crate::networking::merge::MergeDecision::Equal => {
debug!("Ignoring remote Set for {} (clocks equal)", component_type_name);
debug!(
"Ignoring remote Set for {} (clocks equal)",
component_type_name
);
false
},
}
@@ -355,14 +380,9 @@ fn apply_set_operation_with_lww(
///
/// Deserializes the component and inserts/updates it on the entity.
/// Handles both inline data and blob references.
fn apply_set_operation(
entity: Entity,
discriminant: u16,
data: &ComponentData,
world: &mut World,
) {
fn apply_set_operation(entity: Entity, discriminant: u16, data: &ComponentData, world: &mut World) {
let blob_store = world.get_resource::<BlobStore>();
// Get the actual data (resolve blob if needed)
let data_bytes = match data {
| ComponentData::Inline(bytes) => bytes.clone(),
@@ -390,7 +410,8 @@ fn apply_set_operation(
// Get component type registry
let type_registry = {
let registry_resource = world.resource::<crate::persistence::ComponentTypeRegistryResource>();
let registry_resource =
world.resource::<crate::persistence::ComponentTypeRegistryResource>();
registry_resource.0
};
@@ -401,7 +422,10 @@ fn apply_set_operation(
let (deserialize_fn, insert_fn) = match (deserialize_fn, insert_fn) {
| (Some(d), Some(i)) => (d, i),
| _ => {
error!("Discriminant {} not registered in ComponentTypeRegistry", discriminant);
error!(
"Discriminant {} not registered in ComponentTypeRegistry",
discriminant
);
return;
},
};

View File

@@ -31,6 +31,7 @@ pub fn auto_detect_transform_changes_system(
(
With<NetworkedTransform>,
Or<(Changed<Transform>, Changed<GlobalTransform>)>,
Without<crate::networking::SkipNextDeltaGeneration>,
),
>,
) {

View File

@@ -11,6 +11,21 @@ use serde::{
use crate::networking::vector_clock::NodeId;
/// Marker component to skip delta generation for one frame after receiving remote updates
///
/// When we apply remote operations via `apply_entity_delta()`, the `insert_fn()` call
/// triggers Bevy's change detection. This would normally cause `generate_delta_system`
/// to create and broadcast a new delta, creating an infinite feedback loop.
///
/// By adding this marker when we apply remote updates, we tell `generate_delta_system`
/// to skip this entity for one frame. A cleanup system removes the marker after
/// delta generation runs, allowing future local changes to be broadcast normally.
///
/// This is an implementation detail of the feedback loop prevention mechanism.
/// User code should never need to interact with this component.
#[derive(Component, Debug)]
pub struct SkipNextDeltaGeneration;
/// Marker component indicating an entity should be synchronized over the
/// network
///

View File

@@ -70,8 +70,13 @@ pub fn generate_delta_system(world: &mut World) {
// Broadcast only happens when online
let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = {
let mut query =
world.query_filtered::<(Entity, &NetworkedEntity), Or<(Added<NetworkedEntity>, Changed<NetworkedEntity>)>>();
let mut query = world.query_filtered::<
(Entity, &NetworkedEntity),
(
Or<(Added<NetworkedEntity>, Changed<NetworkedEntity>)>,
Without<crate::networking::SkipNextDeltaGeneration>,
),
>();
query
.iter(world)
.map(|(entity, networked)| (entity, networked.network_id, networked.owner_node_id))
@@ -98,22 +103,25 @@ pub fn generate_delta_system(world: &mut World) {
Option<ResMut<crate::networking::OperationLog>>,
)> = bevy::ecs::system::SystemState::new(world);
let (node_id, vector_clock, current_seq) = {
let (node_id, vector_clock, new_seq) = {
let (_, _, mut node_clock, last_versions, _) = system_state.get_mut(world);
// Check if we should sync this entity
// Check if we should sync this entity with the NEXT sequence (after tick)
// This prevents duplicate sends when system runs multiple times per frame
let current_seq = node_clock.sequence();
if !last_versions.should_sync(network_id, current_seq) {
let next_seq = current_seq + 1; // What the sequence will be after tick
if !last_versions.should_sync(network_id, next_seq) {
drop(last_versions);
drop(node_clock);
system_state.apply(world);
continue;
}
// Increment our vector clock
node_clock.tick();
// Increment our vector clock and get the NEW sequence
let new_seq = node_clock.tick();
debug_assert_eq!(new_seq, next_seq, "tick() should return next_seq");
(node_clock.node_id, node_clock.clock.clone(), current_seq)
(node_clock.node_id, node_clock.clock.clone(), new_seq)
};
// Phase 2: Build operations (needs world access without holding other borrows)
@@ -174,8 +182,10 @@ pub fn generate_delta_system(world: &mut World) {
);
}
// Update last sync version (both online and offline)
last_versions.update(network_id, current_seq);
// Update last sync version with NEW sequence (after tick) to prevent duplicates
// CRITICAL: Must use new_seq (after tick), not current_seq (before tick)
// This prevents sending duplicate deltas if system runs multiple times per frame
last_versions.update(network_id, new_seq);
delta
};
@@ -220,6 +230,46 @@ pub fn generate_delta_system(world: &mut World) {
}
}
/// Remove `SkipNextDeltaGeneration` markers after delta generation has run
///
/// Must be scheduled AFTER `generate_delta_system`. The marker suppresses
/// delta generation for entities that just received remote updates (feedback
/// loop prevention); stripping it here re-enables broadcasting of genuine
/// local changes on subsequent frames.
///
/// Register it chained after `generate_delta_system`:
///
/// ```no_run
/// use bevy::prelude::*;
/// use libmarathon::networking::{generate_delta_system, cleanup_skip_delta_markers_system};
///
/// App::new().add_systems(PostUpdate, (
///     generate_delta_system,
///     cleanup_skip_delta_markers_system,
/// ).chain());
/// ```
pub fn cleanup_skip_delta_markers_system(world: &mut World) {
    // Collect the marked entities first: the query borrow must end before we
    // can take mutable access to individual entities.
    let marked: Vec<Entity> = {
        let mut marker_query = world
            .query_filtered::<Entity, With<crate::networking::SkipNextDeltaGeneration>>();
        marker_query.iter(world).collect()
    };

    let cleaned = marked.len();

    // Remove the markers immediately (not via deferred commands) so they are
    // gone as soon as this system returns, rather than at the next flush.
    for marked_entity in marked {
        if let Ok(mut entity_ref) = world.get_entity_mut(marked_entity) {
            entity_ref.remove::<crate::networking::SkipNextDeltaGeneration>();
        }
    }

    if cleaned > 0 {
        debug!(
            "cleanup_skip_delta_markers_system: Removed markers from {} entities",
            cleaned
        );
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -65,6 +65,33 @@ impl GossipBridge {
/// Send a message to the gossip network
pub fn send(&self, message: VersionedMessage) -> Result<()> {
// Diagnostic logging: track message type and nonce
let msg_type = match &message.message {
crate::networking::SyncMessage::EntityDelta { entity_id, .. } => {
format!("EntityDelta({})", entity_id)
}
crate::networking::SyncMessage::JoinRequest { node_id, .. } => {
format!("JoinRequest({})", node_id)
}
crate::networking::SyncMessage::FullState { entities, .. } => {
format!("FullState({} entities)", entities.len())
}
crate::networking::SyncMessage::SyncRequest { node_id, .. } => {
format!("SyncRequest({})", node_id)
}
crate::networking::SyncMessage::MissingDeltas { deltas } => {
format!("MissingDeltas({} ops)", deltas.len())
}
crate::networking::SyncMessage::Lock(lock_msg) => {
format!("Lock({:?})", lock_msg)
}
};
debug!(
"[GossipBridge::send] Node {} queuing message: {} (nonce: {})",
self.node_id, msg_type, message.nonce
);
self.outgoing
.lock()
.map_err(|e| NetworkingError::Gossip(format!("Failed to lock outgoing queue: {}", e)))?
@@ -97,6 +124,33 @@ impl GossipBridge {
/// Push a message to the incoming queue (for testing/integration)
pub fn push_incoming(&self, message: VersionedMessage) -> Result<()> {
// Diagnostic logging: track incoming message type
let msg_type = match &message.message {
crate::networking::SyncMessage::EntityDelta { entity_id, .. } => {
format!("EntityDelta({})", entity_id)
}
crate::networking::SyncMessage::JoinRequest { node_id, .. } => {
format!("JoinRequest({})", node_id)
}
crate::networking::SyncMessage::FullState { entities, .. } => {
format!("FullState({} entities)", entities.len())
}
crate::networking::SyncMessage::SyncRequest { node_id, .. } => {
format!("SyncRequest({})", node_id)
}
crate::networking::SyncMessage::MissingDeltas { deltas } => {
format!("MissingDeltas({} ops)", deltas.len())
}
crate::networking::SyncMessage::Lock(lock_msg) => {
format!("Lock({:?})", lock_msg)
}
};
debug!(
"[GossipBridge::push_incoming] Node {} received from network: {} (nonce: {})",
self.node_id, msg_type, message.nonce
);
self.incoming
.lock()
.map_err(|e| NetworkingError::Gossip(format!("Failed to lock incoming queue: {}", e)))?

View File

@@ -17,6 +17,7 @@ use crate::networking::{
GossipBridge,
NetworkedEntity,
SessionId,
Synced,
VectorClock,
blob_support::BlobStore,
delta_generation::NodeVectorClock,
@@ -129,8 +130,9 @@ pub fn build_full_state(
}
info!(
"Built FullState with {} entities for new peer",
entities.len()
"Built FullState with {} entities ({} total networked entities queried) for new peer",
entities.len(),
networked_entities.iter().count()
);
VersionedMessage::new(SyncMessage::FullState {
@@ -168,12 +170,17 @@ pub fn apply_full_state(
{
let mut node_clock = world.resource_mut::<NodeVectorClock>();
node_clock.clock.merge(&remote_clock);
info!("Vector clock after merge: {:?}", node_clock.clock);
}
let mut spawned_count = 0;
let mut tombstoned_count = 0;
// Spawn all entities and apply their state
for entity_state in entities {
// Handle deleted entities (tombstones)
if entity_state.is_deleted {
tombstoned_count += 1;
// Record tombstone
if let Some(mut registry) = world.get_resource_mut::<crate::networking::TombstoneRegistry>() {
registry.record_deletion(
@@ -185,20 +192,43 @@ pub fn apply_full_state(
continue;
}
// Spawn entity with NetworkedEntity and Persisted components
// This ensures entities received via FullState are persisted locally
let entity = world
.spawn((
NetworkedEntity::with_id(entity_state.entity_id, entity_state.owner_node_id),
crate::persistence::Persisted::with_id(entity_state.entity_id),
))
.id();
// Check if entity already exists in the map
let entity = {
let entity_map = world.resource::<NetworkEntityMap>();
entity_map.get_entity(entity_state.entity_id)
};
// Register in entity map
{
let mut entity_map = world.resource_mut::<NetworkEntityMap>();
entity_map.insert(entity_state.entity_id, entity);
}
let entity = match entity {
Some(existing_entity) => {
// Entity already exists - reuse it and update components
debug!(
"Entity {} already exists (local entity {:?}), updating components",
entity_state.entity_id, existing_entity
);
existing_entity
}
None => {
// Spawn new entity with NetworkedEntity, Persisted, and Synced components
// This ensures entities received via FullState are persisted locally and
// will auto-sync their Transform if one is added
let entity = world
.spawn((
NetworkedEntity::with_id(entity_state.entity_id, entity_state.owner_node_id),
crate::persistence::Persisted::with_id(entity_state.entity_id),
Synced,
))
.id();
// Register in entity map
{
let mut entity_map = world.resource_mut::<NetworkEntityMap>();
entity_map.insert(entity_state.entity_id, entity);
}
spawned_count += 1;
entity
}
};
let num_components = entity_state.components.len();
@@ -261,12 +291,31 @@ pub fn apply_full_state(
}
debug!(
"Spawned entity {:?} from FullState with {} components",
"Applied entity {:?} from FullState with {} components",
entity_state.entity_id, num_components
);
}
info!("FullState applied successfully");
info!(
"FullState applied successfully: spawned {} entities, skipped {} tombstones",
spawned_count, tombstoned_count
);
// Send SyncRequest to catch any deltas that arrived during FullState transfer
// This implements the "Final Sync" step from RFC 0004 (Session Lifecycle)
if let Some(bridge) = world.get_resource::<GossipBridge>() {
let node_clock = world.resource::<NodeVectorClock>();
let request = crate::networking::operation_log::build_sync_request(
node_clock.node_id,
node_clock.clock.clone(),
);
if let Err(e) = bridge.send(request) {
error!("Failed to send post-FullState SyncRequest: {}", e);
} else {
info!("Sent SyncRequest to catch deltas that arrived during FullState transfer");
}
}
}
/// System to handle JoinRequest messages

View File

@@ -64,8 +64,32 @@ pub fn message_dispatcher_system(world: &mut World) {
bridge.drain_incoming()
};
if !messages.is_empty() {
let node_id = world.resource::<GossipBridge>().node_id;
info!(
"[message_dispatcher] Node {} processing {} messages",
node_id,
messages.len()
);
}
// Dispatch each message (bridge is no longer borrowed)
for message in messages {
let node_id = world.resource::<GossipBridge>().node_id;
let msg_type = match &message.message {
SyncMessage::EntityDelta { entity_id, .. } => format!("EntityDelta({})", entity_id),
SyncMessage::JoinRequest { node_id, .. } => format!("JoinRequest({})", node_id),
SyncMessage::FullState { entities, .. } => format!("FullState({} entities)", entities.len()),
SyncMessage::SyncRequest { node_id, .. } => format!("SyncRequest({})", node_id),
SyncMessage::MissingDeltas { deltas } => format!("MissingDeltas({} ops)", deltas.len()),
SyncMessage::Lock(_) => "Lock".to_string(),
};
debug!(
"[message_dispatcher] Node {} dispatching: {} (nonce: {})",
node_id, msg_type, message.nonce
);
dispatch_message(world, message);
}
@@ -256,6 +280,18 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
} => {
debug!("Received SyncRequest from node {}", requesting_node);
// Merge the requesting node's vector clock into ours
// This ensures we learn about their latest sequence number
{
let mut node_clock = world.resource_mut::<NodeVectorClock>();
node_clock.clock.merge(&their_clock);
debug!(
"Merged SyncRequest clock from node {} (seq: {})",
requesting_node,
their_clock.get(requesting_node)
);
}
if let Some(op_log) = world.get_resource::<OperationLog>() {
// Find operations they're missing
let missing_deltas = op_log.get_all_operations_newer_than(&their_clock);
@@ -480,8 +516,10 @@ fn build_full_state_from_data(
}
info!(
"Built FullState with {} entities for new peer",
entities.len()
"Built FullState with {} entities ({} total queried, {} tombstoned) for new peer",
entities.len(),
networked_entities.len(),
networked_entities.len() - entities.len()
);
crate::networking::VersionedMessage::new(SyncMessage::FullState {

View File

@@ -27,12 +27,13 @@ pub struct VersionedMessage {
/// The actual sync message
pub message: SyncMessage,
/// Timestamp (nanos since UNIX epoch) to make messages unique
/// Nonce for selective deduplication control
///
/// This prevents iroh-gossip from deduplicating identical messages sent at different times.
/// For example, releasing and re-acquiring a lock sends identical LockRequest messages,
/// but they need to be treated as separate events.
pub timestamp_nanos: u64,
/// - For Lock messages: Unique nonce (counter + timestamp hash) to prevent
/// iroh-gossip deduplication, allowing repeated heartbeats.
/// - For other messages: Constant nonce (0) to enable content-based deduplication
/// by iroh-gossip, preventing feedback loops.
pub nonce: u32,
}
impl VersionedMessage {
@@ -40,18 +41,44 @@ impl VersionedMessage {
pub const CURRENT_VERSION: u32 = 1;
/// Create a new versioned message with the current protocol version
///
/// For Lock messages: Generates a unique nonce to prevent deduplication, since
/// lock heartbeats need to be sent repeatedly even with identical content.
///
/// For other messages: Uses a constant nonce (0) to enable iroh-gossip's
/// content-based deduplication. This prevents feedback loops where the same
/// EntityDelta gets broadcast repeatedly.
pub fn new(message: SyncMessage) -> Self {
use std::time::{SystemTime, UNIX_EPOCH};
// Only generate unique nonces for Lock messages (heartbeats need to bypass dedup)
let nonce = if matches!(message, SyncMessage::Lock(_)) {
use std::hash::Hasher;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp_nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
// Per-node rolling counter for sequential uniqueness
static COUNTER: AtomicU32 = AtomicU32::new(0);
let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
// Millisecond timestamp for temporal uniqueness
let timestamp_millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u32;
// Hash counter + timestamp for final nonce
let mut hasher = rustc_hash::FxHasher::default();
hasher.write_u32(counter);
hasher.write_u32(timestamp_millis);
hasher.finish() as u32
} else {
// Use constant nonce for all other messages to enable content deduplication
0
};
Self {
version: Self::CURRENT_VERSION,
message,
timestamp_nanos,
nonce,
}
}
}

View File

@@ -54,6 +54,7 @@ mod plugin;
mod rga;
mod session;
mod session_lifecycle;
mod session_sync;
mod sync_component;
mod tombstones;
mod vector_clock;
@@ -81,6 +82,7 @@ pub use plugin::*;
pub use rga::*;
pub use session::*;
pub use session_lifecycle::*;
pub use session_sync::*;
pub use sync_component::*;
pub use tombstones::*;
pub use vector_clock::*;

View File

@@ -67,13 +67,12 @@ pub fn build_entity_operations(
};
// Build the operation
let mut clock = vector_clock.clone();
clock.increment(node_id);
// Use the vector_clock as-is - it's already been incremented by the caller (delta_generation.rs:116)
// All operations in the same EntityDelta share the same vector clock (same logical timestamp)
operations.push(ComponentOp::Set {
discriminant,
data,
vector_clock: clock.clone(),
vector_clock: vector_clock.clone(),
});
debug!(" ✓ Added Set operation for discriminant {}", discriminant);
@@ -86,3 +85,67 @@ pub fn build_entity_operations(
);
operations
}
#[cfg(test)]
mod tests {
    use super::*;
    use bevy::prelude::*;
    use crate::networking::NetworkedEntity;
    use crate::persistence::{ComponentTypeRegistry, Persisted};

    /// Regression test: `build_entity_operations` must use the caller-supplied
    /// vector clock verbatim. Previously it called `clock.increment(node_id)`
    /// a second time, so `ComponentOp.vector_clock` ran one tick ahead of
    /// `EntityDelta.vector_clock`.
    #[test]
    fn test_operations_use_passed_vector_clock_without_extra_increment() {
        // Setup: Create a minimal world with an entity
        let mut world = World::new();
        let node_id = uuid::Uuid::new_v4();

        // Use the global registry (Transform is already registered via inventory)
        let registry = ComponentTypeRegistry::init();

        // Create test entity with Transform
        let entity_id = uuid::Uuid::new_v4();
        let entity = world.spawn((
            NetworkedEntity::with_id(entity_id, node_id),
            Persisted::with_id(entity_id),
            Transform::from_xyz(1.0, 2.0, 3.0),
        )).id();

        // Create a vector clock that's already been ticked
        let mut vector_clock = VectorClock::new();
        vector_clock.increment(node_id); // Simulate the tick that delta_generation does
        // Snapshot taken BEFORE the call: operations must match this exactly.
        let expected_clock = vector_clock.clone();

        // Build operations
        let operations = build_entity_operations(
            entity,
            &world,
            node_id,
            vector_clock.clone(),
            &registry,
            None,
        );

        // Verify: All operations should use the EXACT clock that was passed in
        assert!(!operations.is_empty(), "Should have created at least one operation");
        for op in &operations {
            if let ComponentOp::Set { vector_clock: op_clock, .. } = op {
                assert_eq!(
                    *op_clock, expected_clock,
                    "Operation clock should match the input clock exactly. \
                     The bug was that operation_builder would increment the clock again, \
                     causing EntityDelta.vector_clock and ComponentOp.vector_clock to be misaligned."
                );
                // Verify the sequence number matches
                let op_seq = op_clock.get(node_id);
                let expected_seq = expected_clock.get(node_id);
                assert_eq!(
                    op_seq, expected_seq,
                    "Operation sequence should be {} (same as input clock), but got {}",
                    expected_seq, op_seq
                );
            }
        }
    }
}

View File

@@ -351,7 +351,7 @@ pub fn handle_missing_deltas_system(world: &mut World) {
/// adaptive sync intervals based on network conditions.
pub fn periodic_sync_system(
bridge: Option<Res<GossipBridge>>,
node_clock: Res<NodeVectorClock>,
mut node_clock: ResMut<NodeVectorClock>,
time: Res<Time>,
mut last_sync: Local<f32>,
) {
@@ -369,6 +369,9 @@ pub fn periodic_sync_system(
debug!("Sending periodic SyncRequest for anti-entropy");
// Increment clock for sending SyncRequest (this is a local operation)
node_clock.tick();
let request = build_sync_request(node_clock.node_id, node_clock.clock.clone());
if let Err(e) = bridge.send(request) {
error!("Failed to send SyncRequest: {}", e);

View File

@@ -37,6 +37,7 @@ use crate::networking::{
components::{NetworkedEntity, NetworkedTransform},
delta_generation::{
NodeVectorClock,
cleanup_skip_delta_markers_system,
generate_delta_system,
},
entity_map::{
@@ -53,6 +54,10 @@ use crate::networking::{
release_locks_on_deselection_system,
},
message_dispatcher::message_dispatcher_system,
messages::{
SyncMessage,
VersionedMessage,
},
operation_log::{
OperationLog,
periodic_sync_system,
@@ -62,13 +67,21 @@ use crate::networking::{
initialize_session_system,
save_session_on_shutdown_system,
},
session_sync::{
JoinRequestSent,
send_join_request_once_system,
transition_session_state_system,
},
sync_component::Synced,
tombstones::{
TombstoneRegistry,
garbage_collect_tombstones_system,
handle_local_deletions_system,
},
vector_clock::NodeId,
vector_clock::{
NodeId,
VectorClock,
},
};
/// Configuration for the networking plugin
@@ -179,7 +192,17 @@ fn auto_insert_sync_components(
entity_commands.insert(NetworkedTransform);
}
debug!("Auto-inserted sync components for entity {:?} (UUID: {})", entity, entity_id);
info!(
"[auto_insert_sync] Entity {:?} → NetworkedEntity({}), Persisted, {} auto-added",
entity,
entity_id,
if transforms.contains(entity) { "NetworkedTransform" } else { "no transform" }
);
}
let count = query.iter().count();
if count > 0 {
debug!("[auto_insert_sync] Processed {} newly synced entities this frame", count);
}
}
@@ -215,7 +238,7 @@ fn auto_insert_networked_transform(
fn trigger_sync_on_connect(
mut has_synced: Local<bool>,
bridge: Res<GossipBridge>,
node_clock: Res<NodeVectorClock>,
mut node_clock: ResMut<NodeVectorClock>,
operation_log: Res<OperationLog>,
) {
if *has_synced {
@@ -224,12 +247,39 @@ fn trigger_sync_on_connect(
let op_count = operation_log.total_operations();
debug!(
"Going online: triggering anti-entropy sync to broadcast {} offline operations",
"Going online: broadcasting {} offline operations to peers",
op_count
);
// Send a SyncRequest to trigger anti-entropy
// This will cause the message_dispatcher to respond with all operations from our log
// Broadcast all our stored operations to peers
// Use an empty vector clock to get ALL operations (not just newer ones)
let all_operations = operation_log.get_all_operations_newer_than(&VectorClock::new());
for delta in all_operations {
// Wrap in VersionedMessage
let message = VersionedMessage::new(SyncMessage::EntityDelta {
entity_id: delta.entity_id,
node_id: delta.node_id,
vector_clock: delta.vector_clock.clone(),
operations: delta.operations.clone(),
});
// Broadcast to peers
if let Err(e) = bridge.send(message) {
error!("Failed to broadcast offline EntityDelta: {}", e);
} else {
debug!(
"Broadcast offline EntityDelta for entity {:?} with {} operations",
delta.entity_id,
delta.operations.len()
);
}
}
// Also send a SyncRequest to get any operations we're missing from peers
// Increment clock for sending SyncRequest (this is a local operation)
node_clock.tick();
let request = crate::networking::operation_log::build_sync_request(
node_clock.node_id,
node_clock.clock.clone(),
@@ -238,7 +288,7 @@ fn trigger_sync_on_connect(
if let Err(e) = bridge.send(request) {
error!("Failed to send SyncRequest on connect: {}", e);
} else {
debug!("Sent SyncRequest to trigger anti-entropy sync");
debug!("Sent SyncRequest to get missing operations from peers");
}
*has_synced = true;
@@ -267,7 +317,10 @@ fn trigger_sync_on_connect(
/// ## Update
/// - Auto-detect Transform changes
/// - Handle local entity deletions
/// - Acquire locks when entities are selected
/// - Release locks when entities are deselected
/// - Send JoinRequest when networking starts (one-shot)
/// - Transition session state (Joining → Active)
///
/// ## PostUpdate
/// - Generate and broadcast EntityDelta for changed entities
@@ -289,6 +342,7 @@ fn trigger_sync_on_connect(
/// - `OperationLog` - Operation log for anti-entropy
/// - `TombstoneRegistry` - Tombstone tracking for deletions
/// - `EntityLockRegistry` - Entity lock registry with heartbeat tracking
/// - `JoinRequestSent` - Tracks if JoinRequest has been sent (session sync)
///
/// # Example
///
@@ -338,6 +392,7 @@ impl Plugin for NetworkingPlugin {
.insert_resource(OperationLog::new())
.insert_resource(TombstoneRegistry::new())
.insert_resource(EntityLockRegistry::new())
.insert_resource(JoinRequestSent::default())
.insert_resource(crate::networking::ComponentVectorClocks::new())
.insert_resource(crate::networking::LocalSelection::new());
@@ -374,6 +429,10 @@ impl Plugin for NetworkingPlugin {
acquire_locks_on_selection_system,
// Release locks when entities are deselected
release_locks_on_deselection_system,
// Session sync: send JoinRequest when networking starts
send_join_request_once_system,
// Session sync: transition session state based on sync completion
transition_session_state_system,
),
);
@@ -388,8 +447,10 @@ impl Plugin for NetworkingPlugin {
app.add_systems(
PostUpdate,
(
// Generate deltas for changed entities
generate_delta_system,
// Generate deltas for changed entities, then cleanup markers
// CRITICAL: cleanup_skip_delta_markers_system must run immediately after
// generate_delta_system to remove SkipNextDeltaGeneration markers
(generate_delta_system, cleanup_skip_delta_markers_system).chain(),
// Periodic anti-entropy sync
periodic_sync_system,
// Maintenance tasks

View File

@@ -0,0 +1,399 @@
//! Session synchronization systems
//!
//! This module handles automatic session lifecycle:
//! - Sending JoinRequest when networking starts
//! - Transitioning session state when receiving FullState
//! - Persisting session state changes
use bevy::prelude::*;
use crate::networking::{
CurrentSession,
GossipBridge,
JoinType,
NodeVectorClock,
SessionState,
build_join_request,
plugin::SessionSecret,
};
/// System to send JoinRequest when networking comes online
///
/// This system detects when GossipBridge is added and sends a JoinRequest
/// to discover peers and sync state. It only acts while the session is in
/// the `Joining` state; in any other state it is a no-op.
///
/// Add to your app as an Update system AFTER GossipBridge is created:
/// ```no_run
/// use bevy::prelude::*;
/// use libmarathon::networking::send_join_request_on_connect_system;
///
/// App::new()
///     .add_systems(Update, send_join_request_on_connect_system);
/// ```
pub fn send_join_request_on_connect_system(
    current_session: Res<CurrentSession>,
    node_clock: Res<NodeVectorClock>,
    bridge: Option<Res<GossipBridge>>,
    session_secret: Option<Res<SessionSecret>>,
) {
    // Only run when networking is up (bridge exists).
    let Some(bridge) = bridge else {
        return;
    };
    // Only send JoinRequest when in Joining state
    if current_session.session.state != SessionState::Joining {
        return;
    }
    let node_id = node_clock.node_id;
    let session_id = current_session.session.id.clone();
    // A non-empty last_known_clock means we hold a previous snapshot of this
    // session, i.e. this is a rejoin rather than a fresh join.
    let is_rejoin = current_session.last_known_clock.node_count() > 0;
    let join_type = if is_rejoin {
        JoinType::Rejoin {
            last_active: current_session.session.last_active,
            entity_count: current_session.session.entity_count,
        }
    } else {
        JoinType::Fresh
    };
    // Include the previous clock snapshot only on rejoin.
    let last_known_clock = is_rejoin.then(|| current_session.last_known_clock.clone());
    // Attach the session secret when one is configured.
    let secret = session_secret
        .as_ref()
        .map(|s| bytes::Bytes::from(s.as_bytes().to_vec()));
    // Build JoinRequest
    let request = build_join_request(
        node_id,
        session_id.clone(),
        secret,
        last_known_clock,
        join_type.clone(),
    );
    // Send JoinRequest
    match bridge.send(request) {
        Ok(()) => {
            info!(
                "Sent JoinRequest for session {} (type: {:?})",
                session_id.to_code(),
                join_type
            );
        }
        Err(e) => {
            error!("Failed to send JoinRequest: {}", e);
        }
    }
}
/// System to transition session to Active when sync completes
///
/// This system monitors for session state changes and handles transitions:
/// - Joining → Active: When we receive FullState or initial sync completes
///
/// This is an exclusive system (takes `&mut World`) so it can read resources,
/// run an ad-hoc entity query, and mutate resources within one frame.
///
/// Transition rules while `Joining`:
/// - Entities exist AND the vector clock has more than one node → a peer's
///   FullState was received and applied, so transition immediately.
/// - Otherwise, 3 seconds after the JoinRequest was sent → transition anyway
///   (sync completed quietly, or we are the first node in the session).
///
/// Add to your app as an Update system:
/// ```no_run
/// use bevy::prelude::*;
/// use libmarathon::networking::transition_session_state_system;
///
/// App::new()
///     .add_systems(Update, transition_session_state_system);
/// ```
pub fn transition_session_state_system(world: &mut World) {
    // Only process state transitions when we have networking
    if world.get_resource::<GossipBridge>().is_none() {
        return;
    }
    // Snapshot the values we need up front (cloned/copied) so no resource
    // borrow is held while we mutate the world below.
    let (session_state, session_id, join_request_sent, clock_node_count) = {
        let Some(current_session) = world.get_resource::<CurrentSession>() else {
            return;
        };
        let Some(node_clock) = world.get_resource::<NodeVectorClock>() else {
            return;
        };
        let Some(join_sent) = world.get_resource::<JoinRequestSent>() else {
            return;
        };
        (
            current_session.session.state,
            current_session.session.id.clone(),
            join_sent.sent,
            node_clock.clock.node_count(),
        )
    };
    // Function-local Resource type: keeps the join timer private to this
    // system while still being stored per-world. Inserted lazily on first run.
    #[derive(Default, Resource)]
    struct JoinTimer(Option<std::time::Instant>);
    if !world.contains_resource::<JoinTimer>() {
        world.insert_resource(JoinTimer::default());
    }
    match session_state {
        SessionState::Joining => {
            // Start timer when JoinRequest is sent. Scoped so the mutable
            // borrow of JoinTimer ends before the entity query below.
            {
                let mut timer = world.resource_mut::<JoinTimer>();
                if join_request_sent && timer.0.is_none() {
                    timer.0 = Some(std::time::Instant::now());
                    debug!("Started join timer - will transition to Active after timeout if no peers respond");
                }
            }
            // Count entities in world
            let entity_count = world
                .query::<&crate::networking::NetworkedEntity>()
                .iter(world)
                .count();
            // Transition to Active if:
            // 1. We have received entities (entity_count > 0) AND have multiple nodes in clock
            //    This ensures FullState was received and applied, OR
            // 2. We've waited 3 seconds and either:
            //    a) We have entities (sync completed), OR
            //    b) No entities exist yet (we're the first node in session)
            let should_transition = if entity_count > 0 && clock_node_count > 1 {
                // We've received and applied FullState with entities
                info!(
                    "Session {} transitioning to Active (received {} entities from {} peers)",
                    session_id.to_code(),
                    entity_count,
                    clock_node_count - 1
                );
                true
            } else {
                let timer = world.resource::<JoinTimer>();
                if let Some(start_time) = timer.0 {
                    // Check if 3 seconds have passed since JoinRequest
                    if start_time.elapsed().as_secs() >= 3 {
                        if entity_count > 0 {
                            info!(
                                "Session {} transitioning to Active (timeout reached, have {} entities)",
                                session_id.to_code(),
                                entity_count
                            );
                        } else {
                            info!(
                                "Session {} transitioning to Active (timeout - no peers or empty session, first node)",
                                session_id.to_code()
                            );
                        }
                        true
                    } else {
                        false
                    }
                } else {
                    // Timer not started yet (JoinRequest not sent) - keep waiting.
                    false
                }
            };
            if should_transition {
                let mut current_session = world.resource_mut::<CurrentSession>();
                current_session.transition_to(SessionState::Active);
                // Reset timer
                let mut timer = world.resource_mut::<JoinTimer>();
                timer.0 = None;
            }
        }
        SessionState::Active => {
            // Already active, reset timer
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
        SessionState::Disconnected => {
            // If we reconnected (bridge exists), transition to Joining
            // This is handled by the networking startup logic
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
        SessionState::Created | SessionState::Left => {
            // Should not be in these states when networking is active
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
    }
}
/// Resource to track if we've sent the initial JoinRequest
///
/// This prevents sending multiple JoinRequests on subsequent frame updates.
/// The derived `Default` yields `sent: false` / `wait_started: None`,
/// i.e. "not yet sent, not yet waiting" — the same values the previous
/// hand-written `Default` impl produced.
#[derive(Resource, Default)]
pub struct JoinRequestSent {
    /// True once the initial JoinRequest has been broadcast.
    pub sent: bool,
    /// Timer to wait for peers before sending JoinRequest
    /// If no peers connect after 1 second, send anyway (we're first node)
    pub wait_started: Option<std::time::Instant>,
}
/// One-shot system to send JoinRequest only once when networking starts
///
/// CRITICAL: Waits for at least one peer to connect via pkarr+DHT before sending
/// JoinRequest. This prevents broadcasting to an empty network.
///
/// Timing:
/// - If peers connect: Send JoinRequest immediately (they'll receive it)
/// - If no peers after 1 second: Send anyway (we're probably first node in session)
///
/// On a successful send, `JoinRequestSent::sent` is flipped so this system
/// never fires again; the Joining → Active transition is handled separately
/// by `transition_session_state_system`.
pub fn send_join_request_once_system(
    mut join_sent: ResMut<JoinRequestSent>,
    current_session: Res<CurrentSession>,
    node_clock: Res<NodeVectorClock>,
    bridge: Option<Res<GossipBridge>>,
    session_secret: Option<Res<SessionSecret>>,
) {
    // One-shot: skip if already sent.
    if join_sent.sent {
        return;
    }
    // Only run when bridge exists and session is in Joining state
    let Some(bridge) = bridge else {
        return;
    };
    if current_session.session.state != SessionState::Joining {
        return;
    }
    // Start the wait timer the first time the preconditions hold; reuse the
    // (now guaranteed-present) start instant for the elapsed check below.
    let wait_started = *join_sent.wait_started.get_or_insert_with(|| {
        debug!("Started waiting for peers before sending JoinRequest (max 1 second)");
        std::time::Instant::now()
    });
    let wait_elapsed = wait_started.elapsed();
    // node_count includes ourselves, so peers = node_count - 1.
    let peer_count = node_clock.clock.node_count().saturating_sub(1);
    // Send JoinRequest if:
    // 1. At least one peer has connected (they'll receive our JoinRequest), OR
    // 2. We've waited 1 second with no peers (we're probably the first node)
    let should_send = if peer_count > 0 {
        debug!(
            "Sending JoinRequest now - {} peer(s) connected (waited {:?})",
            peer_count, wait_elapsed
        );
        true
    } else if wait_elapsed.as_millis() >= 1000 {
        debug!(
            "Sending JoinRequest after timeout - no peers connected, assuming first node (waited {:?})",
            wait_elapsed
        );
        true
    } else {
        // Still waiting for peers
        false
    };
    if !should_send {
        return;
    }
    let node_id = node_clock.node_id;
    let session_id = current_session.session.id.clone();
    // A non-empty last_known_clock snapshot means we are rejoining.
    let is_rejoin = current_session.last_known_clock.node_count() > 0;
    let join_type = if is_rejoin {
        JoinType::Rejoin {
            last_active: current_session.session.last_active,
            entity_count: current_session.session.entity_count,
        }
    } else {
        JoinType::Fresh
    };
    // Get session secret if configured
    let secret = session_secret
        .as_ref()
        .map(|s| bytes::Bytes::from(s.as_bytes().to_vec()));
    // Include the previous clock snapshot only on rejoin.
    let last_known_clock = is_rejoin.then(|| current_session.last_known_clock.clone());
    // Build JoinRequest
    let request = build_join_request(
        node_id,
        session_id.clone(),
        secret,
        last_known_clock,
        join_type.clone(),
    );
    // Send JoinRequest
    match bridge.send(request) {
        Ok(()) => {
            info!(
                "Sent JoinRequest for session {} (type: {:?})",
                session_id.to_code(),
                join_type
            );
            // Mark sent so this one-shot never repeats. If peers exist we now
            // wait for FullState; transition_session_state_system handles the
            // rest (including the first-node timeout path).
            join_sent.sent = true;
        }
        Err(e) => {
            error!("Failed to send JoinRequest: {}", e);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::networking::{Session, SessionId, VectorClock};

    /// The default marker starts unsent and can be flipped once the
    /// JoinRequest goes out.
    #[test]
    fn test_join_request_sent_tracking() {
        let mut tracker = JoinRequestSent::default();
        assert!(!tracker.sent);
        tracker.sent = true;
        assert!(tracker.sent);
    }

    /// A session walks Created -> Joining -> Active via `transition_to`.
    #[test]
    fn test_session_state_transitions() {
        let session = Session::new(SessionId::new());
        let mut current = CurrentSession::new(session, VectorClock::new());
        assert_eq!(current.session.state, SessionState::Created);
        for next in [SessionState::Joining, SessionState::Active] {
            current.transition_to(next);
            assert_eq!(current.session.state, next);
        }
    }
}

View File

@@ -219,6 +219,7 @@ pub fn handle_local_deletions_system(
mut tombstone_registry: ResMut<TombstoneRegistry>,
mut operation_log: Option<ResMut<crate::networking::OperationLog>>,
bridge: Option<Res<GossipBridge>>,
mut write_buffer: Option<ResMut<crate::persistence::WriteBufferResource>>,
) {
for (entity, networked) in query.iter() {
// Increment clock for deletion
@@ -231,13 +232,34 @@ pub fn handle_local_deletions_system(
)
.delete();
// Record tombstone
// Record tombstone in memory
tombstone_registry.record_deletion(
networked.network_id,
node_clock.node_id,
node_clock.clock.clone(),
);
// Persist tombstone to database
if let Some(ref mut buffer) = write_buffer {
// Serialize the vector clock using rkyv
match rkyv::to_bytes::<rkyv::rancor::Failure>(&node_clock.clock).map(|b| b.to_vec()) {
Ok(clock_bytes) => {
if let Err(e) = buffer.add(crate::persistence::PersistenceOp::RecordTombstone {
entity_id: networked.network_id,
deleting_node: node_clock.node_id,
deletion_clock: bytes::Bytes::from(clock_bytes),
}) {
error!("Failed to persist tombstone for entity {:?}: {}", networked.network_id, e);
} else {
debug!("Persisted tombstone for entity {:?} to database", networked.network_id);
}
},
Err(e) => {
error!("Failed to serialize vector clock for tombstone persistence: {:?}", e);
}
}
}
// Create EntityDelta with Delete operation
let delta = crate::networking::EntityDelta::new(
networked.network_id,

View File

@@ -253,6 +253,24 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
)?;
count += 1;
},
| PersistenceOp::RecordTombstone {
entity_id,
deleting_node,
deletion_clock,
} => {
tx.execute(
"INSERT OR REPLACE INTO tombstones (entity_id, deleting_node, deletion_clock, created_at)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![
entity_id.as_bytes(),
&deleting_node.to_string(),
deletion_clock.as_ref(),
current_timestamp(),
],
)?;
count += 1;
},
}
}
@@ -977,6 +995,117 @@ pub fn rehydrate_all_entities(world: &mut bevy::prelude::World) -> Result<()> {
Ok(())
}
/// Load all tombstones from the database into the TombstoneRegistry
///
/// This function is called during startup to restore deletion tombstones
/// from the database, preventing resurrection of deleted entities after
/// application restart.
///
/// # Arguments
///
/// * `world` - The Bevy world containing the TombstoneRegistry resource
///
/// # Errors
///
/// Returns an error if:
/// - Database connection fails
/// - Tombstone loading fails
/// - Vector clock deserialization fails
pub fn load_tombstones(world: &mut bevy::prelude::World) -> Result<()> {
use bevy::prelude::*;
// Get database connection and load tombstones
let tombstone_rows = {
let db_res = world.resource::<crate::persistence::PersistenceDb>();
let conn = db_res
.conn
.lock()
.map_err(|e| PersistenceError::Other(format!("Failed to lock database: {}", e)))?;
// Load all tombstones from database
let mut stmt = conn.prepare(
"SELECT entity_id, deleting_node, deletion_clock, created_at
FROM tombstones
ORDER BY created_at ASC",
)?;
let rows = stmt.query_map([], |row| {
let entity_id_bytes: std::borrow::Cow<'_, [u8]> = row.get(0)?;
let mut entity_id_array = [0u8; 16];
entity_id_array.copy_from_slice(&entity_id_bytes);
let entity_id = uuid::Uuid::from_bytes(entity_id_array);
let deleting_node_str: String = row.get(1)?;
let deletion_clock_bytes: std::borrow::Cow<'_, [u8]> = row.get(2)?;
let created_at_ts: i64 = row.get(3)?;
Ok((
entity_id,
deleting_node_str,
deletion_clock_bytes.to_vec(),
created_at_ts,
))
})?;
rows.collect::<std::result::Result<Vec<_>, _>>()?
};
info!("Loaded {} tombstones from database", tombstone_rows.len());
if tombstone_rows.is_empty() {
info!("No tombstones to restore");
return Ok(());
}
// Restore tombstones into TombstoneRegistry
let mut loaded_count = 0;
let mut failed_count = 0;
{
let mut tombstone_registry = world.resource_mut::<crate::networking::TombstoneRegistry>();
for (entity_id, deleting_node_str, deletion_clock_bytes, _created_at_ts) in tombstone_rows {
// Parse node ID
let deleting_node = match uuid::Uuid::parse_str(&deleting_node_str) {
Ok(id) => id,
Err(e) => {
error!("Failed to parse deleting_node UUID for entity {:?}: {}", entity_id, e);
failed_count += 1;
continue;
}
};
// Deserialize vector clock
let deletion_clock = match rkyv::from_bytes::<crate::networking::VectorClock, rkyv::rancor::Failure>(&deletion_clock_bytes) {
Ok(clock) => clock,
Err(e) => {
error!("Failed to deserialize vector clock for tombstone {:?}: {:?}", entity_id, e);
failed_count += 1;
continue;
}
};
// Record the tombstone in the registry
tombstone_registry.record_deletion(entity_id, deleting_node, deletion_clock);
loaded_count += 1;
}
}
info!(
"Tombstone restoration complete: {} succeeded, {} failed",
loaded_count, failed_count
);
if failed_count > 0 {
warn!(
"{} tombstones failed to restore - check logs for details",
failed_count
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -29,6 +29,11 @@ pub const MIGRATIONS: &[Migration] = &[
name: "sessions",
up: include_str!("migrations/004_sessions.sql"),
},
Migration {
version: 5,
name: "tombstones",
up: include_str!("migrations/005_tombstones.sql"),
},
];
/// Initialize the migrations table

View File

@@ -0,0 +1,13 @@
-- Migration 005: Add tombstones table
-- Stores deletion tombstones to prevent resurrection of deleted entities
CREATE TABLE IF NOT EXISTS tombstones (
entity_id BLOB PRIMARY KEY,
deleting_node TEXT NOT NULL,
deletion_clock BLOB NOT NULL,
created_at INTEGER NOT NULL
);
-- Index for querying tombstones by session (for future session scoping)
CREATE INDEX IF NOT EXISTS idx_tombstones_created
ON tombstones(created_at DESC);

View File

@@ -92,10 +92,11 @@ impl Plugin for PersistencePlugin {
.init_resource::<ComponentTypeRegistryResource>();
// Add startup systems
// First initialize the database, then rehydrate entities
// First initialize the database, then rehydrate entities and tombstones
app.add_systems(Startup, (
persistence_startup_system,
rehydrate_entities_system,
load_tombstones_system,
).chain());
// Add systems in the appropriate schedule
@@ -168,7 +169,43 @@ fn persistence_startup_system(db: Res<PersistenceDb>, mut metrics: ResMut<Persis
/// This system runs after `persistence_startup_system` and loads all entities
/// from SQLite, deserializing and spawning them into the Bevy world with all
/// their components.
///
/// **Important**: Only rehydrates entities when rejoining an existing session.
/// New sessions start with 0 entities to avoid loading entities from previous
/// sessions.
fn rehydrate_entities_system(world: &mut World) {
// Check if we're rejoining an existing session
let should_rehydrate = {
let current_session = world.get_resource::<crate::networking::CurrentSession>();
match current_session {
Some(session) => {
// Only rehydrate if we have a last_known_clock (indicates we're rejoining)
let is_rejoin = session.last_known_clock.node_count() > 0;
if is_rejoin {
info!(
"Rejoining session {} - will rehydrate persisted entities",
session.session.id.to_code()
);
} else {
info!(
"New session {} - starting with 0 entities",
session.session.id.to_code()
);
}
is_rejoin
}
None => {
warn!("No CurrentSession found - skipping entity rehydration");
false
}
}
};
if !should_rehydrate {
info!("Skipping entity rehydration for new session");
return;
}
if let Err(e) = crate::persistence::database::rehydrate_all_entities(world) {
error!("Failed to rehydrate entities from database: {}", e);
} else {
@@ -176,6 +213,19 @@ fn rehydrate_entities_system(world: &mut World) {
}
}
/// Exclusive startup system to load tombstones from database
///
/// This system runs after `rehydrate_entities_system` and loads all tombstones
/// from SQLite, deserializing them into the TombstoneRegistry to prevent
/// resurrection of deleted entities.
fn load_tombstones_system(world: &mut World) {
if let Err(e) = crate::persistence::database::load_tombstones(world) {
error!("Failed to load tombstones from database: {}", e);
} else {
info!("Successfully loaded tombstones from database");
}
}
/// System to collect dirty entities using Bevy's change detection
///
/// This system tracks changes to the `Persisted` component. When `Persisted` is

View File

@@ -126,6 +126,13 @@ pub enum PersistenceOp {
entity_id: EntityId,
component_type: String,
},
/// Record a tombstone for a deleted entity
RecordTombstone {
entity_id: EntityId,
deleting_node: NodeId,
deletion_clock: bytes::Bytes, // Serialized VectorClock
},
}
impl PersistenceOp {