use rusqlite::{params, Connection, Result as SqlResult}; use std::path::Path; use std::sync::Mutex; use tracing::{info, warn}; /// SQLite-backed persistent state for Sol. /// /// Stores: /// - Conversation registry: room_id → Mistral conversation_id + token estimates /// - Agent registry: agent_name → Mistral agent_id /// /// ## Kubernetes mount /// /// The database file should be on a persistent volume. Recommended setup: /// /// ```yaml /// # PersistentVolumeClaim (Longhorn) /// apiVersion: v1 /// kind: PersistentVolumeClaim /// metadata: /// name: sol-data /// namespace: matrix /// spec: /// accessModes: [ReadWriteOnce] /// storageClassName: longhorn /// resources: /// requests: /// storage: 1Gi /// /// # Deployment volume mount /// volumes: /// - name: sol-data /// persistentVolumeClaim: /// claimName: sol-data /// containers: /// - name: sol /// volumeMounts: /// - name: sol-data /// mountPath: /data /// ``` /// /// Default path: `/data/sol.db` (configurable via `matrix.db_path` in sol.toml). /// The `/data` mount also holds the Matrix SDK state store at `/data/matrix-state`. pub struct Store { conn: Mutex, } impl Store { /// Open or create the database at the given path. /// Creates tables if they don't exist. 
pub fn open(path: &str) -> anyhow::Result<Self> {
    // Ensure the parent directory exists. A bare file name or `:memory:` has an
    // empty parent, in which case there is nothing to create.
    if let Some(parent) = Path::new(path)
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
    {
        std::fs::create_dir_all(parent)?;
    }

    let conn = Connection::open(path)?;

    // Enable WAL mode for better concurrent read performance
    conn.execute_batch("PRAGMA journal_mode=WAL;")?;

    // Schema bootstrap: every statement is `IF NOT EXISTS`, so this is safe to
    // run on each startup.
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS conversations ( \
            room_id TEXT PRIMARY KEY, \
            conversation_id TEXT NOT NULL, \
            estimated_tokens INTEGER NOT NULL DEFAULT 0, \
            created_at TEXT NOT NULL DEFAULT (datetime('now')) \
        ); \
        CREATE TABLE IF NOT EXISTS agents ( \
            name TEXT PRIMARY KEY, \
            agent_id TEXT NOT NULL, \
            model TEXT NOT NULL DEFAULT '', \
            instructions_hash TEXT NOT NULL DEFAULT '', \
            created_at TEXT NOT NULL DEFAULT (datetime('now')) \
        ); \
        CREATE TABLE IF NOT EXISTS service_users ( \
            localpart TEXT NOT NULL, \
            service TEXT NOT NULL, \
            service_username TEXT NOT NULL, \
            created_at TEXT NOT NULL DEFAULT (datetime('now')), \
            PRIMARY KEY (localpart, service) \
        ); \
        CREATE TABLE IF NOT EXISTS code_sessions ( \
            session_id TEXT PRIMARY KEY, \
            user_id TEXT NOT NULL, \
            room_id TEXT NOT NULL, \
            conversation_id TEXT, \
            project_path TEXT NOT NULL, \
            project_name TEXT NOT NULL, \
            model TEXT NOT NULL, \
            status TEXT NOT NULL DEFAULT 'active', \
            created_at TEXT NOT NULL DEFAULT (datetime('now')), \
            last_active TEXT NOT NULL DEFAULT (datetime('now')) \
        ); \
        CREATE TABLE IF NOT EXISTS research_sessions ( \
            session_id TEXT PRIMARY KEY, \
            room_id TEXT NOT NULL, \
            event_id TEXT NOT NULL, \
            status TEXT NOT NULL DEFAULT 'running', \
            query TEXT NOT NULL, \
            plan_json TEXT, \
            findings_json TEXT, \
            depth INTEGER NOT NULL DEFAULT 0, \
            created_at TEXT NOT NULL DEFAULT (datetime('now')), \
            completed_at TEXT \
        ); ",
    )?;

    // Migration: add instructions_hash column to agents table if missing.
    // An `agents` table that already existed is left untouched by
    // `CREATE TABLE IF NOT EXISTS`, so probe for the column by preparing a
    // statement that references it.
    let has_hash = conn
        .prepare("SELECT instructions_hash FROM agents LIMIT 0")
        .is_ok();
    if !has_hash {
        conn.execute_batch(
            "ALTER TABLE agents ADD COLUMN instructions_hash TEXT NOT NULL DEFAULT '';",
        )?;
        info!("Migrated agents table: added instructions_hash column");
    }

    info!(path, "Opened Sol state database");

    Ok(Self {
        conn: Mutex::new(conn),
    })
}

