From 888e5d303cf96e4ffd162487829464a34edf22fe Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Sun, 16 Nov 2025 11:50:49 +0000 Subject: [PATCH] format Signed-off-by: Sienna Meridian Satterwhite --- crates/lib/src/db.rs | 25 ++++- crates/lib/src/lib.rs | 17 ++- crates/lib/src/models.rs | 23 +++- crates/lib/src/persistence/config.rs | 9 +- crates/lib/src/persistence/database.rs | 97 ++++++++++------- crates/lib/src/persistence/error.rs | 28 ++--- crates/lib/src/persistence/health.rs | 24 ++-- crates/lib/src/persistence/lifecycle.rs | 75 +++++++------ crates/lib/src/persistence/metrics.rs | 20 ++-- crates/lib/src/persistence/mod.rs | 32 +++--- crates/lib/src/persistence/plugin.rs | 45 +++++--- crates/lib/src/persistence/reflection.rs | 45 +++++--- crates/lib/src/persistence/systems.rs | 92 +++++++++------- crates/lib/src/persistence/types.rs | 103 +++++++++++------- crates/lib/src/sync.rs | 39 +++++-- crates/lib/tests/our_messages_test.rs | 49 +++++++-- crates/lib/tests/sync_integration.rs | 74 ++++++++----- crates/server/src/components/database.rs | 3 +- crates/server/src/components/gossip.rs | 40 ++++--- crates/server/src/config.rs | 27 +++-- crates/server/src/db/operations.rs | 30 ++++- crates/server/src/db/schema.rs | 23 ++-- crates/server/src/iroh_sync.rs | 17 ++- crates/server/src/main.rs | 35 +++--- crates/server/src/models.rs | 10 +- crates/server/src/services/chat_poller.rs | 58 +++++++--- .../server/src/services/embedding_service.rs | 27 +++-- crates/server/src/services/emotion_service.rs | 35 ++++-- crates/server/src/services/mod.rs | 2 +- crates/server/src/systems/database.rs | 5 +- crates/server/src/systems/gossip.rs | 43 ++++---- crates/server/src/systems/setup.rs | 6 +- crates/sync-macros/src/lib.rs | 68 ++++++------ 33 files changed, 766 insertions(+), 460 deletions(-) diff --git a/crates/lib/src/db.rs b/crates/lib/src/db.rs index fd19b99..2807822 100644 --- a/crates/lib/src/db.rs +++ b/crates/lib/src/db.rs @@ -1,6 +1,14 @@ -use crate::error::Result; -use crate::models::*; -use rusqlite::{Connection, OpenFlags, Row, params}; +use rusqlite::{ + Connection, + OpenFlags, + Row, + params, +}; + +use crate::{ + error::Result, + models::*, +}; pub struct ChatDb { conn: Connection, @@ -27,7 +35,10 @@ impl ChatDb { start_date: Option>, end_date: Option>, ) -> Result> { - use chrono::{TimeZone, Utc}; + use chrono::{ + TimeZone, + Utc, + }; // Default date range: January 1, 2024 to now let start = @@ -84,7 +95,7 @@ impl ChatDb { WHERE h.id = ? GROUP BY c.ROWID ORDER BY msg_count DESC - LIMIT 1" + LIMIT 1", )?; let chat = stmt.query_row(params![phone_number], |row| { @@ -98,7 +109,9 @@ impl ChatDb { room_name: row.get(6)?, is_archived: row.get::<_, i64>(7)? != 0, is_filtered: row.get::<_, i64>(8)? != 0, - last_read_message_timestamp: row.get::<_, Option>(9)?.map(apple_timestamp_to_datetime), + last_read_message_timestamp: row + .get::<_, Option>(9)? + .map(apple_timestamp_to_datetime), }) })?; diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index 15ca16a..8f6b368 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -1,6 +1,7 @@ //! Data access layer for iMessage chat.db //! -//! This library provides a read-only interface to query messages from a specific conversation. +//! This library provides a read-only interface to query messages from a +//! specific conversation. //! //! # Safety //! @@ -20,12 +21,18 @@ //! # Ok::<(), lib::ChatDbError>(()) //! ``` +mod db; mod error; mod models; -mod db; -pub mod sync; pub mod persistence; +pub mod sync; -pub use error::{ChatDbError, Result}; -pub use models::{Message, Chat}; pub use db::ChatDb; +pub use error::{ + ChatDbError, + Result, +}; +pub use models::{ + Chat, + Message, +}; diff --git a/crates/lib/src/models.rs b/crates/lib/src/models.rs index 69af7fd..1ced581 100644 --- a/crates/lib/src/models.rs +++ b/crates/lib/src/models.rs @@ -1,5 +1,11 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use chrono::{ + DateTime, + Utc, +}; +use serde::{ + Deserialize, + Serialize, +}; /// Represents a message in the iMessage database #[derive(Debug, Clone, Serialize, Deserialize)] @@ -41,7 +47,8 @@ pub struct Chat { pub last_read_message_timestamp: Option>, } -/// Helper function to convert Apple's Cocoa timestamp (seconds since 2001-01-01) to DateTime +/// Helper function to convert Apple's Cocoa timestamp (seconds since +/// 2001-01-01) to DateTime pub fn apple_timestamp_to_datetime(timestamp: i64) -> DateTime { // Apple's Cocoa timestamps are in nanoseconds since 2001-01-01 00:00:00 UTC // Convert to Unix timestamp (seconds since 1970-01-01 00:00:00 UTC) @@ -50,7 +57,8 @@ pub fn apple_timestamp_to_datetime(timestamp: i64) -> DateTime { let seconds = timestamp / 1_000_000_000 + APPLE_EPOCH_OFFSET; let nanos = (timestamp % 1_000_000_000) as u32; - DateTime::from_timestamp(seconds, nanos).unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap()) + DateTime::from_timestamp(seconds, nanos) + .unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap()) } /// Helper function to convert DateTime to Apple's Cocoa timestamp @@ -65,8 +73,13 @@ pub fn datetime_to_apple_timestamp(dt: DateTime) -> i64 { #[cfg(test)] mod tests { + use chrono::{ + Datelike, + TimeZone, + Timelike, + }; + use super::*; - use chrono::{Datelike, TimeZone, Timelike}; #[test] fn test_apple_timestamp_to_datetime_zero() { diff --git a/crates/lib/src/persistence/config.rs b/crates/lib/src/persistence/config.rs index 254da59..ac44b87 100644 --- a/crates/lib/src/persistence/config.rs +++ b/crates/lib/src/persistence/config.rs @@ -1,9 +1,14 @@ //! Configuration for the persistence layer -use crate::persistence::error::Result; -use serde::{Deserialize, Serialize}; use std::time::Duration; +use serde::{ + Deserialize, + Serialize, +}; + +use crate::persistence::error::Result; + /// Default critical flush delay in milliseconds const DEFAULT_CRITICAL_FLUSH_DELAY_MS: u64 = 1000; diff --git a/crates/lib/src/persistence/database.rs b/crates/lib/src/persistence/database.rs index 8ffadf2..707287d 100644 --- a/crates/lib/src/persistence/database.rs +++ b/crates/lib/src/persistence/database.rs @@ -1,11 +1,21 @@ //! Database schema and operations for persistence layer -use crate::persistence::types::*; -use crate::persistence::error::{PersistenceError, Result}; -use chrono::Utc; -use rusqlite::{Connection, OptionalExtension}; use std::path::Path; +use chrono::Utc; +use rusqlite::{ + Connection, + OptionalExtension, +}; + +use crate::persistence::{ + error::{ + PersistenceError, + Result, + }, + types::*, +}; + /// Default SQLite page size in bytes (4KB) const DEFAULT_PAGE_SIZE: i64 = 4096; @@ -164,7 +174,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result { + | PersistenceOp::UpsertEntity { id, data } => { tx.execute( "INSERT OR REPLACE INTO entities (id, entity_type, created_at, updated_at) VALUES (?1, ?2, ?3, ?4)", @@ -176,9 +186,9 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result Result Result { + | PersistenceOp::UpdateVectorClock { node_id, counter } => { tx.execute( "INSERT OR REPLACE INTO vector_clock (node_id, counter, updated_at) VALUES (?1, ?2, ?3)", rusqlite::params![node_id, counter, current_timestamp()], )?; count += 1; - } + }, - PersistenceOp::DeleteEntity { id } => { - tx.execute("DELETE FROM entities WHERE id = ?1", rusqlite::params![id.as_bytes()])?; + | PersistenceOp::DeleteEntity { id } => { + tx.execute( + "DELETE FROM entities WHERE id = ?1", + rusqlite::params![id.as_bytes()], + )?; count += 1; - } + }, - PersistenceOp::DeleteComponent { + | PersistenceOp::DeleteComponent { entity_id, component_type, } => { @@ -237,7 +250,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result Result Result Result { let mode_str = match mode { - CheckpointMode::Passive => "PASSIVE", - CheckpointMode::Full => "FULL", - CheckpointMode::Restart => "RESTART", - CheckpointMode::Truncate => "TRUNCATE", + | CheckpointMode::Passive => "PASSIVE", + | CheckpointMode::Full => "FULL", + | CheckpointMode::Restart => "RESTART", + | CheckpointMode::Truncate => "TRUNCATE", }; let query = format!("PRAGMA wal_checkpoint({})", mode_str); // Returns (busy, log_pages, checkpointed_pages) let (busy, log_pages, checkpointed_pages): (i32, i32, i32) = - conn.query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?; + conn.query_row(&query, [], |row| { + Ok((row.get(0)?, row.get(1)?, row.get(2)?)) + })?; // Update checkpoint state conn.execute( @@ -303,15 +319,16 @@ pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result Result { // Check if WAL file exists and get its size match std::fs::metadata(&wal_path) { - Ok(metadata) => Ok(metadata.len() as i64), - Err(_) => Ok(0), // WAL doesn't exist yet + | Ok(metadata) => Ok(metadata.len() as i64), + | Err(_) => Ok(0), // WAL doesn't exist yet } } @@ -360,8 +377,9 @@ pub struct CheckpointInfo { /// Set a session state value in the database /// -/// Session state is used to track application lifecycle events and detect crashes. -/// Values persist across restarts, enabling crash detection and recovery. +/// Session state is used to track application lifecycle events and detect +/// crashes. Values persist across restarts, enabling crash detection and +/// recovery. /// /// # Parameters /// - `conn`: Mutable reference to the SQLite connection @@ -404,12 +422,13 @@ pub fn get_session_state(conn: &Connection, key: &str) -> Result> /// Check if the previous session had a clean shutdown /// -/// This is critical for crash detection. When the application starts, this checks -/// if the previous session ended cleanly. If not, it indicates a crash occurred, -/// and recovery procedures may be needed. +/// This is critical for crash detection. When the application starts, this +/// checks if the previous session ended cleanly. If not, it indicates a crash +/// occurred, and recovery procedures may be needed. /// -/// **Side effect**: Resets the clean_shutdown flag to "false" for the current session. -/// Call [`mark_clean_shutdown`] during normal shutdown to set it back to "true". +/// **Side effect**: Resets the clean_shutdown flag to "false" for the current +/// session. Call [`mark_clean_shutdown`] during normal shutdown to set it back +/// to "true". /// /// # Parameters /// - `conn`: Mutable reference to the SQLite connection (mutates session state) @@ -537,7 +556,11 @@ mod tests { // After checking clean shutdown, flag should be reset to false // So if we check again without marking, it should report as crash let value = get_session_state(&conn, "clean_shutdown")?; - assert_eq!(value, Some("false".to_string()), "Flag should be reset after check"); + assert_eq!( + value, + Some("false".to_string()), + "Flag should be reset after check" + ); Ok(()) } diff --git a/crates/lib/src/persistence/error.rs b/crates/lib/src/persistence/error.rs index b6c8f88..0ec80dd 100644 --- a/crates/lib/src/persistence/error.rs +++ b/crates/lib/src/persistence/error.rs @@ -42,16 +42,16 @@ pub enum PersistenceError { impl fmt::Display for PersistenceError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Database(err) => write!(f, "Database error: {}", err), - Self::Serialization(err) => write!(f, "Serialization error: {}", err), - Self::Deserialization(msg) => write!(f, "Deserialization error: {}", msg), - Self::Config(msg) => write!(f, "Configuration error: {}", msg), - Self::Io(err) => write!(f, "I/O error: {}", err), - Self::TypeNotRegistered(type_name) => { + | Self::Database(err) => write!(f, "Database error: {}", err), + | Self::Serialization(err) => write!(f, "Serialization error: {}", err), + | Self::Deserialization(msg) => write!(f, "Deserialization error: {}", msg), + | Self::Config(msg) => write!(f, "Configuration error: {}", msg), + | Self::Io(err) => write!(f, "I/O error: {}", err), + | Self::TypeNotRegistered(type_name) => { write!(f, "Type not registered in type registry: {}", type_name) - } - Self::NotFound(msg) => write!(f, "Not found: {}", msg), - Self::CircuitBreakerOpen { + }, + | Self::NotFound(msg) => write!(f, "Not found: {}", msg), + | Self::CircuitBreakerOpen { consecutive_failures, retry_after_secs, } => write!( @@ -59,7 +59,7 @@ impl fmt::Display for PersistenceError { "Circuit breaker open after {} consecutive failures, retry after {} seconds", consecutive_failures, retry_after_secs ), - Self::Other(msg) => write!(f, "{}", msg), + | Self::Other(msg) => write!(f, "{}", msg), } } } @@ -67,10 +67,10 @@ impl fmt::Display for PersistenceError { impl std::error::Error for PersistenceError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { - Self::Database(err) => Some(err), - Self::Serialization(err) => Some(err), - Self::Io(err) => Some(err), - _ => None, + | Self::Database(err) => Some(err), + | Self::Serialization(err) => Some(err), + | Self::Io(err) => Some(err), + | _ => None, } } } diff --git a/crates/lib/src/persistence/health.rs b/crates/lib/src/persistence/health.rs index c53b589..4592edd 100644 --- a/crates/lib/src/persistence/health.rs +++ b/crates/lib/src/persistence/health.rs @@ -1,7 +1,11 @@ //! Health monitoring and error recovery for persistence layer +use std::time::{ + Duration, + Instant, +}; + use bevy::prelude::*; -use std::time::{Duration, Instant}; /// Base delay for exponential backoff in milliseconds const BASE_RETRY_DELAY_MS: u64 = 1000; // 1 second @@ -52,11 +56,10 @@ impl Default for PersistenceHealth { } impl PersistenceHealth { - /// Circuit breaker threshold - open after this many consecutive failures - pub const CIRCUIT_BREAKER_THRESHOLD: u32 = 5; - /// How long to keep circuit breaker open before attempting recovery pub const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_secs(60); + /// Circuit breaker threshold - open after this many consecutive failures + pub const CIRCUIT_BREAKER_THRESHOLD: u32 = 5; /// Record a successful flush pub fn record_flush_success(&mut self) { @@ -102,9 +105,9 @@ impl PersistenceHealth { /// Check if we should attempt operations (circuit breaker state) /// - /// **CRITICAL FIX**: Now takes `&mut self` to properly reset the circuit breaker - /// after cooldown expires. This prevents the circuit breaker from remaining - /// permanently open after one post-cooldown failure. + /// **CRITICAL FIX**: Now takes `&mut self` to properly reset the circuit + /// breaker after cooldown expires. This prevents the circuit breaker + /// from remaining permanently open after one post-cooldown failure. pub fn should_attempt_operation(&mut self) -> bool { if !self.circuit_breaker_open { return true; @@ -114,7 +117,9 @@ impl PersistenceHealth { if let Some(opened_at) = self.circuit_breaker_opened_at { if opened_at.elapsed() >= Self::CIRCUIT_BREAKER_COOLDOWN { // Transition to half-open state by resetting the breaker - info!("Circuit breaker cooldown elapsed - entering half-open state (testing recovery)"); + info!( + "Circuit breaker cooldown elapsed - entering half-open state (testing recovery)" + ); self.circuit_breaker_open = false; self.circuit_breaker_opened_at = None; // consecutive_flush_failures is kept to track if this probe succeeds @@ -128,7 +133,8 @@ impl PersistenceHealth { /// Get exponential backoff delay based on consecutive failures pub fn get_retry_delay(&self) -> Duration { // Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s - let delay_ms = BASE_RETRY_DELAY_MS * 2u64.pow(self.consecutive_flush_failures.min(MAX_BACKOFF_EXPONENT)); + let delay_ms = BASE_RETRY_DELAY_MS * + 2u64.pow(self.consecutive_flush_failures.min(MAX_BACKOFF_EXPONENT)); Duration::from_millis(delay_ms.min(MAX_RETRY_DELAY_MS)) } } diff --git a/crates/lib/src/persistence/lifecycle.rs b/crates/lib/src/persistence/lifecycle.rs index 3954bc1..3a463c5 100644 --- a/crates/lib/src/persistence/lifecycle.rs +++ b/crates/lib/src/persistence/lifecycle.rs @@ -18,9 +18,10 @@ //! } //! ``` -use crate::persistence::*; use bevy::prelude::*; +use crate::persistence::*; + /// Application lifecycle events that require persistence handling /// /// These events are critical moments where data must be flushed immediately @@ -39,9 +40,11 @@ pub enum AppLifecycleEvent { /// 5 seconds to complete critical tasks before suspension. DidEnterBackground, - /// Application will enter foreground (iOS: `applicationWillEnterForeground`) + /// Application will enter foreground (iOS: + /// `applicationWillEnterForeground`) /// - /// Sent when the app is about to enter the foreground (user returning to app). + /// Sent when the app is about to enter the foreground (user returning to + /// app). WillEnterForeground, /// Application did become active (iOS: `applicationDidBecomeActive`) @@ -51,7 +54,8 @@ pub enum AppLifecycleEvent { /// Application will terminate (iOS: `applicationWillTerminate`) /// - /// Sent when the app is about to terminate. Similar to shutdown but from OS. + /// Sent when the app is about to terminate. Similar to shutdown but from + /// OS. WillTerminate, } @@ -69,7 +73,7 @@ pub fn lifecycle_event_system( ) { for event in events.read() { match event { - AppLifecycleEvent::WillResignActive => { + | AppLifecycleEvent::WillResignActive => { // App is becoming inactive - perform immediate flush info!("App will resign active - performing immediate flush"); @@ -79,9 +83,9 @@ pub fn lifecycle_event_system( } else { health.record_flush_success(); } - } + }, - AppLifecycleEvent::DidEnterBackground => { + | AppLifecycleEvent::DidEnterBackground => { // App entered background - perform immediate flush and checkpoint info!("App entered background - performing immediate flush and checkpoint"); @@ -96,47 +100,50 @@ pub fn lifecycle_event_system( // Also checkpoint the WAL to ensure durability let start = std::time::Instant::now(); match db.lock() { - Ok(mut conn) => { - match checkpoint_wal(&mut conn, CheckpointMode::Passive) { - Ok(_) => { - let duration = start.elapsed(); - metrics.record_checkpoint(duration); - health.record_checkpoint_success(); - info!("Background checkpoint completed successfully"); - } - Err(e) => { - error!("Failed to checkpoint on background: {}", e); - health.record_checkpoint_failure(); - } - } - } - Err(e) => { + | Ok(mut conn) => match checkpoint_wal(&mut conn, CheckpointMode::Passive) { + | Ok(_) => { + let duration = start.elapsed(); + metrics.record_checkpoint(duration); + health.record_checkpoint_success(); + info!("Background checkpoint completed successfully"); + }, + | Err(e) => { + error!("Failed to checkpoint on background: {}", e); + health.record_checkpoint_failure(); + }, + }, + | Err(e) => { error!("Failed to acquire database lock for checkpoint: {}", e); health.record_checkpoint_failure(); - } + }, } - } + }, - AppLifecycleEvent::WillTerminate => { + | AppLifecycleEvent::WillTerminate => { // App will terminate - perform shutdown sequence warn!("App will terminate - performing shutdown sequence"); - if let Err(e) = shutdown_system(&mut write_buffer, &db, &mut metrics, Some(&mut pending_tasks)) { + if let Err(e) = shutdown_system( + &mut write_buffer, + &db, + &mut metrics, + Some(&mut pending_tasks), + ) { error!("Failed to perform shutdown on terminate: {}", e); } else { info!("Clean shutdown completed on terminate"); } - } + }, - AppLifecycleEvent::WillEnterForeground => { + | AppLifecycleEvent::WillEnterForeground => { // App returning from background - no immediate action needed info!("App will enter foreground"); - } + }, - AppLifecycleEvent::DidBecomeActive => { + | AppLifecycleEvent::DidBecomeActive => { // App became active - no immediate action needed info!("App did become active"); - } + }, } } } @@ -149,10 +156,10 @@ mod tests { fn test_lifecycle_event_creation() { let event = AppLifecycleEvent::WillResignActive; match event { - AppLifecycleEvent::WillResignActive => { + | AppLifecycleEvent::WillResignActive => { // Success - } - _ => panic!("Event type mismatch"), + }, + | _ => panic!("Event type mismatch"), } } } diff --git a/crates/lib/src/persistence/metrics.rs b/crates/lib/src/persistence/metrics.rs index 624402c..943fa7f 100644 --- a/crates/lib/src/persistence/metrics.rs +++ b/crates/lib/src/persistence/metrics.rs @@ -142,19 +142,19 @@ pub enum HealthWarning { impl std::fmt::Display for HealthWarning { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - HealthWarning::SlowFlush(duration) => { + | HealthWarning::SlowFlush(duration) => { + write!(f, "Flush duration ({:?}) exceeds 50ms threshold", duration) + }, + | HealthWarning::LargeWal(size) => { + write!(f, "WAL size ({} bytes) exceeds 5MB threshold", size) + }, + | HealthWarning::HighCrashRate(rate) => { write!( f, - "Flush duration ({:?}) exceeds 50ms threshold", - duration + "Crash recovery rate ({:.1}%) exceeds 10% threshold", + rate * 100.0 ) - } - HealthWarning::LargeWal(size) => { - write!(f, "WAL size ({} bytes) exceeds 5MB threshold", size) - } - HealthWarning::HighCrashRate(rate) => { - write!(f, "Crash recovery rate ({:.1}%) exceeds 10% threshold", rate * 100.0) - } + }, } } } diff --git a/crates/lib/src/persistence/mod.rs b/crates/lib/src/persistence/mod.rs index 9964f9f..0e15a59 100644 --- a/crates/lib/src/persistence/mod.rs +++ b/crates/lib/src/persistence/mod.rs @@ -1,17 +1,19 @@ //! Persistence layer for battery-efficient state management //! //! This module implements the persistence strategy defined in RFC 0002. -//! It provides a three-tier system to minimize disk I/O while maintaining data durability: +//! It provides a three-tier system to minimize disk I/O while maintaining data +//! durability: //! //! 1. **In-Memory Dirty Tracking** - Track changes without writing immediately //! 2. **Write Buffer** - Batch and coalesce operations before writing -//! 3. **SQLite with WAL Mode** - Controlled checkpoints to minimize fsync() calls +//! 3. **SQLite with WAL Mode** - Controlled checkpoints to minimize fsync() +//! calls //! //! # Example //! //! ```no_run -//! use lib::persistence::*; //! use bevy::prelude::*; +//! use lib::persistence::*; //! //! fn setup(mut commands: Commands) { //! // Spawn an entity with the Persisted marker @@ -28,24 +30,24 @@ //! } //! ``` -mod types; -mod database; -mod systems; mod config; +mod database; +mod error; +mod health; +mod lifecycle; mod metrics; mod plugin; mod reflection; -mod health; -mod error; -mod lifecycle; +mod systems; +mod types; -pub use types::*; -pub use database::*; -pub use systems::*; pub use config::*; +pub use database::*; +pub use error::*; +pub use health::*; +pub use lifecycle::*; pub use metrics::*; pub use plugin::*; pub use reflection::*; -pub use health::*; -pub use error::*; -pub use lifecycle::*; +pub use systems::*; +pub use types::*; diff --git a/crates/lib/src/persistence/plugin.rs b/crates/lib/src/persistence/plugin.rs index 0671459..2ff701d 100644 --- a/crates/lib/src/persistence/plugin.rs +++ b/crates/lib/src/persistence/plugin.rs @@ -3,10 +3,17 @@ //! This module provides a Bevy plugin that sets up all the necessary resources //! and systems for the persistence layer. -use crate::persistence::*; +use std::{ + ops::{ + Deref, + DerefMut, + }, + path::PathBuf, +}; + use bevy::prelude::*; -use std::path::PathBuf; -use std::ops::{Deref, DerefMut}; + +use crate::persistence::*; /// Bevy plugin for persistence /// @@ -143,10 +150,7 @@ impl std::ops::DerefMut for WriteBufferResource { } /// Startup system to initialize persistence -fn persistence_startup_system( - db: Res, - mut metrics: ResMut, -) { +fn persistence_startup_system(db: Res, mut metrics: ResMut) { if let Err(e) = startup_system(db.deref(), metrics.deref_mut()) { error!("Failed to initialize persistence: {}", e); } else { @@ -192,10 +196,12 @@ fn collect_dirty_entities_bevy_system( /// System to automatically track changes to common Bevy components /// -/// This system detects changes to Transform, automatically triggering persistence -/// by accessing `Persisted` mutably (which marks it as changed via Bevy's change detection). +/// This system detects changes to Transform, automatically triggering +/// persistence by accessing `Persisted` mutably (which marks it as changed via +/// Bevy's change detection). /// -/// Add this system to your app if you want automatic persistence of Transform changes: +/// Add this system to your app if you want automatic persistence of Transform +/// changes: /// /// ```no_run /// # use bevy::prelude::*; @@ -214,7 +220,6 @@ pub fn auto_track_transform_changes_system( } } - /// System to checkpoint the WAL fn checkpoint_bevy_system( db: Res, @@ -223,18 +228,22 @@ fn checkpoint_bevy_system( mut metrics: ResMut, mut health: ResMut, ) { - match checkpoint_system(db.deref(), config.deref(), timer.deref_mut(), metrics.deref_mut()) { - Ok(_) => { + match checkpoint_system( + db.deref(), + config.deref(), + timer.deref_mut(), + metrics.deref_mut(), + ) { + | Ok(_) => { health.record_checkpoint_success(); - } - Err(e) => { + }, + | Err(e) => { health.record_checkpoint_failure(); error!( "Failed to checkpoint WAL (attempt {}): {}", - health.consecutive_checkpoint_failures, - e + health.consecutive_checkpoint_failures, e ); - } + }, } } diff --git a/crates/lib/src/persistence/reflection.rs b/crates/lib/src/persistence/reflection.rs index 086e7ee..ecda67d 100644 --- a/crates/lib/src/persistence/reflection.rs +++ b/crates/lib/src/persistence/reflection.rs @@ -4,21 +4,33 @@ //! using reflection, allowing the persistence layer to work with any component //! that implements Reflect. -use bevy::prelude::*; -use bevy::reflect::serde::{ReflectSerializer, ReflectDeserializer}; -use bevy::reflect::TypeRegistry; -use crate::persistence::error::{PersistenceError, Result}; +use bevy::{ + prelude::*, + reflect::{ + TypeRegistry, + serde::{ + ReflectDeserializer, + ReflectSerializer, + }, + }, +}; + +use crate::persistence::error::{ + PersistenceError, + Result, +}; /// Marker component to indicate that an entity should be persisted /// -/// Add this component to any entity that should have its state persisted to disk. -/// The persistence system will automatically serialize all components on entities -/// with this marker when they change. +/// Add this component to any entity that should have its state persisted to +/// disk. The persistence system will automatically serialize all components on +/// entities with this marker when they change. /// /// # Triggering Persistence /// -/// To trigger persistence after modifying components on an entity, access `Persisted` -/// mutably through a query. Bevy's change detection will automatically mark it as changed: +/// To trigger persistence after modifying components on an entity, access +/// `Persisted` mutably through a query. Bevy's change detection will +/// automatically mark it as changed: /// /// ```no_run /// # use bevy::prelude::*; @@ -31,8 +43,8 @@ use crate::persistence::error::{PersistenceError, Result}; /// } /// ``` /// -/// Alternatively, use `auto_track_transform_changes_system` for automatic persistence -/// of Transform changes without manual queries. +/// Alternatively, use `auto_track_transform_changes_system` for automatic +/// persistence of Transform changes without manual queries. #[derive(Component, Reflect, Default)] #[reflect(Component)] pub struct Persisted { @@ -103,7 +115,8 @@ pub fn serialize_component( /// /// # Returns /// - `Ok(Box)`: Deserialized component (needs downcasting) -/// - `Err`: If deserialization fails (e.g., type not registered, data corruption) +/// - `Err`: If deserialization fails (e.g., type not registered, data +/// corruption) /// /// # Examples /// ```no_run @@ -137,7 +150,8 @@ pub fn deserialize_component( /// /// # Parameters /// - `entity`: Bevy entity to read the component from -/// - `component_type`: Type path string (e.g., "bevy_transform::components::Transform") +/// - `component_type`: Type path string (e.g., +/// "bevy_transform::components::Transform") /// - `world`: Bevy world containing the entity /// - `type_registry`: Bevy's type registry for reflection metadata /// @@ -155,7 +169,7 @@ pub fn deserialize_component( /// entity, /// "bevy_transform::components::Transform", /// world, -/// ®istry +/// ®istry, /// )?; /// # Some(()) /// # } @@ -192,7 +206,8 @@ pub fn serialize_component_from_entity( /// - `type_registry`: Bevy's type registry for reflection metadata /// /// # Returns -/// Vector of tuples containing (component_type_path, serialized_data) for each component +/// Vector of tuples containing (component_type_path, serialized_data) for each +/// component pub fn serialize_all_components_from_entity( entity: Entity, world: &World, diff --git a/crates/lib/src/persistence/systems.rs b/crates/lib/src/persistence/systems.rs index 4c652a4..df23768 100644 --- a/crates/lib/src/persistence/systems.rs +++ b/crates/lib/src/persistence/systems.rs @@ -1,16 +1,31 @@ //! Bevy systems for the persistence layer //! -//! This module provides systems that integrate the persistence layer with Bevy's ECS. -//! These systems handle dirty tracking, write buffering, and flushing to SQLite. +//! This module provides systems that integrate the persistence layer with +//! Bevy's ECS. These systems handle dirty tracking, write buffering, and +//! flushing to SQLite. -use crate::persistence::*; -use crate::persistence::error::Result; -use bevy::prelude::*; -use bevy::tasks::{IoTaskPool, Task}; +use std::{ + sync::{ + Arc, + Mutex, + }, + time::Instant, +}; + +use bevy::{ + prelude::*, + tasks::{ + IoTaskPool, + Task, + }, +}; use futures_lite::future; use rusqlite::Connection; -use std::sync::{Arc, Mutex}; -use std::time::Instant; + +use crate::persistence::{ + error::Result, + *, +}; /// Resource wrapping the SQLite connection #[derive(Clone, bevy::prelude::Resource)] @@ -48,8 +63,9 @@ impl PersistenceDb { /// - `Ok(MutexGuard)`: Locked connection ready for use /// - `Err(PersistenceError)`: If mutex is poisoned pub fn lock(&self) -> Result> { - self.conn.lock() - .map_err(|e| PersistenceError::Other(format!("Database connection mutex poisoned: {}", e))) + self.conn.lock().map_err(|e| { + PersistenceError::Other(format!("Database connection mutex poisoned: {}", e)) + }) } } @@ -85,9 +101,9 @@ pub struct FlushResult { fn calculate_bytes_written(ops: &[PersistenceOp]) -> u64 { ops.iter() .map(|op| match op { - PersistenceOp::UpsertComponent { data, .. } => data.len() as u64, - PersistenceOp::LogOperation { operation, .. } => operation.len() as u64, - _ => 0, + | PersistenceOp::UpsertComponent { data, .. } => data.len() as u64, + | PersistenceOp::LogOperation { operation, .. } => operation.len() as u64, + | _ => 0, }) .sum() } @@ -120,10 +136,7 @@ fn perform_flush_sync( /// Helper function to perform a flush asynchronously (for normal operations) /// /// This runs on the I/O task pool to avoid blocking the main thread -fn perform_flush_async( - ops: Vec, - db: PersistenceDb, -) -> Result { +fn perform_flush_async(ops: Vec, db: PersistenceDb) -> Result { if ops.is_empty() { return Ok(FlushResult { operations_count: 0, @@ -151,10 +164,12 @@ fn perform_flush_async( /// System to flush the write buffer to SQLite asynchronously /// -/// This system runs on a schedule based on the configuration and battery status. -/// It spawns async tasks to avoid blocking the main thread and handles errors gracefully. +/// This system runs on a schedule based on the configuration and battery +/// status. It spawns async tasks to avoid blocking the main thread and handles +/// errors gracefully. /// -/// The system also polls pending flush tasks and updates metrics when they complete. +/// The system also polls pending flush tasks and updates metrics when they +/// complete. pub fn flush_system( mut write_buffer: ResMut, db: Res, @@ -170,7 +185,7 @@ pub fn flush_system( pending_tasks.tasks.retain_mut(|task| { if let Some(result) = future::block_on(future::poll_once(task)) { match result { - Ok(flush_result) => { + | Ok(flush_result) => { let previous_failures = health.consecutive_flush_failures; health.record_flush_success(); @@ -183,12 +198,10 @@ pub fn flush_system( // Emit recovery event if we recovered from failures if previous_failures > 0 { - recovery_events.write(PersistenceRecoveryEvent { - previous_failures, - }); + recovery_events.write(PersistenceRecoveryEvent { previous_failures }); } - } - Err(e) => { + }, + | Err(e) => { health.record_flush_failure(); let error_msg = format!("{}", e); @@ -205,7 +218,7 @@ pub fn flush_system( consecutive_failures: health.consecutive_flush_failures, circuit_breaker_open: health.circuit_breaker_open, }); - } + }, } false // Remove completed task } else { @@ -235,9 +248,7 @@ pub fn flush_system( let task_pool = IoTaskPool::get(); let db_clone = db.clone(); - let task = task_pool.spawn(async move { - perform_flush_async(ops, db_clone.clone()) - }); + let task = task_pool.spawn(async move { perform_flush_async(ops, db_clone.clone()) }); pending_tasks.tasks.push(task); @@ -247,7 +258,8 @@ pub fn flush_system( /// System to checkpoint the WAL file /// -/// This runs less frequently than flush_system to merge the WAL into the main database. +/// This runs less frequently than flush_system to merge the WAL into the main +/// database. pub fn checkpoint_system( db: &PersistenceDb, config: &PersistenceConfig, @@ -308,24 +320,30 @@ pub fn shutdown_system( // CRITICAL: Wait for all pending async flushes to complete // This prevents data loss from in-flight operations if let Some(pending) = pending_tasks { - info!("Waiting for {} pending flush tasks to complete before shutdown", pending.tasks.len()); + info!( + "Waiting for {} pending flush tasks to complete before shutdown", + pending.tasks.len() + ); for task in pending.tasks.drain(..) { // Block on each pending task to ensure completion match future::block_on(task) { - Ok(flush_result) => { + | Ok(flush_result) => { // Update metrics for completed flush metrics.record_flush( flush_result.operations_count, flush_result.duration, flush_result.bytes_written, ); - debug!("Pending flush completed: {} operations", flush_result.operations_count); - } - Err(e) => { + debug!( + "Pending flush completed: {} operations", + flush_result.operations_count + ); + }, + | Err(e) => { error!("Pending flush failed during shutdown: {}", e); // Continue with shutdown even if a task failed - } + }, } } diff --git a/crates/lib/src/persistence/types.rs b/crates/lib/src/persistence/types.rs index dc6022f..0f824d2 100644 --- a/crates/lib/src/persistence/types.rs +++ b/crates/lib/src/persistence/types.rs @@ -1,13 +1,26 @@ //! Core types for the persistence layer -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -use std::time::Instant; +use std::{ + collections::{ + HashMap, + HashSet, + }, + time::Instant, +}; + use bevy::prelude::Resource; +use chrono::{ + DateTime, + Utc, +}; +use serde::{ + Deserialize, + Serialize, +}; /// Maximum size for a single component in bytes (10MB) -/// Components larger than this may indicate serialization issues or unbounded data growth +/// Components larger than this may indicate serialization issues or unbounded +/// data growth const MAX_COMPONENT_SIZE_BYTES: usize = 10 * 1024 * 1024; /// Critical flush deadline in milliseconds (1 second for tier-1 operations) @@ -23,7 +36,8 @@ pub type NodeId = String; /// /// Determines how quickly an operation should be flushed to disk: /// - **Normal**: Regular batched flushing (5-60s intervals based on battery) -/// - **Critical**: Flush within 1 second (tier-1 operations like user actions, CRDT ops) +/// - **Critical**: Flush within 1 second (tier-1 operations like user actions, +/// CRDT ops) /// - **Immediate**: Flush immediately (shutdown, background suspension) #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub enum FlushPriority { @@ -85,10 +99,7 @@ impl DirtyEntities { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum PersistenceOp { /// Insert or update an entity's existence - UpsertEntity { - id: EntityId, - data: EntityData, - }, + UpsertEntity { id: EntityId, data: EntityData }, /// Insert or update a component on an entity UpsertComponent { @@ -105,15 +116,10 @@ pub enum PersistenceOp { }, /// Update vector clock for causality tracking - UpdateVectorClock { - node_id: NodeId, - counter: u64, - }, + UpdateVectorClock { node_id: NodeId, counter: u64 }, /// Delete an entity - DeleteEntity { - id: EntityId, - }, + DeleteEntity { id: EntityId }, /// Delete a component from an entity DeleteComponent { @@ -125,17 +131,18 @@ pub enum PersistenceOp { impl PersistenceOp { /// Get the default priority for this operation type /// - /// CRDT operations (LogOperation, UpdateVectorClock) are critical tier-1 operations - /// that should be flushed within 1 second to maintain causality across nodes. - /// Other operations use normal priority by default. + /// CRDT operations (LogOperation, UpdateVectorClock) are critical tier-1 + /// operations that should be flushed within 1 second to maintain + /// causality across nodes. Other operations use normal priority by + /// default. pub fn default_priority(&self) -> FlushPriority { match self { // CRDT operations are tier-1 (critical) - PersistenceOp::LogOperation { .. } | PersistenceOp::UpdateVectorClock { .. } => { + | PersistenceOp::LogOperation { .. } | PersistenceOp::UpdateVectorClock { .. } => { FlushPriority::Critical - } + }, // All other operations are normal priority by default - _ => FlushPriority::Normal, + | _ => FlushPriority::Normal, } } } @@ -181,7 +188,8 @@ impl WriteBuffer { /// Add an operation to the write buffer with normal priority /// - /// This is a convenience method that calls `add_with_priority` with `FlushPriority::Normal`. + /// This is a convenience method that calls `add_with_priority` with + /// `FlushPriority::Normal`. /// /// # Panics /// Panics if component data exceeds MAX_COMPONENT_SIZE_BYTES (10MB) @@ -191,8 +199,9 @@ impl WriteBuffer { /// Add an operation using its default priority /// - /// Uses `PersistenceOp::default_priority()` to determine priority automatically. - /// CRDT operations will be added as Critical, others as Normal. + /// Uses `PersistenceOp::default_priority()` to determine priority + /// automatically. CRDT operations will be added as Critical, others as + /// Normal. /// /// # Panics /// Panics if component data exceeds MAX_COMPONENT_SIZE_BYTES (10MB) @@ -212,7 +221,11 @@ impl WriteBuffer { pub fn add_with_priority(&mut self, op: PersistenceOp, priority: FlushPriority) { // Validate component size to prevent unbounded memory growth match &op { - PersistenceOp::UpsertComponent { data, component_type, .. } => { + | PersistenceOp::UpsertComponent { + data, + component_type, + .. + } => { if data.len() > MAX_COMPONENT_SIZE_BYTES { panic!( "Component {} size ({} bytes) exceeds maximum ({} bytes). \ @@ -222,8 +235,8 @@ impl WriteBuffer { MAX_COMPONENT_SIZE_BYTES ); } - } - PersistenceOp::LogOperation { operation, .. } => { + }, + | PersistenceOp::LogOperation { operation, .. } => { if operation.len() > MAX_COMPONENT_SIZE_BYTES { panic!( "Operation size ({} bytes) exceeds maximum ({} bytes)", @@ -231,12 +244,16 @@ impl WriteBuffer { MAX_COMPONENT_SIZE_BYTES ); } - } - _ => {} + }, + | _ => {}, } match &op { - PersistenceOp::UpsertComponent { entity_id, component_type, .. } => { + | PersistenceOp::UpsertComponent { + entity_id, + component_type, + .. + } => { // Remove any existing pending write for this entity+component self.pending_operations.retain(|existing_op| { !matches!(existing_op, @@ -247,8 +264,8 @@ impl WriteBuffer { } if e_id == entity_id && c_type == component_type ) }); - } - PersistenceOp::UpsertEntity { id, .. } => { + }, + | PersistenceOp::UpsertEntity { id, .. } => { // Remove any existing pending write for this entity self.pending_operations.retain(|existing_op| { !matches!(existing_op, @@ -256,10 +273,10 @@ impl WriteBuffer { if e_id == id ) }); - } - _ => { + }, + | _ => { // Other operations don't need coalescing - } + }, } // Track priority for flush urgency @@ -308,8 +325,8 @@ impl WriteBuffer { } // Normal flushing conditions - self.pending_operations.len() >= self.max_operations - || self.last_flush.elapsed() >= flush_interval + self.pending_operations.len() >= self.max_operations || + self.last_flush.elapsed() >= flush_interval } /// Get the number of pending operations @@ -370,7 +387,8 @@ impl BatteryStatus { /// Check if the device is in a battery-critical state /// - /// Returns true if battery is low (<20%) and not charging, or low power mode is enabled. + /// Returns true if battery is low (<20%) and not charging, or low power + /// mode is enabled. pub fn is_battery_critical(&self) -> bool { (self.level < 0.2 && !self.is_charging) || self.is_low_power_mode } @@ -521,8 +539,9 @@ mod tests { assert!(!buffer.should_flush(std::time::Duration::from_secs(100))); // Simulate deadline passing by manually setting the time - buffer.first_critical_time = - Some(Instant::now() - std::time::Duration::from_millis(CRITICAL_FLUSH_DEADLINE_MS + 100)); + buffer.first_critical_time = Some( + Instant::now() - std::time::Duration::from_millis(CRITICAL_FLUSH_DEADLINE_MS + 100), + ); // Now should flush due to deadline assert!(buffer.should_flush(std::time::Duration::from_secs(100))); diff --git a/crates/lib/src/sync.rs b/crates/lib/src/sync.rs index f2ad43c..7439152 100644 --- a/crates/lib/src/sync.rs +++ b/crates/lib/src/sync.rs @@ -1,24 +1,37 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use std::ops::{Deref, DerefMut}; - -// Re-export the macros -pub use sync_macros::{synced, Synced}; +use std::ops::{ + Deref, + DerefMut, +}; +use chrono::{ + DateTime, + Utc, +}; // Re-export common CRDT types from the crdts library pub use crdts::{ + CmRDT, + CvRDT, ctx::ReadCtx, lwwreg::LWWReg, map::Map, orswot::Orswot, - CmRDT, CvRDT, +}; +use serde::{ + Deserialize, + Serialize, +}; +// Re-export the macros +pub use sync_macros::{ + Synced, + synced, }; pub type NodeId = String; /// Transparent wrapper for synced values /// -/// This wraps any value with LWW semantics but allows you to use it like a normal value +/// This wraps any value with LWW semantics but allows you to use it like a +/// normal value #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncedValue { value: T, @@ -55,8 +68,8 @@ impl SyncedValue { pub fn merge(&mut self, other: &Self) { // Only clone if we're actually going to use the values (when other is newer) - if other.timestamp > self.timestamp - || (other.timestamp == self.timestamp && other.node_id > self.node_id) + if other.timestamp > self.timestamp || + (other.timestamp == self.timestamp && other.node_id > self.node_id) { self.value = other.value.clone(); self.timestamp = other.timestamp; @@ -95,7 +108,10 @@ pub struct SyncMessage { impl SyncMessage { pub fn new(node_id: NodeId, operation: T) -> Self { - use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::atomic::{ + AtomicU64, + Ordering, + }; static COUNTER: AtomicU64 = AtomicU64::new(0); let seq = COUNTER.fetch_add(1, Ordering::SeqCst); @@ -134,7 +150,6 @@ pub trait Syncable: Sized { } } - #[cfg(test)] mod tests { use super::*; diff --git a/crates/lib/tests/our_messages_test.rs b/crates/lib/tests/our_messages_test.rs index d92a367..d7779ba 100644 --- a/crates/lib/tests/our_messages_test.rs +++ b/crates/lib/tests/our_messages_test.rs @@ -1,5 +1,8 @@ -use lib::{ChatDb, Result}; use chrono::Datelike; +use lib::{ + ChatDb, + Result, +}; /// Test that we can get messages from the Dutch phone number conversation #[test] @@ -12,11 +15,14 @@ fn test_get_our_messages_default_range() -> Result<()> { println!("Found {} messages from January 2024 to now", messages.len()); // Verify we got some messages - assert!(messages.len() > 0, "Should find messages in the conversation"); + assert!( + messages.len() > 0, + "Should find messages in the conversation" + ); // Verify messages are in chronological order (ASC) for i in 1..messages.len().min(10) { - if let (Some(prev_date), Some(curr_date)) = (messages[i-1].date, messages[i].date) { + if let (Some(prev_date), Some(curr_date)) = (messages[i - 1].date, messages[i].date) { assert!( prev_date <= curr_date, "Messages should be in ascending date order" @@ -28,8 +34,12 @@ fn test_get_our_messages_default_range() -> Result<()> { for msg in messages.iter().take(10) { if let Some(date) = msg.date { assert!(date.year() >= 2024, "Messages should be from 2024 or later"); - println!("Message date: {}, from_me: {}, text: {:?}", - date, msg.is_from_me, msg.text.as_ref().map(|s| &s[..s.len().min(50)])); + println!( + "Message date: {}, from_me: {}, text: {:?}", + date, + msg.is_from_me, + msg.text.as_ref().map(|s| &s[..s.len().min(50)]) + ); } } @@ -39,7 +49,10 @@ fn test_get_our_messages_default_range() -> Result<()> { /// Test that we can get messages with a custom date range #[test] fn test_get_our_messages_custom_range() -> Result<()> { - use chrono::{TimeZone, Utc}; + use chrono::{ + TimeZone, + Utc, + }; let db = ChatDb::open("chat.db")?; @@ -57,7 +70,9 @@ fn test_get_our_messages_custom_range() -> Result<()> { assert!( date >= start && date <= end, "Message date {} should be between {} and {}", - date, start, end + date, + start, + end ); } } @@ -86,11 +101,25 @@ fn test_conversation_summary() -> Result<()> { for (i, msg) in messages.iter().take(5).enumerate() { if let Some(date) = msg.date { let sender = if msg.is_from_me { "Me" } else { "Them" }; - let text = msg.text.as_ref() - .map(|t| if t.len() > 60 { format!("{}...", &t[..60]) } else { t.clone() }) + let text = msg + .text + .as_ref() + .map(|t| { + if t.len() > 60 { + format!("{}...", &t[..60]) + } else { + t.clone() + } + }) .unwrap_or_else(|| "[No text]".to_string()); - println!("{}. {} ({}): {}", i + 1, date.format("%Y-%m-%d %H:%M"), sender, text); + println!( + "{}. {} ({}): {}", + i + 1, + date.format("%Y-%m-%d %H:%M"), + sender, + text + ); } } diff --git a/crates/lib/tests/sync_integration.rs b/crates/lib/tests/sync_integration.rs index 317cefa..2694346 100644 --- a/crates/lib/tests/sync_integration.rs +++ b/crates/lib/tests/sync_integration.rs @@ -1,7 +1,19 @@ -use lib::sync::{synced, SyncMessage, Syncable}; -use iroh::{Endpoint, protocol::{Router, ProtocolHandler, AcceptError}}; -use anyhow::Result; use std::sync::Arc; + +use anyhow::Result; +use iroh::{ + Endpoint, + protocol::{ + AcceptError, + ProtocolHandler, + Router, + }, +}; +use lib::sync::{ + SyncMessage, + Syncable, + synced, +}; use tokio::sync::Mutex; /// Test configuration that can be synced @@ -28,20 +40,25 @@ impl ProtocolHandler for SyncProtocol { println!("Accepting connection from: {}", connection.remote_id()); // Accept the bidirectional stream - let (mut send, mut recv) = connection.accept_bi().await + let (mut send, mut recv) = connection + .accept_bi() + .await .map_err(AcceptError::from_err)?; println!("Stream accepted, reading message..."); // Read the sync message - let bytes = recv.read_to_end(1024 * 1024).await + let bytes = recv + .read_to_end(1024 * 1024) + .await .map_err(AcceptError::from_err)?; println!("Received {} bytes", bytes.len()); // Deserialize and apply - let msg = SyncMessage::::from_bytes(&bytes) - .map_err(|e| AcceptError::from_err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?; + let msg = SyncMessage::::from_bytes(&bytes).map_err(|e| { + AcceptError::from_err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + })?; println!("Applying operation from node: {}", msg.node_id); @@ -51,8 +68,7 @@ impl ProtocolHandler for SyncProtocol { println!("Operation applied successfully"); // Close the stream - send.finish() - .map_err(AcceptError::from_err)?; + send.finish().map_err(AcceptError::from_err)?; Ok(()) } @@ -76,24 +92,24 @@ async fn test_sync_between_two_nodes() -> Result<()> { println!("Node 2: {}", node2_id); // Create synced configs on both nodes - let mut config1 = TestConfig::new( - 42, - "initial".to_string(), - node1_id.clone(), - ); + let mut config1 = TestConfig::new(42, "initial".to_string(), node1_id.clone()); - let config2 = TestConfig::new( - 42, - "initial".to_string(), - node2_id.clone(), - ); + let config2 = TestConfig::new(42, "initial".to_string(), node2_id.clone()); let config2_shared = Arc::new(Mutex::new(config2)); println!("\nInitial state:"); - println!(" Node 1: value={}, name={}", config1.value(), config1.name()); + println!( + " Node 1: value={}, name={}", + config1.value(), + config1.name() + ); { let config2 = config2_shared.lock().await; - println!(" Node 2: value={}, name={}", config2.value(), config2.name()); + println!( + " Node 2: value={}, name={}", + config2.value(), + config2.name() + ); } // Set up router on node2 to accept incoming connections @@ -101,9 +117,7 @@ async fn test_sync_between_two_nodes() -> Result<()> { let protocol = SyncProtocol { config: config2_shared.clone(), }; - let router = Router::builder(node2) - .accept(SYNC_ALPN, protocol) - .spawn(); + let router = Router::builder(node2).accept(SYNC_ALPN, protocol).spawn(); router.endpoint().online().await; println!("✓ Node2 router ready"); @@ -136,10 +150,18 @@ async fn test_sync_between_two_nodes() -> Result<()> { // Verify both configs have the same value println!("\nFinal state:"); - println!(" Node 1: value={}, name={}", config1.value(), config1.name()); + println!( + " Node 1: value={}, name={}", + config1.value(), + config1.name() + ); { let config2 = config2_shared.lock().await; - println!(" Node 2: value={}, name={}", config2.value(), config2.name()); + println!( + " Node 2: value={}, name={}", + config2.value(), + config2.name() + ); assert_eq!(*config1.value(), 100); assert_eq!(*config2.value(), 100); diff --git a/crates/server/src/components/database.rs b/crates/server/src/components/database.rs index eeca181..e4913a6 100644 --- a/crates/server/src/components/database.rs +++ b/crates/server/src/components/database.rs @@ -1,7 +1,8 @@ +use std::sync::Arc; + use bevy::prelude::*; use parking_lot::Mutex; use rusqlite::Connection; -use std::sync::Arc; use crate::config::Config; diff --git a/crates/server/src/components/gossip.rs b/crates/server/src/components/gossip.rs index e460888..32e0475 100644 --- a/crates/server/src/components/gossip.rs +++ b/crates/server/src/components/gossip.rs @@ -1,13 +1,24 @@ -use bevy::prelude::*; -use iroh::protocol::Router; -use iroh::Endpoint; -use iroh_gossip::api::{GossipReceiver, GossipSender}; -use iroh_gossip::net::Gossip; -use iroh_gossip::proto::TopicId; -use parking_lot::Mutex; -use serde::{Deserialize, Serialize}; use std::sync::Arc; +use bevy::prelude::*; +use iroh::{ + Endpoint, + protocol::Router, +}; +use iroh_gossip::{ + api::{ + GossipReceiver, + GossipSender, + }, + net::Gossip, + proto::TopicId, +}; +use parking_lot::Mutex; +use serde::{ + Deserialize, + Serialize, +}; + /// Message envelope for gossip sync #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncMessage { @@ -56,13 +67,9 @@ pub struct GossipTopic(pub TopicId); /// Bevy resource for tracking gossip initialization task #[derive(Resource)] -pub struct GossipInitTask(pub bevy::tasks::Task>); +pub struct GossipInitTask( + pub bevy::tasks::Task>, +); /// Bevy message: a new message that needs to be published to gossip #[derive(Message, Clone, Debug)] @@ -70,7 +77,8 @@ pub struct PublishMessageEvent { pub message: lib::Message, } -/// Bevy message: a message received from gossip that needs to be saved to SQLite +/// Bevy message: a message received from gossip that needs to be saved to +/// SQLite #[derive(Message, Clone, Debug)] pub struct GossipMessageReceived { pub sync_message: SyncMessage, diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs index aeb295d..0a5ee2a 100644 --- a/crates/server/src/config.rs +++ b/crates/server/src/config.rs @@ -1,7 +1,16 @@ -use anyhow::{Context, Result}; -use serde::{Deserialize, Serialize}; -use std::fs; -use std::path::Path; +use std::{ + fs, + path::Path, +}; + +use anyhow::{ + Context, + Result, +}; +use serde::{ + Deserialize, + Serialize, +}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -45,8 +54,7 @@ impl Config { pub fn from_file>(path: P) -> Result { let content = fs::read_to_string(path.as_ref()) .context(format!("Failed to read config file: {:?}", path.as_ref()))?; - let config: Config = toml::from_str(&content) - .context("Failed to parse config file")?; + let config: Config = toml::from_str(&content).context("Failed to parse config file")?; Ok(config) } @@ -68,15 +76,12 @@ impl Config { hostname: "lonni-daemon".to_string(), state_dir: "./tailscale-state".to_string(), }, - grpc: GrpcConfig { - port: 50051, - }, + grpc: GrpcConfig { port: 50051 }, } } pub fn save>(&self, path: P) -> Result<()> { - let content = toml::to_string_pretty(self) - .context("Failed to serialize config")?; + let content = toml::to_string_pretty(self).context("Failed to serialize config")?; fs::write(path.as_ref(), content) .context(format!("Failed to write config file: {:?}", path.as_ref()))?; Ok(()) diff --git a/crates/server/src/db/operations.rs b/crates/server/src/db/operations.rs index 8427121..97ddf2c 100644 --- a/crates/server/src/db/operations.rs +++ b/crates/server/src/db/operations.rs @@ -1,7 +1,22 @@ -use crate::db::schema::{deserialize_embedding, serialize_embedding}; -use crate::models::*; -use chrono::{TimeZone, Utc}; -use rusqlite::{params, Connection, OptionalExtension, Result, Row}; +use chrono::{ + TimeZone, + Utc, +}; +use rusqlite::{ + Connection, + OptionalExtension, + Result, + Row, + params, +}; + +use crate::{ + db::schema::{ + deserialize_embedding, + serialize_embedding, + }, + models::*, +}; /// Insert a new message into the database pub fn insert_message(conn: &Connection, msg: &lib::Message) -> Result { @@ -71,7 +86,10 @@ pub fn insert_message_embedding( } /// Get message embedding -pub fn get_message_embedding(conn: &Connection, message_id: i64) -> Result> { +pub fn get_message_embedding( + conn: &Connection, + message_id: i64, +) -> Result> { conn.query_row( "SELECT id, message_id, embedding, model_name, created_at FROM message_embeddings WHERE message_id = ?1", @@ -203,7 +221,7 @@ pub fn list_emotions( ) -> Result> { let mut query = String::from( "SELECT id, message_id, emotion, confidence, model_version, created_at, updated_at - FROM emotions WHERE 1=1" + FROM emotions WHERE 1=1", ); if emotion_filter.is_some() { diff --git a/crates/server/src/db/schema.rs b/crates/server/src/db/schema.rs index 7b79067..f73025d 100644 --- a/crates/server/src/db/schema.rs +++ b/crates/server/src/db/schema.rs @@ -1,4 +1,7 @@ -use rusqlite::{Connection, Result}; +use rusqlite::{ + Connection, + Result, +}; use tracing::info; pub fn initialize_database(conn: &Connection) -> Result<()> { @@ -9,14 +12,17 @@ pub fn initialize_database(conn: &Connection) -> Result<()> { // Try to load the vector extension (non-fatal if it fails for now) match unsafe { conn.load_extension_enable() } { - Ok(_) => { + | Ok(_) => { match unsafe { conn.load_extension(vec_path, None::<&str>) } { - Ok(_) => info!("Loaded sqlite-vec extension"), - Err(e) => info!("Could not load sqlite-vec extension: {}. Vector operations will not be available.", e), + | Ok(_) => info!("Loaded sqlite-vec extension"), + | Err(e) => info!( + "Could not load sqlite-vec extension: {}. Vector operations will not be available.", + e + ), } let _ = unsafe { conn.load_extension_disable() }; - } - Err(e) => info!("Extension loading not enabled: {}", e), + }, + | Err(e) => info!("Extension loading not enabled: {}", e), } // Create messages table @@ -172,10 +178,7 @@ pub fn initialize_database(conn: &Connection) -> Result<()> { /// Helper function to serialize f32 vector to bytes for storage pub fn serialize_embedding(embedding: &[f32]) -> Vec { - embedding - .iter() - .flat_map(|f| f.to_le_bytes()) - .collect() + embedding.iter().flat_map(|f| f.to_le_bytes()).collect() } /// Helper function to deserialize bytes back to f32 vector diff --git a/crates/server/src/iroh_sync.rs b/crates/server/src/iroh_sync.rs index 2af93f4..aa0aeaa 100644 --- a/crates/server/src/iroh_sync.rs +++ b/crates/server/src/iroh_sync.rs @@ -1,9 +1,16 @@ use anyhow::Result; -use iroh::protocol::Router; -use iroh::Endpoint; -use iroh_gossip::api::{GossipReceiver, GossipSender}; -use iroh_gossip::net::Gossip; -use iroh_gossip::proto::TopicId; +use iroh::{ + Endpoint, + protocol::Router, +}; +use iroh_gossip::{ + api::{ + GossipReceiver, + GossipSender, + }, + net::Gossip, + proto::TopicId, +}; /// Initialize Iroh endpoint and gossip for the given topic pub async fn init_iroh_gossip( diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 162ca2d..43179a5 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -8,20 +8,24 @@ mod models; mod services; mod systems; -use anyhow::{Context, Result}; +use std::{ + path::Path, + sync::Arc, +}; + +use anyhow::{ + Context, + Result, +}; use bevy::prelude::*; -use config::Config; -use iroh_gossip::proto::TopicId; -use parking_lot::Mutex; -use rusqlite::Connection; -use std::path::Path; -use std::sync::Arc; - -// Re-export init function -pub use iroh_sync::init_iroh_gossip; - // Import components and systems use components::*; +use config::Config; +use iroh_gossip::proto::TopicId; +// Re-export init function +pub use iroh_sync::init_iroh_gossip; +use parking_lot::Mutex; +use rusqlite::Connection; use systems::*; fn main() { @@ -29,11 +33,11 @@ fn main() { // Load configuration and initialize database let (config, us_db) = match initialize_app() { - Ok(data) => data, - Err(e) => { + | Ok(data) => data, + | Err(e) => { eprintln!("Failed to initialize app: {}", e); return; - } + }, }; // Create a topic ID for gossip (use a fixed topic for now) @@ -85,8 +89,7 @@ fn initialize_app() -> Result<(Config, Arc>)> { // Initialize database println!("Initializing database at {}", config.database.path); - let conn = - Connection::open(&config.database.path).context("Failed to open database")?; + let conn = Connection::open(&config.database.path).context("Failed to open database")?; db::initialize_database(&conn).context("Failed to initialize database schema")?; diff --git a/crates/server/src/models.rs b/crates/server/src/models.rs index 6f6df77..a158398 100644 --- a/crates/server/src/models.rs +++ b/crates/server/src/models.rs @@ -1,5 +1,11 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use chrono::{ + DateTime, + Utc, +}; +use serde::{ + Deserialize, + Serialize, +}; /// Represents a message stored in our database #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/server/src/services/chat_poller.rs b/crates/server/src/services/chat_poller.rs index 4e0a15b..626da04 100644 --- a/crates/server/src/services/chat_poller.rs +++ b/crates/server/src/services/chat_poller.rs @@ -1,13 +1,30 @@ -use crate::db; -use anyhow::{Context, Result}; +use std::{ + path::Path, + sync::Arc, + time::Duration, +}; + +use anyhow::{ + Context, + Result, +}; use chrono::Utc; use rusqlite::Connection; -use std::path::Path; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{mpsc, Mutex}; -use tokio::time; -use tracing::{debug, error, info, warn}; +use tokio::{ + sync::{ + Mutex, + mpsc, + }, + time, +}; +use tracing::{ + debug, + error, + info, + warn, +}; + +use crate::db; pub struct ChatPollerService { chat_db_path: String, @@ -33,12 +50,15 @@ impl ChatPollerService { pub async fn run(&self) -> Result<()> { info!("Starting chat poller service"); - info!("Polling {} every {:?}", self.chat_db_path, self.poll_interval); + info!( + "Polling {} every {:?}", + self.chat_db_path, self.poll_interval + ); // Get last processed rowid from database let us_db = self.us_db.lock().await; - let mut last_rowid = db::get_last_processed_rowid(&us_db) - .context("Failed to get last processed rowid")?; + let mut last_rowid = + db::get_last_processed_rowid(&us_db).context("Failed to get last processed rowid")?; drop(us_db); info!("Starting from rowid: {}", last_rowid); @@ -49,7 +69,7 @@ impl ChatPollerService { interval.tick().await; match self.poll_messages(last_rowid).await { - Ok(new_messages) => { + | Ok(new_messages) => { if !new_messages.is_empty() { info!("Found {} new messages", new_messages.len()); @@ -74,10 +94,10 @@ impl ChatPollerService { } else { debug!("No new messages"); } - } - Err(e) => { + }, + | Err(e) => { error!("Error polling messages: {}", e); - } + }, } } } @@ -85,12 +105,14 @@ impl ChatPollerService { async fn poll_messages(&self, last_rowid: i64) -> Result> { // Check if chat.db exists if !Path::new(&self.chat_db_path).exists() { - return Err(anyhow::anyhow!("chat.db not found at {}", self.chat_db_path)); + return Err(anyhow::anyhow!( + "chat.db not found at {}", + self.chat_db_path + )); } // Open chat.db (read-only) - let chat_db = lib::ChatDb::open(&self.chat_db_path) - .context("Failed to open chat.db")?; + let chat_db = lib::ChatDb::open(&self.chat_db_path).context("Failed to open chat.db")?; // Get messages with rowid > last_rowid // We'll use the existing get_our_messages but need to filter by rowid diff --git a/crates/server/src/services/embedding_service.rs b/crates/server/src/services/embedding_service.rs index 8371502..bf549c1 100644 --- a/crates/server/src/services/embedding_service.rs +++ b/crates/server/src/services/embedding_service.rs @@ -1,9 +1,18 @@ -use crate::db; +use std::sync::Arc; + use anyhow::Result; use rusqlite::Connection; -use std::sync::Arc; -use tokio::sync::{mpsc, Mutex}; -use tracing::{error, info, warn}; +use tokio::sync::{ + Mutex, + mpsc, +}; +use tracing::{ + error, + info, + warn, +}; + +use crate::db; /// Service responsible for generating embeddings for messages and words pub struct EmbeddingService { @@ -47,11 +56,11 @@ impl EmbeddingService { // Get message ID from our database let us_db = self.us_db.lock().await; let message_id = match db::get_message_id_by_chat_rowid(&us_db, msg.rowid)? { - Some(id) => id, - None => { + | Some(id) => id, + | None => { warn!("Message {} not found in database, skipping", msg.rowid); return Ok(()); - } + }, }; // Check if embedding already exists @@ -61,8 +70,8 @@ impl EmbeddingService { // Skip if message has no text let text = match &msg.text { - Some(t) if !t.is_empty() => t, - _ => return Ok(()), + | Some(t) if !t.is_empty() => t, + | _ => return Ok(()), }; drop(us_db); diff --git a/crates/server/src/services/emotion_service.rs b/crates/server/src/services/emotion_service.rs index 6831b05..3b28f1d 100644 --- a/crates/server/src/services/emotion_service.rs +++ b/crates/server/src/services/emotion_service.rs @@ -1,9 +1,18 @@ -use crate::db; +use std::sync::Arc; + use anyhow::Result; use rusqlite::Connection; -use std::sync::Arc; -use tokio::sync::{mpsc, Mutex}; -use tracing::{error, info, warn}; +use tokio::sync::{ + Mutex, + mpsc, +}; +use tracing::{ + error, + info, + warn, +}; + +use crate::db; /// Service responsible for classifying emotions in messages pub struct EmotionService { @@ -56,11 +65,11 @@ impl EmotionService { // Get message ID from our database let us_db = self.us_db.lock().await; let message_id = match db::get_message_id_by_chat_rowid(&us_db, msg.rowid)? { - Some(id) => id, - None => { + | Some(id) => id, + | None => { warn!("Message {} not found in database, skipping", msg.rowid); return Ok(()); - } + }, }; // Check if emotion classification already exists @@ -70,8 +79,8 @@ impl EmotionService { // Skip if message has no text let text = match &msg.text { - Some(t) if !t.is_empty() => t, - _ => return Ok(()), + | Some(t) if !t.is_empty() => t, + | _ => return Ok(()), }; drop(us_db); @@ -82,7 +91,13 @@ impl EmotionService { // Store emotion classification let us_db = self.us_db.lock().await; - db::insert_emotion(&us_db, message_id, &emotion, confidence, &self.model_version)?; + db::insert_emotion( + &us_db, + message_id, + &emotion, + confidence, + &self.model_version, + )?; // Randomly add to training set based on sample rate if rand::random::() < self.training_sample_rate { diff --git a/crates/server/src/services/mod.rs b/crates/server/src/services/mod.rs index 15ea300..e9c812f 100644 --- a/crates/server/src/services/mod.rs +++ b/crates/server/src/services/mod.rs @@ -4,4 +4,4 @@ pub mod emotion_service; pub use chat_poller::ChatPollerService; pub use embedding_service::EmbeddingService; -pub use emotion_service::EmotionService; \ No newline at end of file +pub use emotion_service::EmotionService; diff --git a/crates/server/src/systems/database.rs b/crates/server/src/systems/database.rs index 71ff22e..59eb520 100644 --- a/crates/server/src/systems/database.rs +++ b/crates/server/src/systems/database.rs @@ -3,10 +3,7 @@ use bevy::prelude::*; use crate::components::*; /// System: Poll chat.db for new messages using Bevy's task system -pub fn poll_chat_db( - _config: Res, - _db: Res, -) { +pub fn poll_chat_db(_config: Res, _db: Res) { // TODO: Use Bevy's AsyncComputeTaskPool to poll chat.db // This will replace the tokio::spawn chat poller } diff --git a/crates/server/src/systems/gossip.rs b/crates/server/src/systems/gossip.rs index 3828b81..66ee8e7 100644 --- a/crates/server/src/systems/gossip.rs +++ b/crates/server/src/systems/gossip.rs @@ -1,17 +1,17 @@ +use std::sync::Arc; + use bevy::prelude::*; use parking_lot::Mutex; -use std::sync::Arc; use crate::components::*; /// System: Poll the gossip init task and insert resources when complete -pub fn poll_gossip_init( - mut commands: Commands, - mut init_task: Option>, -) { +pub fn poll_gossip_init(mut commands: Commands, mut init_task: Option>) { if let Some(mut task) = init_task { // Check if the task is finished (non-blocking) - if let Some(result) = bevy::tasks::block_on(bevy::tasks::futures_lite::future::poll_once(&mut task.0)) { + if let Some(result) = + bevy::tasks::block_on(bevy::tasks::futures_lite::future::poll_once(&mut task.0)) + { if let Some((endpoint, gossip, router, sender, receiver)) = result { println!("Inserting gossip resources"); @@ -72,17 +72,19 @@ pub fn publish_to_gossip( // Serialize the message match serialize_sync_message(&sync_message) { - Ok(bytes) => { + | Ok(bytes) => { // TODO: Publish to gossip // For now, just log that we would publish println!("Would publish {} bytes to gossip", bytes.len()); - // Note: Direct async broadcasting from Bevy systems is tricky due to Sync requirements - // We'll need to use a different approach, possibly with channels or a dedicated task - } - Err(e) => { + // Note: Direct async broadcasting from Bevy systems is tricky + // due to Sync requirements We'll need to use a + // different approach, possibly with channels or a dedicated + // task + }, + | Err(e) => { eprintln!("Failed to serialize sync message: {}", e); - } + }, } } } @@ -98,19 +100,18 @@ pub fn receive_from_gossip( } // TODO: Implement proper async message reception - // This will require spawning a long-running task that listens for gossip events - // and sends them as Bevy messages. For now, this is a placeholder. + // This will require spawning a long-running task that listens for gossip + // events and sends them as Bevy messages. For now, this is a + // placeholder. } /// System: Save received gossip messages to SQLite -pub fn save_gossip_messages( - mut events: MessageReader, - _db: Res, -) { +pub fn save_gossip_messages(mut events: MessageReader, _db: Res) { for event in events.read() { - println!("Received message {} from gossip (published by {})", - event.sync_message.message.rowid, - event.sync_message.publisher_node_id); + println!( + "Received message {} from gossip (published by {})", + event.sync_message.message.rowid, event.sync_message.publisher_node_id + ); // TODO: Save to SQLite if we don't already have it } } diff --git a/crates/server/src/systems/setup.rs b/crates/server/src/systems/setup.rs index abeb34f..7e303f7 100644 --- a/crates/server/src/systems/setup.rs +++ b/crates/server/src/systems/setup.rs @@ -1,5 +1,7 @@ -use bevy::prelude::*; -use bevy::tasks::AsyncComputeTaskPool; +use bevy::{ + prelude::*, + tasks::AsyncComputeTaskPool, +}; use crate::components::*; diff --git a/crates/sync-macros/src/lib.rs b/crates/sync-macros/src/lib.rs index 820af4d..cc9de21 100644 --- a/crates/sync-macros/src/lib.rs +++ b/crates/sync-macros/src/lib.rs @@ -1,6 +1,16 @@ use proc_macro::TokenStream; -use quote::{quote, format_ident}; -use syn::{parse_macro_input, DeriveInput, Data, Fields, Type, ItemStruct}; +use quote::{ + format_ident, + quote, +}; +use syn::{ + Data, + DeriveInput, + Fields, + ItemStruct, + Type, + parse_macro_input, +}; /// Attribute macro for transparent CRDT sync /// @@ -10,17 +20,17 @@ use syn::{parse_macro_input, DeriveInput, Data, Fields, Type, ItemStruct}; /// ``` /// #[synced] /// struct EmotionGradientConfig { -/// canvas_width: f32, // Becomes SyncedValue internally -/// canvas_height: f32, // Auto-generates getters/setters +/// canvas_width: f32, // Becomes SyncedValue internally +/// canvas_height: f32, // Auto-generates getters/setters /// /// #[sync(skip)] -/// node_id: String, // Not synced +/// node_id: String, // Not synced /// } /// /// // Use it like a normal struct: /// let mut config = EmotionGradientConfig::new("node1".into()); -/// config.set_canvas_width(1024.0); // Auto-generates sync operation -/// println!("Width: {}", config.canvas_width()); // Transparent access +/// config.set_canvas_width(1024.0); // Auto-generates sync operation +/// println!("Width: {}", config.canvas_width()); // Transparent access /// ``` #[proc_macro_attribute] pub fn synced(_attr: TokenStream, item: TokenStream) -> TokenStream { @@ -30,8 +40,8 @@ pub fn synced(_attr: TokenStream, item: TokenStream) -> TokenStream { let op_enum_name = format_ident!("{}Op", name); let fields = match &input.fields { - Fields::Named(fields) => &fields.named, - _ => panic!("synced only supports structs with named fields"), + | Fields::Named(fields) => &fields.named, + | _ => panic!("synced only supports structs with named fields"), }; let mut internal_fields = Vec::new(); @@ -50,9 +60,8 @@ pub fn synced(_attr: TokenStream, item: TokenStream) -> TokenStream { // Check if field should be skipped let should_skip = field.attrs.iter().any(|attr| { - attr.path().is_ident("sync") - && attr - .parse_args::() + attr.path().is_ident("sync") && + attr.parse_args::() .map(|i| i == "skip") .unwrap_or(false) }); @@ -87,11 +96,7 @@ pub fn synced(_attr: TokenStream, item: TokenStream) -> TokenStream { .to_string() .chars() .enumerate() - .map(|(i, c)| if i == 0 { - c.to_ascii_uppercase() - } else { - c - }) + .map(|(i, c)| if i == 0 { c.to_ascii_uppercase() } else { c }) .collect::() ); @@ -209,11 +214,11 @@ pub fn derive_synced(input: TokenStream) -> TokenStream { let op_enum_name = format_ident!("{}Op", name); let fields = match &input.data { - Data::Struct(data) => match &data.fields { - Fields::Named(fields) => &fields.named, - _ => panic!("Synced only supports structs with named fields"), + | Data::Struct(data) => match &data.fields { + | Fields::Named(fields) => &fields.named, + | _ => panic!("Synced only supports structs with named fields"), }, - _ => panic!("Synced only supports structs"), + | _ => panic!("Synced only supports structs"), }; let mut field_ops = Vec::new(); @@ -226,20 +231,21 @@ pub fn derive_synced(input: TokenStream) -> TokenStream { let field_type = &field.ty; // Check if field should be skipped - let should_skip = field.attrs.iter() - .any(|attr| { - attr.path().is_ident("sync") && + let should_skip = field.attrs.iter().any(|attr| { + attr.path().is_ident("sync") && attr.parse_args::() .map(|i| i == "skip") .unwrap_or(false) - }); + }); if should_skip { continue; } - let op_variant = format_ident!("Set{}", - field_name.to_string() + let op_variant = format_ident!( + "Set{}", + field_name + .to_string() .chars() .enumerate() .map(|(i, c)| if i == 0 { c.to_ascii_uppercase() } else { c }) @@ -252,7 +258,7 @@ pub fn derive_synced(input: TokenStream) -> TokenStream { let crdt_strategy = get_crdt_strategy(field_type); match crdt_strategy.as_str() { - "lww" => { + | "lww" => { // LWW for simple types field_ops.push(quote! { #op_variant { @@ -283,10 +289,10 @@ pub fn derive_synced(input: TokenStream) -> TokenStream { merge_code.push(quote! { self.#field_name.merge(&other.#field_name); }); - } - _ => { + }, + | _ => { // Default to LWW - } + }, } }