Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-11-16 11:50:49 +00:00
parent a15e018876
commit 0f65b1baa2
33 changed files with 766 additions and 460 deletions

View File

@@ -1,11 +1,21 @@
//! Database schema and operations for persistence layer
use crate::persistence::types::*;
use crate::persistence::error::{PersistenceError, Result};
use chrono::Utc;
use rusqlite::{Connection, OptionalExtension};
use std::path::Path;
use chrono::Utc;
use rusqlite::{
Connection,
OptionalExtension,
};
use crate::persistence::{
error::{
PersistenceError,
Result,
},
types::*,
};
/// Default SQLite page size in bytes (4KB)
const DEFAULT_PAGE_SIZE: i64 = 4096;
@@ -164,7 +174,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
for op in ops {
match op {
PersistenceOp::UpsertEntity { id, data } => {
| PersistenceOp::UpsertEntity { id, data } => {
tx.execute(
"INSERT OR REPLACE INTO entities (id, entity_type, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4)",
@@ -176,9 +186,9 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::UpsertComponent {
| PersistenceOp::UpsertComponent {
entity_id,
component_type,
data,
@@ -194,9 +204,9 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::LogOperation {
| PersistenceOp::LogOperation {
node_id,
sequence,
operation,
@@ -212,23 +222,26 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::UpdateVectorClock { node_id, counter } => {
| PersistenceOp::UpdateVectorClock { node_id, counter } => {
tx.execute(
"INSERT OR REPLACE INTO vector_clock (node_id, counter, updated_at)
VALUES (?1, ?2, ?3)",
rusqlite::params![node_id, counter, current_timestamp()],
)?;
count += 1;
}
},
PersistenceOp::DeleteEntity { id } => {
tx.execute("DELETE FROM entities WHERE id = ?1", rusqlite::params![id.as_bytes()])?;
| PersistenceOp::DeleteEntity { id } => {
tx.execute(
"DELETE FROM entities WHERE id = ?1",
rusqlite::params![id.as_bytes()],
)?;
count += 1;
}
},
PersistenceOp::DeleteComponent {
| PersistenceOp::DeleteComponent {
entity_id,
component_type,
} => {
@@ -237,7 +250,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
rusqlite::params![entity_id.as_bytes(), component_type],
)?;
count += 1;
}
},
}
}
@@ -255,7 +268,8 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
/// - `mode`: Checkpoint mode controlling blocking behavior (see [`CheckpointMode`])
/// - `mode`: Checkpoint mode controlling blocking behavior (see
/// [`CheckpointMode`])
///
/// # Returns
/// - `Ok(CheckpointInfo)`: Information about the checkpoint operation
@@ -276,17 +290,19 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
/// ```
pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result<CheckpointInfo> {
let mode_str = match mode {
CheckpointMode::Passive => "PASSIVE",
CheckpointMode::Full => "FULL",
CheckpointMode::Restart => "RESTART",
CheckpointMode::Truncate => "TRUNCATE",
| CheckpointMode::Passive => "PASSIVE",
| CheckpointMode::Full => "FULL",
| CheckpointMode::Restart => "RESTART",
| CheckpointMode::Truncate => "TRUNCATE",
};
let query = format!("PRAGMA wal_checkpoint({})", mode_str);
// Returns (busy, log_pages, checkpointed_pages)
let (busy, log_pages, checkpointed_pages): (i32, i32, i32) =
conn.query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
conn.query_row(&query, [], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?))
})?;
// Update checkpoint state
conn.execute(
@@ -303,15 +319,16 @@ pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result<Che
/// Get the size of the WAL file in bytes
///
/// This checks the actual WAL file size on disk without triggering a checkpoint.
/// Large WAL files consume disk space and can slow down recovery, so monitoring
/// size helps maintain optimal performance.
/// This checks the actual WAL file size on disk without triggering a
/// checkpoint. Large WAL files consume disk space and can slow down recovery,
/// so monitoring size helps maintain optimal performance.
///
/// # Parameters
/// - `conn`: Reference to the SQLite connection
///
/// # Returns
/// - `Ok(i64)`: WAL file size in bytes (0 if no WAL exists or in-memory database)
/// - `Ok(i64)`: WAL file size in bytes (0 if no WAL exists or in-memory
/// database)
/// - `Err`: If the database path query fails
///
/// # Note
@@ -332,8 +349,8 @@ pub fn get_wal_size(conn: &Connection) -> Result<i64> {
// Check if WAL file exists and get its size
match std::fs::metadata(&wal_path) {
Ok(metadata) => Ok(metadata.len() as i64),
Err(_) => Ok(0), // WAL doesn't exist yet
| Ok(metadata) => Ok(metadata.len() as i64),
| Err(_) => Ok(0), // WAL doesn't exist yet
}
}
@@ -360,8 +377,9 @@ pub struct CheckpointInfo {
/// Set a session state value in the database
///
/// Session state is used to track application lifecycle events and detect crashes.
/// Values persist across restarts, enabling crash detection and recovery.
/// Session state is used to track application lifecycle events and detect
/// crashes. Values persist across restarts, enabling crash detection and
/// recovery.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
@@ -404,12 +422,13 @@ pub fn get_session_state(conn: &Connection, key: &str) -> Result<Option<String>>
/// Check if the previous session had a clean shutdown
///
/// This is critical for crash detection. When the application starts, this checks
/// if the previous session ended cleanly. If not, it indicates a crash occurred,
/// and recovery procedures may be needed.
/// This is critical for crash detection. When the application starts, this
/// checks if the previous session ended cleanly. If not, it indicates a crash
/// occurred, and recovery procedures may be needed.
///
/// **Side effect**: Resets the clean_shutdown flag to "false" for the current session.
/// Call [`mark_clean_shutdown`] during normal shutdown to set it back to "true".
/// **Side effect**: Resets the clean_shutdown flag to "false" for the current
/// session. Call [`mark_clean_shutdown`] during normal shutdown to set it back
/// to "true".
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection (mutates session state)
@@ -537,7 +556,11 @@ mod tests {
// After checking clean shutdown, flag should be reset to false
// So if we check again without marking, it should report as crash
let value = get_session_state(&conn, "clean_shutdown")?;
assert_eq!(value, Some("false".to_string()), "Flag should be reset after check");
assert_eq!(
value,
Some("false".to_string()),
"Flag should be reset after check"
);
Ok(())
}