mod agent_ux; mod agents; mod archive; mod brain; mod breadcrumbs; mod code_index; mod config; mod context; mod conversations; mod matrix_utils; mod memory; mod persistence; mod grpc; mod orchestrator; #[cfg(test)] mod integration_test; mod sdk; mod sync; mod time_context; mod tokenizer; mod tools; use std::sync::Arc; use matrix_sdk::Client; use opensearch::http::transport::TransportBuilder; use opensearch::OpenSearch; use ruma::{OwnedDeviceId, OwnedUserId}; use tokio::signal; use tokio::sync::Mutex; use tracing::{error, info, warn}; use url::Url; use agents::registry::AgentRegistry; use archive::indexer::Indexer; use archive::schema::create_index_if_not_exists; use brain::conversation::{ContextMessage, ConversationManager}; use conversations::ConversationRegistry; use memory::schema::create_index_if_not_exists as create_memory_index; use brain::evaluator::Evaluator; use brain::personality::Personality; use brain::responder::Responder; use config::Config; use sync::AppState; use tools::ToolRegistry; #[tokio::main] async fn main() -> anyhow::Result<()> { // Initialize tracing tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info")), ) .init(); // Load config let config_path = std::env::var("SOL_CONFIG").unwrap_or_else(|_| "/etc/sol/sol.toml".into()); let config = Config::load(&config_path)?; info!("Loaded config from {config_path}"); // Load system prompt let prompt_path = std::env::var("SOL_SYSTEM_PROMPT") .unwrap_or_else(|_| "/etc/sol/system_prompt.md".into()); let system_prompt = std::fs::read_to_string(&prompt_path)?; info!("Loaded system prompt from {prompt_path}"); // Read secrets from environment let access_token = std::env::var("SOL_MATRIX_ACCESS_TOKEN") .map_err(|_| anyhow::anyhow!("SOL_MATRIX_ACCESS_TOKEN not set"))?; let device_id = std::env::var("SOL_MATRIX_DEVICE_ID") .map_err(|_| anyhow::anyhow!("SOL_MATRIX_DEVICE_ID not set"))?; let mistral_api_key = std::env::var("SOL_MISTRAL_API_KEY") .map_err(|_| anyhow::anyhow!("SOL_MISTRAL_API_KEY not set"))?; // Optional Gitea admin credentials for user impersonation let gitea_admin_username = std::env::var("SOL_GITEA_ADMIN_USERNAME").ok(); let gitea_admin_password = std::env::var("SOL_GITEA_ADMIN_PASSWORD").ok(); let config = Arc::new(config); // Initialize Matrix client with E2EE and sqlite store let homeserver = Url::parse(&config.matrix.homeserver_url)?; let matrix_client = Client::builder() .homeserver_url(homeserver) .sqlite_store(&config.matrix.state_store_path, None) .build() .await?; // Restore session let user_id: OwnedUserId = config.matrix.user_id.parse()?; let device_id: OwnedDeviceId = device_id.into(); let session = matrix_sdk::AuthSession::Matrix(matrix_sdk::matrix_auth::MatrixSession { meta: matrix_sdk::SessionMeta { user_id, device_id, }, tokens: matrix_sdk::matrix_auth::MatrixSessionTokens { access_token, refresh_token: None, }, }); matrix_client.restore_session(session).await?; info!(user = %config.matrix.user_id, "Matrix session restored"); // Initialize OpenSearch client let os_url = Url::parse(&config.opensearch.url)?; let os_transport = TransportBuilder::new( opensearch::http::transport::SingleNodeConnectionPool::new(os_url), ) .build()?; let os_client = OpenSearch::new(os_transport); // Ensure indices exist create_index_if_not_exists(&os_client, &config.opensearch.index).await?; create_memory_index(&os_client, &config.opensearch.memory_index).await?; // Initialize Mistral client let mistral_client = mistralai_client::v1::client::Client::new( Some(mistral_api_key), None, None, None, )?; let mistral = Arc::new(mistral_client); // Initialize tokenizer for accurate token counting let _tokenizer = Arc::new( tokenizer::SolTokenizer::new(config.mistral.tokenizer_path.as_deref()) .expect("Failed to initialize tokenizer"), ); info!("Tokenizer initialized"); // Build components let system_prompt_text = system_prompt.clone(); let personality = Arc::new(Personality::new(system_prompt)); let conversations = Arc::new(Mutex::new(ConversationManager::new( config.behavior.room_context_window, config.behavior.dm_context_window, ))); // Backfill conversation context from archive before starting if config.behavior.backfill_on_join { info!("Backfilling conversation context from archive..."); if let Err(e) = backfill_conversations(&os_client, &config, &conversations).await { error!("Backfill failed (non-fatal): {e}"); } } // Initialize persistent state database (needed by token store + agent/conversation registries) let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) { Ok(s) => (Arc::new(s), false), Err(e) => { error!("Failed to open state database at {}: {e}", config.matrix.db_path); error!("Falling back to in-memory state — conversations will not survive restarts"); (Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true) } }; // Initialize Vault client for secure token storage let vault_client = Arc::new(sdk::vault::VaultClient::new( &config.vault.url, &config.vault.role, &config.vault.mount, )); // Initialize Gitea client if configured let gitea_client: Option> = if let (Some(gitea_config), Some(admin_user), Some(admin_pass)) = (&config.services.gitea, &gitea_admin_username, &gitea_admin_password) { let token_store = Arc::new(sdk::tokens::TokenStore::new( store.clone(), vault_client.clone(), )); info!(url = gitea_config.url.as_str(), "Gitea integration enabled"); Some(Arc::new(sdk::gitea::GiteaClient::new( gitea_config.url.clone(), admin_user.clone(), admin_pass.clone(), token_store, ))) } else { info!("Gitea integration disabled (missing config or credentials)"); None }; // Initialize Kratos client if configured let kratos_client: Option> = if let Some(kratos_config) = &config.services.kratos { info!(url = kratos_config.admin_url.as_str(), "Kratos integration enabled"); Some(Arc::new(sdk::kratos::KratosClient::new( kratos_config.admin_url.clone(), ))) } else { info!("Kratos integration disabled (missing config)"); None }; let tool_registry = Arc::new(ToolRegistry::new( os_client.clone(), matrix_client.clone(), config.clone(), gitea_client, kratos_client, Some(mistral.clone()), Some(store.clone()), )); let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone())); let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone())); let responder = Arc::new(Responder::new( config.clone(), personality, tool_registry, os_client.clone(), )); // Start background flush task let _flush_handle = indexer.start_flush_task(); // Initialize agent registry and conversation registry (with SQLite backing) let agent_registry = Arc::new(AgentRegistry::new(store.clone())); let conversation_registry = Arc::new(ConversationRegistry::new( config.mistral.default_model.clone(), config.agents.compaction_threshold, store.clone(), )); // Build shared state let state = Arc::new(AppState { config: config.clone(), indexer, evaluator, responder, conversations, agent_registry, conversation_registry, mistral, opensearch: os_client, last_response: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), responding_in: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())), silenced_until: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), }); // Initialize orchestrator agent if conversations API is enabled let mut agent_recreated = false; if config.agents.use_conversations_api { info!("Conversations API enabled — ensuring orchestrator agent exists"); let agent_tools = tools::ToolRegistry::agent_tool_definitions( config.services.gitea.is_some(), config.services.kratos.is_some(), ); let mut active_agents: Vec<(&str, &str)> = vec![]; if config.services.gitea.is_some() { active_agents.push(("sol-devtools", "Git repos, issues, PRs, code (Gitea)")); } if config.services.kratos.is_some() { active_agents.push(("sol-identity", "User accounts, sessions, recovery (Kratos)")); } match state .agent_registry .ensure_orchestrator( &system_prompt_text, &config.agents.orchestrator_model, agent_tools, &state.mistral, &active_agents, &config.agents.agent_prefix, ) .await { Ok((agent_id, recreated)) => { info!(agent_id = agent_id.as_str(), recreated, "Orchestrator agent ready"); state.conversation_registry.set_agent_id(agent_id).await; if recreated { // Agent was recreated (system prompt changed) — old conversations // are bound to the stale agent and won't work. Reset everything. state.conversation_registry.reset_all().await; agent_recreated = true; } } Err(e) => { error!("Failed to create orchestrator agent: {e}"); error!("Falling back to model-only conversations (no orchestrator)"); } } } // Clean up hung research sessions from previous runs let hung_sessions = store.load_running_research_sessions(); if !hung_sessions.is_empty() { info!(count = hung_sessions.len(), "Found hung research sessions — marking as failed"); for (session_id, _room_id, query, _findings) in &hung_sessions { warn!(session_id = session_id.as_str(), query = query.as_str(), "Cleaning up hung research session"); store.fail_research_session(session_id); } } // Backfill reactions from Matrix room timelines info!("Backfilling reactions from room timelines..."); if let Err(e) = backfill_reactions(&matrix_client, &state.indexer).await { error!("Reaction backfill failed (non-fatal): {e}"); } // Start gRPC server if configured if config.grpc.is_some() { let orchestrator_id = state.conversation_registry.get_agent_id().await .unwrap_or_default(); let orch = Arc::new(orchestrator::Orchestrator::new( config.clone(), state.responder.tools(), state.mistral.clone(), state.conversation_registry.clone(), system_prompt_text.clone(), )); let grpc_state = std::sync::Arc::new(grpc::GrpcState { config: config.clone(), tools: state.responder.tools(), store: store.clone(), mistral: state.mistral.clone(), matrix: Some(matrix_client.clone()), opensearch: { // Rebuild a fresh OpenSearch client (os_client was moved into AppState) let os_url = url::Url::parse(&config.opensearch.url).ok(); os_url.map(|u| { let transport = opensearch::http::transport::TransportBuilder::new( opensearch::http::transport::SingleNodeConnectionPool::new(u), ).build().unwrap(); opensearch::OpenSearch::new(transport) }) }, system_prompt: system_prompt_text.clone(), orchestrator_agent_id: orchestrator_id, orchestrator: Some(orch), }); tokio::spawn(async move { if let Err(e) = grpc::start_server(grpc_state).await { error!("gRPC server error: {e}"); } }); } // Start sync loop in background let sync_client = matrix_client.clone(); let sync_state = state.clone(); let sync_handle = tokio::spawn(async move { if let Err(e) = sync::start_sync(sync_client, sync_state).await { error!("Sync loop error: {e}"); } }); // If state recovery failed or agent was recreated, sneeze into all rooms if state_recovery_failed || agent_recreated { info!("State recovery failed — sneezing into all rooms"); for room in matrix_client.joined_rooms() { let content = ruma::events::room::message::RoomMessageEventContent::text_plain( "*sneezes*", ); if let Err(e) = room.send(content).await { warn!("Failed to sneeze into {}: {e}", room.room_id()); } } } info!("Sol is running"); // Wait for shutdown signal signal::ctrl_c().await?; info!("Shutdown signal received"); // Cancel sync sync_handle.abort(); info!("Sol has shut down"); Ok(()) } /// Backfill conversation context from the OpenSearch archive. /// /// Queries the most recent messages per room and seeds the ConversationManager /// so Sol has context surviving restarts. async fn backfill_conversations( os_client: &OpenSearch, config: &Config, conversations: &Arc>, ) -> anyhow::Result<()> { use serde_json::json; let window = config.behavior.room_context_window.max(config.behavior.dm_context_window); let index = &config.opensearch.index; // Get all distinct rooms let agg_body = json!({ "size": 0, "aggs": { "rooms": { "terms": { "field": "room_id", "size": 500 } } } }); let response = os_client .search(opensearch::SearchParts::Index(&[index])) .body(agg_body) .send() .await?; let body: serde_json::Value = response.json().await?; let buckets = body["aggregations"]["rooms"]["buckets"] .as_array() .cloned() .unwrap_or_default(); let mut total = 0; for bucket in &buckets { let room_id = bucket["key"].as_str().unwrap_or(""); if room_id.is_empty() { continue; } // Fetch recent messages for this room let query = json!({ "size": window, "sort": [{ "timestamp": "asc" }], "query": { "bool": { "filter": [ { "term": { "room_id": room_id } }, { "term": { "redacted": false } } ] } }, "_source": ["sender_name", "sender", "content", "timestamp"] }); let resp = os_client .search(opensearch::SearchParts::Index(&[index])) .body(query) .send() .await?; let data: serde_json::Value = resp.json().await?; let hits = data["hits"]["hits"].as_array().cloned().unwrap_or_default(); if hits.is_empty() { continue; } let mut convs = conversations.lock().await; for hit in &hits { let src = &hit["_source"]; let sender = src["sender_name"] .as_str() .or_else(|| src["sender"].as_str()) .unwrap_or("unknown"); let content = src["content"].as_str().unwrap_or(""); let timestamp = src["timestamp"].as_i64().unwrap_or(0); convs.add_message( room_id, false, // we don't know if it's a DM from the archive, use group window ContextMessage { sender: sender.to_string(), content: content.to_string(), timestamp, }, ); total += 1; } } info!(rooms = buckets.len(), messages = total, "Backfill complete"); Ok(()) } /// Backfill reactions from Matrix room timelines into the archive. /// /// For each joined room, fetches recent timeline events and indexes any /// m.reaction events that aren't already in the archive. async fn backfill_reactions( client: &Client, indexer: &Arc, ) -> anyhow::Result<()> { use matrix_sdk::room::MessagesOptions; use ruma::events::AnySyncTimelineEvent; use ruma::uint; let rooms = client.joined_rooms(); let mut total = 0; for room in &rooms { let room_id = room.room_id().to_string(); // Fetch recent messages (backwards from now) let mut options = MessagesOptions::backward(); options.limit = uint!(500); let messages = match room.messages(options).await { Ok(m) => m, Err(e) => { error!(room = room_id.as_str(), "Failed to fetch timeline for reaction backfill: {e}"); continue; } }; for event in &messages.chunk { let Ok(deserialized) = event.raw().deserialize() else { continue; }; if let AnySyncTimelineEvent::MessageLike( ruma::events::AnySyncMessageLikeEvent::Reaction(reaction_event), ) = deserialized { let original = match reaction_event { ruma::events::SyncMessageLikeEvent::Original(ref o) => o, _ => continue, }; let target_event_id = original.content.relates_to.event_id.to_string(); let sender = original.sender.to_string(); let emoji = &original.content.relates_to.key; let timestamp: i64 = original.origin_server_ts.0.into(); indexer.add_reaction(&target_event_id, &sender, emoji, timestamp).await; total += 1; } } } info!(reactions = total, rooms = rooms.len(), "Reaction backfill complete"); Ok(()) }