/// Open an in-memory database (for tests).
///
/// Equivalent to `Store::open(":memory:")`; the schema is created as usual.
pub fn open_memory() -> anyhow::Result<Self> {
    Self::open(":memory:")
}

// =========================================================================
// Conversations
// =========================================================================

/// Get the conversation_id for a room, if one exists.
///
/// Returns `(conversation_id, estimated_tokens)`. Any query failure — including
/// "no such row" — is reported as `None`.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn get_conversation(&self, room_id: &str) -> Option<(String, u32)> {
    let conn = self.conn.lock().unwrap();
    conn.query_row(
        "SELECT conversation_id, estimated_tokens FROM conversations WHERE room_id = ?1",
        params![room_id],
        |row| Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?)),
    )
    .ok()
}

/// Store or update a conversation mapping.
///
/// An existing row for `room_id` has its conversation_id and token estimate
/// overwritten. Failures are logged, not returned.
pub fn upsert_conversation(&self, room_id: &str, conversation_id: &str, estimated_tokens: u32) {
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "INSERT INTO conversations (room_id, conversation_id, estimated_tokens) \
         VALUES (?1, ?2, ?3) \
         ON CONFLICT(room_id) DO UPDATE SET \
         conversation_id = excluded.conversation_id, \
         estimated_tokens = excluded.estimated_tokens",
        params![room_id, conversation_id, estimated_tokens],
    ) {
        warn!("Failed to upsert conversation: {e}");
    }
}

/// Update token estimate for a conversation.
///
/// Does nothing if the room has no mapping. Failures are logged, not returned.
pub fn update_tokens(&self, room_id: &str, estimated_tokens: u32) {
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "UPDATE conversations SET estimated_tokens = ?1 WHERE room_id = ?2",
        params![estimated_tokens, room_id],
    ) {
        warn!("Failed to update token estimate: {e}");
    }
}

/// Remove all conversation mappings (e.g., after agent recreation).
///
/// Returns the number of rows deleted, or 0 if the statement failed (the
/// failure is logged so it can be told apart from an empty table).
pub fn delete_all_conversations(&self) -> usize {
    let conn = self.conn.lock().unwrap();
    match conn.execute("DELETE FROM conversations", []) {
        Ok(deleted) => deleted,
        Err(e) => {
            warn!("Failed to delete all conversations: {e}");
            0
        }
    }
}

/// Remove a conversation mapping (e.g., after compaction).
pub fn delete_conversation(&self, room_id: &str) {
    // Failures are logged rather than returned; callers treat this as best-effort.
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "DELETE FROM conversations WHERE room_id = ?1",
        params![room_id],
    ) {
        warn!("Failed to delete conversation: {e}");
    }
}

/// Load all conversation mappings (for startup recovery).
///
/// Returns `(room_id, conversation_id, estimated_tokens)` tuples. If the query
/// cannot be prepared or run, an empty list is returned; rows that fail to
/// decode are skipped. Both cases are logged.
pub fn load_all_conversations(&self) -> Vec<(String, String, u32)> {
    let conn = self.conn.lock().unwrap();
    let mut stmt = match conn
        .prepare("SELECT room_id, conversation_id, estimated_tokens FROM conversations")
    {
        Ok(stmt) => stmt,
        Err(e) => {
            warn!("Failed to prepare conversation load: {e}");
            return Vec::new();
        }
    };

    let rows = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, u32>(2)?,
        ))
    }) {
        Ok(rows) => rows,
        Err(e) => {
            warn!("Failed to load conversations: {e}");
            return Vec::new();
        }
    };

    rows.filter_map(|row| match row {
        Ok(entry) => Some(entry),
        Err(e) => {
            warn!("Skipping unreadable conversation row: {e}");
            None
        }
    })
    .collect()
}

// =========================================================================
// Agents
// =========================================================================

/// Get the agent_id and instructions hash for a named agent.
///
/// Returns `(agent_id, instructions_hash)`. Any query failure — including
/// "no such row" — is reported as `None`.
pub fn get_agent(&self, name: &str) -> Option<(String, String)> {
    let conn = self.conn.lock().unwrap();
    conn.query_row(
        "SELECT agent_id, instructions_hash FROM agents WHERE name = ?1",
        params![name],
        |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
    )
    .ok()
}

