From 7d24abf1137a3b0f0a4537aaaa3e33c91b3d4acf Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 17 Dec 2025 21:05:37 +0000 Subject: [PATCH] added methods for #128 * migrated to `bytes` to ensure zero-copy Signed-off-by: Sienna Meridian Satterwhite --- .../libmarathon/src/persistence/database.rs | 368 +++++++++++++++++- crates/libmarathon/src/persistence/plugin.rs | 21 +- crates/libmarathon/src/persistence/systems.rs | 2 +- .../src/persistence/type_registry.rs | 6 +- crates/libmarathon/src/persistence/types.rs | 38 +- 5 files changed, 407 insertions(+), 28 deletions(-) diff --git a/crates/libmarathon/src/persistence/database.rs b/crates/libmarathon/src/persistence/database.rs index c0aef8f..351f800 100644 --- a/crates/libmarathon/src/persistence/database.rs +++ b/crates/libmarathon/src/persistence/database.rs @@ -201,7 +201,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result Result, + pub updated_at: chrono::DateTime, + pub components: Vec, +} + +/// Loaded component data from database +#[derive(Debug)] +pub struct LoadedComponent { + pub component_type: String, + pub data: bytes::Bytes, +} + +/// Load all components for a single entity from the database +pub fn load_entity_components( + conn: &Connection, + entity_id: uuid::Uuid, +) -> Result> { + let mut stmt = conn.prepare( + "SELECT component_type, data + FROM components + WHERE entity_id = ?1", + )?; + + let components: Vec = stmt + .query_map([entity_id.as_bytes()], |row| { + let data_vec: Vec = row.get(1)?; + Ok(LoadedComponent { + component_type: row.get(0)?, + data: bytes::Bytes::from(data_vec), + }) + })? + .collect::, _>>()?; + + Ok(components) +} + +/// Load a single entity by network ID from the database +/// +/// Returns None if the entity doesn't exist. +pub fn load_entity_by_network_id( + conn: &Connection, + network_id: uuid::Uuid, +) -> Result> { + // Load entity metadata + let entity_data = conn + .query_row( + "SELECT id, entity_type, created_at, updated_at + FROM entities + WHERE id = ?1", + [network_id.as_bytes()], + |row| { + let id_bytes: Vec = row.get(0)?; + let mut id_array = [0u8; 16]; + id_array.copy_from_slice(&id_bytes); + let id = uuid::Uuid::from_bytes(id_array); + + Ok(( + id, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)?, + row.get::<_, i64>(3)?, + )) + }, + ) + .optional()?; + + let Some((id, entity_type, created_at_ts, updated_at_ts)) = entity_data else { + return Ok(None); + }; + + // Load all components for this entity + let components = load_entity_components(conn, id)?; + + Ok(Some(LoadedEntity { + id, + entity_type, + created_at: chrono::DateTime::from_timestamp(created_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + components, + })) +} + +/// Load all entities from the database +/// +/// This loads all entity metadata and their components. +/// Used during startup to rehydrate the game state. +pub fn load_all_entities(conn: &Connection) -> Result> { + let mut stmt = conn.prepare( + "SELECT id, entity_type, created_at, updated_at + FROM entities + ORDER BY created_at ASC", + )?; + + let entity_rows = stmt.query_map([], |row| { + let id_bytes: Vec = row.get(0)?; + let mut id_array = [0u8; 16]; + id_array.copy_from_slice(&id_bytes); + let id = uuid::Uuid::from_bytes(id_array); + + Ok(( + id, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)?, + row.get::<_, i64>(3)?, + )) + })?; + + let mut entities = Vec::new(); + + for row in entity_rows { + let (id, entity_type, created_at_ts, updated_at_ts) = row?; + + // Load all components for this entity + let components = load_entity_components(conn, id)?; + + entities.push(LoadedEntity { + id, + entity_type, + created_at: chrono::DateTime::from_timestamp(created_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + components, + }); + } + + Ok(entities) +} + +/// Load entities by entity type from the database +/// +/// Returns all entities matching the specified entity_type. +pub fn load_entities_by_type(conn: &Connection, entity_type: &str) -> Result> { + let mut stmt = conn.prepare( + "SELECT id, entity_type, created_at, updated_at + FROM entities + WHERE entity_type = ?1 + ORDER BY created_at ASC", + )?; + + let entity_rows = stmt.query_map([entity_type], |row| { + let id_bytes: Vec = row.get(0)?; + let mut id_array = [0u8; 16]; + id_array.copy_from_slice(&id_bytes); + let id = uuid::Uuid::from_bytes(id_array); + + Ok(( + id, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)?, + row.get::<_, i64>(3)?, + )) + })?; + + let mut entities = Vec::new(); + + for row in entity_rows { + let (id, entity_type, created_at_ts, updated_at_ts) = row?; + + // Load all components for this entity + let components = load_entity_components(conn, id)?; + + entities.push(LoadedEntity { + id, + entity_type, + created_at: chrono::DateTime::from_timestamp(created_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0) + .unwrap_or_else(chrono::Utc::now), + components, + }); + } + + Ok(entities) +} + +/// Rehydrate a loaded entity into the Bevy world +/// +/// Takes a `LoadedEntity` from the database and spawns it as a new Bevy entity, +/// deserializing and inserting all components using the ComponentTypeRegistry. +/// +/// # Arguments +/// +/// * `loaded_entity` - The entity data loaded from SQLite +/// * `world` - The Bevy world to spawn the entity into +/// * `component_registry` - Type registry for component deserialization +/// +/// # Returns +/// +/// The spawned Bevy `Entity` on success +/// +/// # Errors +/// +/// Returns an error if: +/// - Component deserialization fails +/// - Component type is not registered +/// - Component insertion fails +pub fn rehydrate_entity( + loaded_entity: LoadedEntity, + world: &mut bevy::prelude::World, + component_registry: &crate::persistence::ComponentTypeRegistry, +) -> Result { + use bevy::prelude::*; + + use crate::networking::NetworkedEntity; + + // Spawn a new entity + let entity = world.spawn_empty().id(); + + info!( + "Rehydrating entity {:?} with type {} and {} components", + loaded_entity.id, + loaded_entity.entity_type, + loaded_entity.components.len() + ); + + // Deserialize and insert each component + for component in &loaded_entity.components { + // Get deserialization function for this component type + let deserialize_fn = component_registry + .get_deserialize_fn_by_path(&component.component_type) + .ok_or_else(|| { + PersistenceError::Deserialization(format!( + "No deserialize function registered for component type: {}", + component.component_type + )) + })?; + + // Get insert function for this component type + let insert_fn = component_registry + .get_insert_fn_by_path(&component.component_type) + .ok_or_else(|| { + PersistenceError::Deserialization(format!( + "No insert function registered for component type: {}", + component.component_type + )) + })?; + + // Deserialize the component from bytes + let deserialized = deserialize_fn(&component.data).map_err(|e| { + PersistenceError::Deserialization(format!( + "Failed to deserialize component {}: {}", + component.component_type, e + )) + })?; + + // Insert the component into the entity + // Get an EntityWorldMut to pass to the insert function + let mut entity_mut = world.entity_mut(entity); + insert_fn(&mut entity_mut, deserialized); + + debug!( + "Inserted component {} into entity {:?}", + component.component_type, entity + ); + } + + // Add the NetworkedEntity component with the persisted network_id + // This ensures the entity maintains its identity across restarts + world.entity_mut(entity).insert(NetworkedEntity { + network_id: loaded_entity.id, + owner_node_id: uuid::Uuid::nil(), // Will be set by network system if needed + }); + + // Add the Persisted marker component + world + .entity_mut(entity) + .insert(crate::persistence::Persisted { + network_id: loaded_entity.id, + }); + + info!( + "Successfully rehydrated entity {:?} as Bevy entity {:?}", + loaded_entity.id, entity + ); + + Ok(entity) +} + +/// Rehydrate all entities from the database into the Bevy world +/// +/// This function is called during startup to restore the entire persisted +/// state. It loads all entities from SQLite and spawns them into the Bevy world +/// with all their components. +/// +/// # Arguments +/// +/// * `world` - The Bevy world to spawn entities into +/// +/// # Errors +/// +/// Returns an error if: +/// - Database connection fails +/// - Entity loading fails +/// - Entity rehydration fails +pub fn rehydrate_all_entities(world: &mut bevy::prelude::World) -> Result<()> { + use bevy::prelude::*; + + // Get database connection from resource + let loaded_entities = { + let db_res = world.resource::(); + let conn = db_res + .conn + .lock() + .map_err(|e| PersistenceError::Other(format!("Failed to lock database: {}", e)))?; + + // Load all entities from database + load_all_entities(&conn)? + }; + + info!("Loaded {} entities from database", loaded_entities.len()); + + if loaded_entities.is_empty() { + info!("No entities to rehydrate"); + return Ok(()); + } + + // Get component registry + let component_registry = { + let registry_res = world.resource::(); + registry_res.0 + }; + + // Rehydrate each entity + let mut rehydrated_count = 0; + let mut failed_count = 0; + + for loaded_entity in loaded_entities { + match rehydrate_entity(loaded_entity, world, component_registry) { + | Ok(entity) => { + rehydrated_count += 1; + debug!("Rehydrated entity {:?}", entity); + }, + | Err(e) => { + failed_count += 1; + error!("Failed to rehydrate entity: {}", e); + }, + } + } + + info!( + "Entity rehydration complete: {} succeeded, {} failed", + rehydrated_count, failed_count + ); + + if failed_count > 0 { + warn!( + "{} entities failed to rehydrate - check logs for details", + failed_count + ); + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -656,7 +1018,7 @@ mod tests { PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![1, 2, 3, 4], + data: bytes::Bytes::from(vec![1, 2, 3, 4]), }, ]; diff --git a/crates/libmarathon/src/persistence/plugin.rs b/crates/libmarathon/src/persistence/plugin.rs index 921efec..2339b00 100644 --- a/crates/libmarathon/src/persistence/plugin.rs +++ b/crates/libmarathon/src/persistence/plugin.rs @@ -91,8 +91,12 @@ impl Plugin for PersistencePlugin { .insert_resource(PendingFlushTasks::default()) .init_resource::(); - // Add startup system - app.add_systems(Startup, persistence_startup_system); + // Add startup systems + // First initialize the database, then rehydrate entities + app.add_systems(Startup, ( + persistence_startup_system, + rehydrate_entities_system, + ).chain()); // Add systems in the appropriate schedule app.add_systems( @@ -159,6 +163,19 @@ fn persistence_startup_system(db: Res, mut metrics: ResMut Result>, /// Serialization function that reads from an entity (returns None if entity doesn't have this component) - pub serialize_fn: fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option>, + pub serialize_fn: fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option, /// Insert function that takes a boxed component and inserts it into an entity pub insert_fn: fn(&mut bevy::ecs::world::EntityWorldMut, Box), @@ -47,7 +47,7 @@ pub struct ComponentTypeRegistry { discriminant_to_deserializer: HashMap Result>>, /// Discriminant to serialization function - discriminant_to_serializer: HashMap Option>>, + discriminant_to_serializer: HashMap Option>, /// Discriminant to insert function discriminant_to_inserter: HashMap)>, @@ -196,7 +196,7 @@ impl ComponentTypeRegistry { &self, world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity, - ) -> Vec<(u16, &'static str, Vec)> { + ) -> Vec<(u16, &'static str, bytes::Bytes)> { let mut results = Vec::new(); for (&discriminant, &serialize_fn) in &self.discriminant_to_serializer { diff --git a/crates/libmarathon/src/persistence/types.rs b/crates/libmarathon/src/persistence/types.rs index 70e9215..3268552 100644 --- a/crates/libmarathon/src/persistence/types.rs +++ b/crates/libmarathon/src/persistence/types.rs @@ -105,14 +105,14 @@ pub enum PersistenceOp { UpsertComponent { entity_id: EntityId, component_type: String, - data: Vec, + data: bytes::Bytes, }, /// Log an operation for CRDT sync LogOperation { node_id: NodeId, sequence: u64, - operation: Vec, + operation: bytes::Bytes, }, /// Update vector clock for causality tracking @@ -473,7 +473,7 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![1, 2, 3], + data: bytes::Bytes::from(vec![1, 2, 3]), })?; assert_eq!(buffer.len(), 1); @@ -481,7 +481,7 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![4, 5, 6], + data: bytes::Bytes::from(vec![4, 5, 6]), })?; assert_eq!(buffer.len(), 1); @@ -489,7 +489,7 @@ mod tests { let ops = buffer.take_operations(); assert_eq!(ops.len(), 1); if let PersistenceOp::UpsertComponent { data, .. } = &ops[0] { - assert_eq!(data, &vec![4, 5, 6]); + assert_eq!(data.as_ref(), &[4, 5, 6]); } else { panic!("Expected UpsertComponent"); } @@ -506,7 +506,7 @@ mod tests { .add(PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![1, 2, 3], + data: bytes::Bytes::from(vec![1, 2, 3]), }) .expect("Should successfully add Transform"); @@ -515,7 +515,7 @@ mod tests { .add(PersistenceOp::UpsertComponent { entity_id, component_type: "Velocity".to_string(), - data: vec![4, 5, 6], + data: bytes::Bytes::from(vec![4, 5, 6]), }) .expect("Should successfully add Velocity"); @@ -652,7 +652,7 @@ mod tests { let log_op = PersistenceOp::LogOperation { node_id, sequence: 1, - operation: vec![1, 2, 3], + operation: bytes::Bytes::from(vec![1, 2, 3]), }; let vector_clock_op = PersistenceOp::UpdateVectorClock { @@ -689,7 +689,7 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![i], + data: bytes::Bytes::from(vec![i]), })?; } @@ -700,7 +700,7 @@ mod tests { let ops = buffer.take_operations(); assert_eq!(ops.len(), 1); if let PersistenceOp::UpsertComponent { data, .. } = &ops[0] { - assert_eq!(data, &vec![9]); + assert_eq!(data.as_ref(), &[9]); } else { panic!("Expected UpsertComponent"); } @@ -709,7 +709,7 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id, component_type: "Transform".to_string(), - data: vec![100], + data: bytes::Bytes::from(vec![100]), })?; assert_eq!(buffer.len(), 1); @@ -726,13 +726,13 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id: entity1, component_type: "Transform".to_string(), - data: vec![1], + data: bytes::Bytes::from(vec![1]), })?; buffer.add(PersistenceOp::UpsertComponent { entity_id: entity2, component_type: "Transform".to_string(), - data: vec![2], + data: bytes::Bytes::from(vec![2]), })?; // Should have 2 operations (different entities) @@ -742,7 +742,7 @@ mod tests { buffer.add(PersistenceOp::UpsertComponent { entity_id: entity1, component_type: "Transform".to_string(), - data: vec![3], + data: bytes::Bytes::from(vec![3]), })?; // Still 2 operations (first was replaced in-place) @@ -761,7 +761,7 @@ mod tests { .add_with_default_priority(PersistenceOp::LogOperation { node_id, sequence: 1, - operation: vec![1, 2, 3], + operation: bytes::Bytes::from(vec![1, 2, 3]), }) .unwrap(); @@ -776,7 +776,7 @@ mod tests { let entity_id = EntityId::new_v4(); // Create 11MB component (exceeds 10MB limit) - let oversized_data = vec![0u8; 11 * 1024 * 1024]; + let oversized_data = bytes::Bytes::from(vec![0u8; 11 * 1024 * 1024]); let result = buffer.add(PersistenceOp::UpsertComponent { entity_id, @@ -809,7 +809,7 @@ mod tests { let entity_id = EntityId::new_v4(); // Create exactly 10MB component (at limit) - let max_data = vec![0u8; 10 * 1024 * 1024]; + let max_data = bytes::Bytes::from(vec![0u8; 10 * 1024 * 1024]); let result = buffer.add(PersistenceOp::UpsertComponent { entity_id, @@ -824,7 +824,7 @@ mod tests { #[test] fn test_oversized_operation_returns_error() { let mut buffer = WriteBuffer::new(100); - let oversized_op = vec![0u8; 11 * 1024 * 1024]; + let oversized_op = bytes::Bytes::from(vec![0u8; 11 * 1024 * 1024]); let result = buffer.add(PersistenceOp::LogOperation { node_id: uuid::Uuid::new_v4(), @@ -860,7 +860,7 @@ mod tests { for size in sizes { let mut buffer = WriteBuffer::new(100); - let data = vec![0u8; size]; + let data = bytes::Bytes::from(vec![0u8; size]); let result = buffer.add(PersistenceOp::UpsertComponent { entity_id: uuid::Uuid::new_v4(),