Files
sol/src/main.rs
Sienna Meridian Satterwhite 7bf9e25361 per-message context headers, memory notes, conversation continuity
conversations API path now injects per-message context headers with
live timestamps, room name, and memory notes. this replaces the
template variables in agent instructions which were frozen at
creation time.

memory notes (topical + recent backfill) loaded before each response
in the conversations path — was previously only in the legacy path.

context hint seeds new conversations with recent room history after
resets, so sol doesn't lose conversational continuity after a sneeze (state reset).

tool call results now logged with preview + length for debugging.
reset_all() clears both in-memory and sqlite conversation state.
2026-03-22 15:00:43 +00:00

449 lines
15 KiB
Rust

mod agent_ux;
mod agents;
mod archive;
mod brain;
mod config;
mod context;
mod conversations;
mod matrix_utils;
mod memory;
mod persistence;
mod sdk;
mod sync;
mod tools;
use std::sync::Arc;
use matrix_sdk::Client;
use opensearch::http::transport::TransportBuilder;
use opensearch::OpenSearch;
use ruma::{OwnedDeviceId, OwnedUserId};
use tokio::signal;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use url::Url;
use agents::registry::AgentRegistry;
use archive::indexer::Indexer;
use archive::schema::create_index_if_not_exists;
use brain::conversation::{ContextMessage, ConversationManager};
use conversations::ConversationRegistry;
use memory::schema::create_index_if_not_exists as create_memory_index;
use brain::evaluator::Evaluator;
use brain::personality::Personality;
use brain::responder::Responder;
use config::Config;
use sync::AppState;
use tools::ToolRegistry;
/// Sol entrypoint.
///
/// Boot sequence: tracing → config → secrets → Matrix session restore →
/// OpenSearch indices → Mistral client → conversation backfill → persistent
/// state store → optional Gitea integration → tool registry / responder →
/// orchestrator agent (conversations API) → reaction backfill → sync loop.
/// Blocks until Ctrl-C, then aborts the sync task.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize tracing; defaults to "sol=info" when no env filter is set.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info")),
        )
        .init();

    // Load config from SOL_CONFIG, falling back to the system path.
    let config_path =
        std::env::var("SOL_CONFIG").unwrap_or_else(|_| "/etc/sol/sol.toml".into());
    let config = Config::load(&config_path)?;
    info!("Loaded config from {config_path}");

    // Load system prompt (reused below by the evaluator and the orchestrator
    // agent definition).
    let prompt_path = std::env::var("SOL_SYSTEM_PROMPT")
        .unwrap_or_else(|_| "/etc/sol/system_prompt.md".into());
    let system_prompt = std::fs::read_to_string(&prompt_path)?;
    info!("Loaded system prompt from {prompt_path}");

    // Read secrets from environment — all three are required; missing ones
    // abort startup with a named error.
    let access_token = std::env::var("SOL_MATRIX_ACCESS_TOKEN")
        .map_err(|_| anyhow::anyhow!("SOL_MATRIX_ACCESS_TOKEN not set"))?;
    let device_id = std::env::var("SOL_MATRIX_DEVICE_ID")
        .map_err(|_| anyhow::anyhow!("SOL_MATRIX_DEVICE_ID not set"))?;
    let mistral_api_key = std::env::var("SOL_MISTRAL_API_KEY")
        .map_err(|_| anyhow::anyhow!("SOL_MISTRAL_API_KEY not set"))?;

    // Optional Gitea admin credentials for user impersonation; absence simply
    // disables the Gitea integration below.
    let gitea_admin_username = std::env::var("SOL_GITEA_ADMIN_USERNAME").ok();
    let gitea_admin_password = std::env::var("SOL_GITEA_ADMIN_PASSWORD").ok();

    let config = Arc::new(config);

    // Initialize Matrix client with E2EE and sqlite store.
    let homeserver = Url::parse(&config.matrix.homeserver_url)?;
    let matrix_client = Client::builder()
        .homeserver_url(homeserver)
        .sqlite_store(&config.matrix.state_store_path, None)
        .build()
        .await?;

    // Restore session from the configured user id plus the env-provided
    // device id / access token — no interactive login.
    let user_id: OwnedUserId = config.matrix.user_id.parse()?;
    let device_id: OwnedDeviceId = device_id.into();
    let session = matrix_sdk::AuthSession::Matrix(matrix_sdk::matrix_auth::MatrixSession {
        meta: matrix_sdk::SessionMeta {
            user_id,
            device_id,
        },
        tokens: matrix_sdk::matrix_auth::MatrixSessionTokens {
            access_token,
            refresh_token: None,
        },
    });
    matrix_client.restore_session(session).await?;
    info!(user = %config.matrix.user_id, "Matrix session restored");

    // Initialize OpenSearch client (single-node connection pool).
    let os_url = Url::parse(&config.opensearch.url)?;
    let os_transport = TransportBuilder::new(
        opensearch::http::transport::SingleNodeConnectionPool::new(os_url),
    )
    .build()?;
    let os_client = OpenSearch::new(os_transport);

    // Ensure archive and memory indices exist before anything writes to them.
    create_index_if_not_exists(&os_client, &config.opensearch.index).await?;
    create_memory_index(&os_client, &config.opensearch.memory_index).await?;

    // Initialize Mistral client.
    let mistral_client = mistralai_client::v1::client::Client::new(
        Some(mistral_api_key),
        None,
        None,
        None,
    )?;
    let mistral = Arc::new(mistral_client);

    // Build components. The prompt is cloned first because Personality::new
    // consumes the original String.
    let system_prompt_text = system_prompt.clone();
    let personality = Arc::new(Personality::new(system_prompt));
    let conversations = Arc::new(Mutex::new(ConversationManager::new(
        config.behavior.room_context_window,
        config.behavior.dm_context_window,
    )));

    // Backfill conversation context from archive before starting; failure is
    // logged and tolerated (Sol just starts without prior context).
    if config.behavior.backfill_on_join {
        info!("Backfilling conversation context from archive...");
        if let Err(e) = backfill_conversations(&os_client, &config, &conversations).await {
            error!("Backfill failed (non-fatal): {e}");
        }
    }

    // Initialize persistent state database (needed by token store +
    // agent/conversation registries). On failure, degrade to an in-memory
    // store and remember the failure so we can announce the reset below.
    let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) {
        Ok(s) => (Arc::new(s), false),
        Err(e) => {
            error!("Failed to open state database at {}: {e}", config.matrix.db_path);
            error!("Falling back to in-memory state — conversations will not survive restarts");
            (Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true)
        }
    };

    // Initialize Vault client for secure token storage.
    let vault_client = Arc::new(sdk::vault::VaultClient::new(
        &config.vault.url,
        &config.vault.role,
        &config.vault.mount,
    ));

    // Initialize Gitea client only when both the config section and the admin
    // credentials are present.
    let gitea_client: Option<Arc<sdk::gitea::GiteaClient>> =
        if let (Some(gitea_config), Some(admin_user), Some(admin_pass)) =
            (&config.services.gitea, &gitea_admin_username, &gitea_admin_password)
        {
            let token_store = Arc::new(sdk::tokens::TokenStore::new(
                store.clone(),
                vault_client.clone(),
            ));
            info!(url = gitea_config.url.as_str(), "Gitea integration enabled");
            Some(Arc::new(sdk::gitea::GiteaClient::new(
                gitea_config.url.clone(),
                admin_user.clone(),
                admin_pass.clone(),
                token_store,
            )))
        } else {
            info!("Gitea integration disabled (missing config or credentials)");
            None
        };

    let tool_registry = Arc::new(ToolRegistry::new(
        os_client.clone(),
        matrix_client.clone(),
        config.clone(),
        gitea_client,
    ));
    let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone()));
    let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone()));
    let responder = Arc::new(Responder::new(
        config.clone(),
        personality,
        tool_registry,
        os_client.clone(),
    ));

    // Start background flush task. Handle intentionally unused — the task is
    // expected to run for the life of the process.
    let _flush_handle = indexer.start_flush_task();

    // Initialize agent registry and conversation registry (with SQLite backing).
    let agent_registry = Arc::new(AgentRegistry::new(store.clone()));
    let conversation_registry = Arc::new(ConversationRegistry::new(
        config.mistral.default_model.clone(),
        config.agents.compaction_threshold,
        store.clone(),
    ));

    // Build shared state handed to the sync loop and event handlers.
    let state = Arc::new(AppState {
        config: config.clone(),
        indexer,
        evaluator,
        responder,
        conversations,
        agent_registry,
        conversation_registry,
        mistral,
        opensearch: os_client,
        last_response: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
        responding_in: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())),
    });

    // Initialize orchestrator agent if conversations API is enabled. Creation
    // failure is non-fatal: Sol falls back to model-only conversations.
    let mut agent_recreated = false;
    if config.agents.use_conversations_api {
        info!("Conversations API enabled — ensuring orchestrator agent exists");
        let agent_tools = tools::ToolRegistry::agent_tool_definitions(config.services.gitea.is_some());
        match state
            .agent_registry
            .ensure_orchestrator(
                &system_prompt_text,
                &config.agents.orchestrator_model,
                agent_tools,
                &state.mistral,
                &[], // no domain agents yet — delegation section added when they are
            )
            .await
        {
            Ok((agent_id, recreated)) => {
                info!(agent_id = agent_id.as_str(), recreated, "Orchestrator agent ready");
                state.conversation_registry.set_agent_id(agent_id).await;
                if recreated {
                    // Agent was recreated (system prompt changed) — old conversations
                    // are bound to the stale agent and won't work. Reset everything.
                    state.conversation_registry.reset_all().await;
                    agent_recreated = true;
                }
            }
            Err(e) => {
                error!("Failed to create orchestrator agent: {e}");
                error!("Falling back to model-only conversations (no orchestrator)");
            }
        }
    }

    // Backfill reactions from Matrix room timelines; failure tolerated.
    info!("Backfilling reactions from room timelines...");
    if let Err(e) = backfill_reactions(&matrix_client, &state.indexer).await {
        error!("Reaction backfill failed (non-fatal): {e}");
    }

    // Start sync loop in background.
    let sync_client = matrix_client.clone();
    let sync_state = state.clone();
    let sync_handle = tokio::spawn(async move {
        if let Err(e) = sync::start_sync(sync_client, sync_state).await {
            error!("Sync loop error: {e}");
        }
    });

    // If state recovery failed or agent was recreated, sneeze into all rooms
    // as a visible signal that conversation state was reset.
    // NOTE(review): the log line below mentions only state recovery, but this
    // branch also fires when the agent was merely recreated — consider rewording.
    if state_recovery_failed || agent_recreated {
        info!("State recovery failed — sneezing into all rooms");
        for room in matrix_client.joined_rooms() {
            let content = ruma::events::room::message::RoomMessageEventContent::text_plain(
                "*sneezes*",
            );
            if let Err(e) = room.send(content).await {
                warn!("Failed to sneeze into {}: {e}", room.room_id());
            }
        }
    }

    info!("Sol is running");

    // Wait for shutdown signal.
    signal::ctrl_c().await?;
    info!("Shutdown signal received");

    // Cancel sync. Abort is abrupt — NOTE(review): confirm whether in-flight
    // indexer batches are flushed on shutdown; nothing here waits for them.
    sync_handle.abort();
    info!("Sol has shut down");
    Ok(())
}
/// Backfill conversation context from the OpenSearch archive.
///
/// Queries the most recent messages per room and seeds the ConversationManager
/// so Sol has context surviving restarts.
///
/// # Errors
/// Returns an error if either OpenSearch request fails or its body cannot be
/// parsed as JSON. Individual rooms with missing/empty data are skipped.
async fn backfill_conversations(
    os_client: &OpenSearch,
    config: &Config,
    conversations: &Arc<Mutex<ConversationManager>>,
) -> anyhow::Result<()> {
    use serde_json::json;

    // Use the larger of the two windows so no room is under-filled; the
    // archive doesn't tell us whether a room is a DM (see below).
    let window = config.behavior.room_context_window.max(config.behavior.dm_context_window);
    let index = &config.opensearch.index;

    // Discover all rooms present in the archive via a terms aggregation.
    let agg_body = json!({
        "size": 0,
        "aggs": {
            "rooms": {
                "terms": { "field": "room_id", "size": 500 }
            }
        }
    });
    let response = os_client
        .search(opensearch::SearchParts::Index(&[index]))
        .body(agg_body)
        .send()
        .await?;
    let body: serde_json::Value = response.json().await?;
    let buckets = body["aggregations"]["rooms"]["buckets"]
        .as_array()
        .cloned()
        .unwrap_or_default();

    let mut total = 0;
    for bucket in &buckets {
        let room_id = bucket["key"].as_str().unwrap_or("");
        if room_id.is_empty() {
            continue;
        }
        // Fetch the most recent `window` messages for this room. Sort
        // descending so the size limit keeps the NEWEST messages — sorting
        // ascending here would return the oldest ones instead. Hits are
        // reversed below to restore chronological insertion order.
        let query = json!({
            "size": window,
            "sort": [{ "timestamp": "desc" }],
            "query": {
                "bool": {
                    "filter": [
                        { "term": { "room_id": room_id } },
                        { "term": { "redacted": false } }
                    ]
                }
            },
            "_source": ["sender_name", "sender", "content", "timestamp"]
        });
        let resp = os_client
            .search(opensearch::SearchParts::Index(&[index]))
            .body(query)
            .send()
            .await?;
        let data: serde_json::Value = resp.json().await?;
        let hits = data["hits"]["hits"].as_array().cloned().unwrap_or_default();
        if hits.is_empty() {
            continue;
        }
        // Lock held only while seeding this one room.
        let mut convs = conversations.lock().await;
        // Hits arrive newest-first; insert oldest-first.
        for hit in hits.iter().rev() {
            let src = &hit["_source"];
            // Prefer the display name; fall back to the raw MXID, then a placeholder.
            let sender = src["sender_name"]
                .as_str()
                .or_else(|| src["sender"].as_str())
                .unwrap_or("unknown");
            let content = src["content"].as_str().unwrap_or("");
            let timestamp = src["timestamp"].as_i64().unwrap_or(0);
            convs.add_message(
                room_id,
                false, // we don't know if it's a DM from the archive, use group window
                ContextMessage {
                    sender: sender.to_string(),
                    content: content.to_string(),
                    timestamp,
                },
            );
            total += 1;
        }
    }
    info!(rooms = buckets.len(), messages = total, "Backfill complete");
    Ok(())
}
/// Backfill reactions from Matrix room timelines into the archive.
///
/// For each joined room, fetches recent timeline events and indexes any
/// m.reaction events that aren't already in the archive.
async fn backfill_reactions(
client: &Client,
indexer: &Arc<Indexer>,
) -> anyhow::Result<()> {
use matrix_sdk::room::MessagesOptions;
use ruma::events::AnySyncTimelineEvent;
use ruma::uint;
let rooms = client.joined_rooms();
let mut total = 0;
for room in &rooms {
let room_id = room.room_id().to_string();
// Fetch recent messages (backwards from now)
let mut options = MessagesOptions::backward();
options.limit = uint!(500);
let messages = match room.messages(options).await {
Ok(m) => m,
Err(e) => {
error!(room = room_id.as_str(), "Failed to fetch timeline for reaction backfill: {e}");
continue;
}
};
for event in &messages.chunk {
let Ok(deserialized) = event.raw().deserialize() else {
continue;
};
if let AnySyncTimelineEvent::MessageLike(
ruma::events::AnySyncMessageLikeEvent::Reaction(reaction_event),
) = deserialized
{
let original = match reaction_event {
ruma::events::SyncMessageLikeEvent::Original(ref o) => o,
_ => continue,
};
let target_event_id = original.content.relates_to.event_id.to_string();
let sender = original.sender.to_string();
let emoji = &original.content.relates_to.key;
let timestamp: i64 = original.origin_server_ts.0.into();
indexer.add_reaction(&target_event_id, &sender, emoji, timestamp).await;
total += 1;
}
}
}
info!(reactions = total, rooms = rooms.len(), "Reaction backfill complete");
Ok(())
}