/// Store or update an agent mapping (with instructions hash for staleness detection).
///
/// An existing row for `name` has its agent_id, model and instructions_hash
/// overwritten. Failures are logged, not returned.
pub fn upsert_agent(&self, name: &str, agent_id: &str, model: &str, instructions_hash: &str) {
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "INSERT INTO agents (name, agent_id, model, instructions_hash) \
         VALUES (?1, ?2, ?3, ?4) \
         ON CONFLICT(name) DO UPDATE SET \
         agent_id = excluded.agent_id, \
         model = excluded.model, \
         instructions_hash = excluded.instructions_hash",
        params![name, agent_id, model, instructions_hash],
    ) {
        warn!("Failed to upsert agent: {e}");
    }
}

/// Remove an agent mapping.
pub fn delete_agent(&self, name: &str) { let conn = self.conn.lock().unwrap(); if let Err(e) = conn.execute("DELETE FROM agents WHERE name = ?1", params![name]) { warn!("Failed to delete agent: {e}"); } } // ========================================================================= // Code Sessions (sunbeam code) // ========================================================================= /// Find an active code session for a user + project. pub fn find_code_session( &self, user_id: &str, project_name: &str, ) -> Option<(String, String, String)> { let conn = self.conn.lock().unwrap(); conn.query_row( "SELECT session_id, room_id, conversation_id FROM code_sessions WHERE user_id = ?1 AND project_name = ?2 AND status = 'active' ORDER BY last_active DESC LIMIT 1", params![user_id, project_name], |row| Ok(( row.get::<_, String>(0)?, row.get::<_, String>(1)?, row.get::<_, String>(2)?, )), ) .ok() } /// Create a new code session. pub fn create_code_session( &self, session_id: &str, user_id: &str, room_id: &str, project_path: &str, project_name: &str, model: &str, ) { let conn = self.conn.lock().unwrap(); if let Err(e) = conn.execute( "INSERT INTO code_sessions (session_id, user_id, room_id, project_path, project_name, model) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![session_id, user_id, room_id, project_path, project_name, model], ) { warn!("Failed to create code session: {e}"); } } /// Update the conversation_id for a code session. pub fn set_code_session_conversation( &self, session_id: &str, conversation_id: &str, ) { let conn = self.conn.lock().unwrap(); if let Err(e) = conn.execute( "UPDATE code_sessions SET conversation_id = ?1, last_active = datetime('now') WHERE session_id = ?2", params![conversation_id, session_id], ) { warn!("Failed to update code session conversation: {e}"); } } /// Touch the last_active timestamp. 
pub fn touch_code_session(&self, session_id: &str) {
    // Failures are logged rather than returned; callers treat this as best-effort.
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "UPDATE code_sessions SET last_active = datetime('now') WHERE session_id = ?1",
        params![session_id],
    ) {
        warn!("Failed to touch code session: {e}");
    }
}

/// End a code session.
///
/// Sets the row's status to `'ended'`; the row itself is kept. Failures are
/// logged, not returned.
pub fn end_code_session(&self, session_id: &str) {
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "UPDATE code_sessions SET status = 'ended' WHERE session_id = ?1",
        params![session_id],
    ) {
        warn!("Failed to end code session: {e}");
    }
}

/// Check if a room is a code session room.
///
/// True only when at least one `'active'` session references `room_id`. A
/// query error is indistinguishable from "no such room" and yields `false`.
pub fn is_code_room(&self, room_id: &str) -> bool {
    let conn = self.conn.lock().unwrap();
    conn.query_row(
        "SELECT 1 FROM code_sessions WHERE room_id = ?1 AND status = 'active' LIMIT 1",
        params![room_id],
        |_| Ok(()),
    )
    .is_ok()
}

/// Get project context for a code room.
///
/// Returns `(project_name, project_path, model)` of the most recently active
/// `'active'` session in the room, or `None` when the query yields nothing or
/// fails.
pub fn get_code_room_context(&self, room_id: &str) -> Option<(String, String, String)> {
    let conn = self.conn.lock().unwrap();
    conn.query_row(
        "SELECT project_name, project_path, model FROM code_sessions \
         WHERE room_id = ?1 AND status = 'active' \
         ORDER BY last_active DESC LIMIT 1",
        params![room_id],
        |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
            ))
        },
    )
    .ok()
}

// =========================================================================
// Service Users (OIDC → service username mapping)
// =========================================================================

/// Get the service-specific username for a Matrix localpart.
///
/// Returns `None` when no mapping exists for `(localpart, service)` or the
/// query fails.
pub fn get_service_user(&self, localpart: &str, service: &str) -> Option<String> {
    let conn = self.conn.lock().unwrap();
    conn.query_row(
        "SELECT service_username FROM service_users WHERE localpart = ?1 AND service = ?2",
        params![localpart, service],
        |row| row.get::<_, String>(0),
    )
    .ok()
}

/// Store or update a service user mapping.
pub fn upsert_service_user(&self, localpart: &str, service: &str, service_username: &str) {
    let db = self.conn.lock().unwrap();
    // Insert, or overwrite the username when the (localpart, service) pair exists.
    let outcome = db.execute(
        "INSERT INTO service_users (localpart, service, service_username) \
         VALUES (?1, ?2, ?3) \
         ON CONFLICT(localpart, service) DO UPDATE SET \
         service_username = excluded.service_username",
        params![localpart, service, service_username],
    );
    if let Err(e) = outcome {
        warn!("Failed to upsert service user: {e}");
    }
}

/// Drop the mapping for one `(localpart, service)` pair.
///
/// Failures are logged, not returned.
pub fn delete_service_user(&self, localpart: &str, service: &str) {
    let db = self.conn.lock().unwrap();
    let outcome = db.execute(
        "DELETE FROM service_users WHERE localpart = ?1 AND service = ?2",
        params![localpart, service],
    );
    if let Err(e) = outcome {
        warn!("Failed to delete service user: {e}");
    }
}

// =========================================================================
// Research Sessions
// =========================================================================

/// Record a new research session.
///
/// The row starts with an empty JSON array (`'[]'`) as its findings; status
/// and timestamps come from the schema defaults. Failures are logged, not
/// returned.
pub fn create_research_session(
    &self,
    session_id: &str,
    room_id: &str,
    event_id: &str,
    query: &str,
    plan_json: &str,
) {
    let db = self.conn.lock().unwrap();
    let outcome = db.execute(
        "INSERT INTO research_sessions \
         (session_id, room_id, event_id, query, plan_json, findings_json) \
         VALUES (?1, ?2, ?3, ?4, ?5, '[]')",
        params![session_id, room_id, event_id, query, plan_json],
    );
    if let Err(e) = outcome {
        warn!("Failed to create research session: {e}");
    }
}

/// Add one finding to a research session's findings array.
///
/// `finding_json` is passed through SQLite's `json()` and appended at `$[#]`
/// (the end of the stored array). Failures are logged, not returned.
pub fn append_research_finding(&self, session_id: &str, finding_json: &str) {
    let db = self.conn.lock().unwrap();
    // The append happens inside SQLite, so the array is never read into Rust.
    let outcome = db.execute(
        "UPDATE research_sessions \
         SET findings_json = json_insert(findings_json, '$[#]', json(?1)) \
         WHERE session_id = ?2",
        params![finding_json, session_id],
    );
    if let Err(e) = outcome {
        warn!("Failed to append research finding: {e}");
    }
}

/// Mark a research session as complete.
pub fn complete_research_session(&self, session_id: &str) {
    // Sets status to 'complete' and stamps completed_at; failures are logged,
    // not returned.
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "UPDATE research_sessions SET status = 'complete', completed_at = datetime('now') \
         WHERE session_id = ?1",
        params![session_id],
    ) {
        warn!("Failed to complete research session: {e}");
    }
}

/// Mark a research session as failed.
///
/// Sets status to `'failed'` and stamps `completed_at`; stored findings are
/// not touched. Failures are logged, not returned.
pub fn fail_research_session(&self, session_id: &str) {
    let conn = self.conn.lock().unwrap();
    if let Err(e) = conn.execute(
        "UPDATE research_sessions SET status = 'failed', completed_at = datetime('now') \
         WHERE session_id = ?1",
        params![session_id],
    ) {
        warn!("Failed to mark research session failed: {e}");
    }
}

/// Load all running research sessions (for crash recovery on startup).
///
/// Returns `(session_id, room_id, query, findings_json)` tuples for every row
/// whose status is `'running'`. If the query cannot be prepared or run, an
/// empty list is returned; rows that fail to decode are skipped. Both cases
/// are logged.
pub fn load_running_research_sessions(&self) -> Vec<(String, String, String, String)> {
    let conn = self.conn.lock().unwrap();
    let mut stmt = match conn.prepare(
        "SELECT session_id, room_id, query, findings_json \
         FROM research_sessions WHERE status = 'running'",
    ) {
        Ok(stmt) => stmt,
        Err(e) => {
            warn!("Failed to prepare running research session load: {e}");
            return Vec::new();
        }
    };

    let rows = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
        ))
    }) {
        Ok(rows) => rows,
        Err(e) => {
            warn!("Failed to load running research sessions: {e}");
            return Vec::new();
        }
    };

    rows.filter_map(|row| match row {
        Ok(session) => Some(session),
        Err(e) => {
            warn!("Skipping unreadable research session row: {e}");
            None
        }
    })
    .collect()
}

/// Load all agent mappings (for startup recovery).
pub fn load_all_agents(&self) -> Vec<(String, String)> {
    // Returns `(name, agent_id)` pairs. Query failures yield an empty list and
    // undecodable rows are skipped; both are logged.
    let conn = self.conn.lock().unwrap();
    let mut stmt = match conn.prepare("SELECT name, agent_id FROM agents") {
        Ok(stmt) => stmt,
        Err(e) => {
            warn!("Failed to prepare agent load: {e}");
            return Vec::new();
        }
    };

    let rows = match stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
    }) {
        Ok(rows) => rows,
        Err(e) => {
            warn!("Failed to load agents: {e}");
            return Vec::new();
        }
    };

    rows.filter_map(|row| match row {
        Ok(agent) => Some(agent),
        Err(e) => {
            warn!("Skipping unreadable agent row: {e}");
            None
        }
    })
    .collect()
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_open_memory_db() {
        let store = Store::open_memory().unwrap();
        assert!(store.load_all_conversations().is_empty());
        assert!(store.load_all_agents().is_empty());
    }

    #[test]
    fn test_conversation_crud() {
        let store = Store::open_memory().unwrap();

        // Insert
        store.upsert_conversation("!room:x", "conv_abc", 100);
        let (conv_id, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(conv_id, "conv_abc");
        assert_eq!(tokens, 100);

        // Update tokens
        store.update_tokens("!room:x", 500);
        let (_, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(tokens, 500);

        // Upsert (replace conversation_id)
        store.upsert_conversation("!room:x", "conv_def", 0);
        let (conv_id, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(conv_id, "conv_def");
        assert_eq!(tokens, 0);

        // Delete
        store.delete_conversation("!room:x");
        assert!(store.get_conversation("!room:x").is_none());
    }

    #[test]
    fn test_agent_crud() {
        let store = Store::open_memory().unwrap();

        store.upsert_agent("sol-orchestrator", "ag_123", "mistral-medium-latest", "hash1");
        let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
        assert_eq!(id, "ag_123");
        assert_eq!(hash, "hash1");

        // Update
        store.upsert_agent("sol-orchestrator", "ag_456", "mistral-medium-latest", "hash2");
        let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
        assert_eq!(id, "ag_456");
        assert_eq!(hash, "hash2");

        // Delete
        store.delete_agent("sol-orchestrator");
        assert!(store.get_agent("sol-orchestrator").is_none());
    }

    #[test]
    fn test_load_all_conversations() {
        let store = Store::open_memory().unwrap();

        store.upsert_conversation("!a:x", "conv_1", 10);
        store.upsert_conversation("!b:x", "conv_2", 20);
        store.upsert_conversation("!c:x", "conv_3", 30);

        let all = store.load_all_conversations();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_load_all_agents() {
        let store = Store::open_memory().unwrap();

        store.upsert_agent("orch", "ag_1", "medium", "h1");
        store.upsert_agent("obs", "ag_2", "medium", "h2");

        let all = store.load_all_agents();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_nonexistent_keys_return_none() {
        let store = Store::open_memory().unwrap();
        assert!(store.get_conversation("!nope:x").is_none());
        assert!(store.get_agent("nope").is_none());
    }

    #[test]
    fn test_service_user_crud() {
        let store = Store::open_memory().unwrap();

        // Insert
        store.upsert_service_user("sienna", "gitea", "siennav");
        assert_eq!(
            store.get_service_user("sienna", "gitea").unwrap(),
            "siennav"
        );

        // Update
        store.upsert_service_user("sienna", "gitea", "sienna");
        assert_eq!(
            store.get_service_user("sienna", "gitea").unwrap(),
            "sienna"
        );

        // Different service
        store.upsert_service_user("sienna", "grafana", "sienna-g");
        assert_eq!(
            store.get_service_user("sienna", "grafana").unwrap(),
            "sienna-g"
        );

        // Delete
        store.delete_service_user("sienna", "gitea");
        assert!(store.get_service_user("sienna", "gitea").is_none());

        // Other service unaffected
        assert!(store.get_service_user("sienna", "grafana").is_some());
    }

    #[test]
    fn test_delete_all_conversations() {
        let store = Store::open_memory().unwrap();

        store.upsert_conversation("!a:x", "conv_1", 10);
        store.upsert_conversation("!b:x", "conv_2", 20);
        store.upsert_conversation("!c:x", "conv_3", 30);

        let deleted = store.delete_all_conversations();
        assert_eq!(deleted, 3);
        assert!(store.load_all_conversations().is_empty());

        // DB still works after bulk delete
        store.upsert_conversation("!d:x", "conv_4", 0);
        assert_eq!(store.load_all_conversations().len(), 1);
    }

    #[test]
    fn test_nonexistent_service_user() {
        let store = Store::open_memory().unwrap();
        assert!(store.get_service_user("nobody", "gitea").is_none());
    }

    // ── Research session tests ──────────────────────────────────────────

    #[test]
    fn test_research_session_lifecycle() {
        let store = Store::open_memory().unwrap();

        // Create
        store.create_research_session("sess-1", "!room:x", "$event1", "investigate SBBB", "[]");

        let running = store.load_running_research_sessions();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].0, "sess-1");
        assert_eq!(running[0].2, "investigate SBBB");
    }

    #[test]
    fn test_research_session_append_finding() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-2", "!room:x", "$event2", "test", "[]");

        store.append_research_finding("sess-2", r#"{"focus":"repo","findings":"found 3 files"}"#);
        store.append_research_finding("sess-2", r#"{"focus":"archive","findings":"12 messages"}"#);

        let running = store.load_running_research_sessions();
        assert_eq!(running.len(), 1);

        // findings_json should be a JSON array with 2 entries
        let findings: serde_json::Value = serde_json::from_str(&running[0].3).unwrap();
        assert_eq!(findings.as_array().unwrap().len(), 2);
    }

    #[test]
    fn test_research_session_complete() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-3", "!room:x", "$event3", "test", "[]");

        store.complete_research_session("sess-3");

        // Should no longer appear in running sessions
        let running = store.load_running_research_sessions();
        assert!(running.is_empty());
    }

    #[test]
    fn test_research_session_fail() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-4", "!room:x", "$event4", "test", "[]");

        store.fail_research_session("sess-4");

        let running = store.load_running_research_sessions();
        assert!(running.is_empty());
    }

    #[test]
    fn test_hung_session_cleanup_on_startup() {
        let store = Store::open_memory().unwrap();

        // Simulate 2 hung sessions + 1 completed
        store.create_research_session("hung-1", "!room:a", "$e1", "query A", "[]");
        store.create_research_session("hung-2", "!room:b", "$e2", "query B", "[]");
        store.create_research_session("done-1", "!room:c", "$e3", "query C", "[]");
        store.complete_research_session("done-1");

        // Only the 2 hung sessions should be returned
        let hung = store.load_running_research_sessions();
        assert_eq!(hung.len(), 2);

        // Clean them up (simulates startup logic)
        for (session_id, _, _, _) in &hung {
            store.fail_research_session(session_id);
        }

        // Now none should be running
        assert!(store.load_running_research_sessions().is_empty());
    }

    #[test]
    fn test_research_session_partial_findings_survive_failure() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-5", "!room:x", "$e5", "deep dive", "[]");

        // Agent 1 completes, agent 2 hasn't yet
        store.append_research_finding("sess-5", r#"{"focus":"agent1","findings":"found stuff"}"#);

        // Crash! Mark as failed
        store.fail_research_session("sess-5");
        assert!(store.load_running_research_sessions().is_empty());

        // Store has no getter for a single session, so read the row through the
        // private connection to prove the finding outlived the failure.
        let (status, findings_json) = store
            .conn
            .lock()
            .unwrap()
            .query_row(
                "SELECT status, findings_json FROM research_sessions WHERE session_id = 'sess-5'",
                [],
                |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
            )
            .unwrap();
        assert_eq!(status, "failed");

        let findings: serde_json::Value = serde_json::from_str(&findings_json).unwrap();
        let findings = findings.as_array().unwrap();
        assert_eq!(findings.len(), 1);
        assert_eq!(findings[0]["focus"], "agent1");
    }
}