diff --git a/src/brain/responder.rs b/src/brain/responder.rs index 3874ad2..cb2f0c3 100644 --- a/src/brain/responder.rs +++ b/src/brain/responder.rs @@ -146,7 +146,7 @@ impl Responder { messages.push(ChatMessage::new_user_message(&trigger)); } - let tool_defs = ToolRegistry::tool_definitions(); + let tool_defs = ToolRegistry::tool_definitions(self.tools.has_gitea()); let model = Model::new(&self.config.mistral.default_model); let max_iterations = self.config.mistral.max_tool_iterations; @@ -199,7 +199,17 @@ impl Responder { .await; let result_str = match result { - Ok(s) => s, + Ok(s) => { + let preview: String = s.chars().take(500).collect(); + info!( + tool = tc.function.name.as_str(), + id = call_id, + result_len = s.len(), + result_preview = preview.as_str(), + "Tool call result" + ); + s + } Err(e) => { warn!(tool = tc.function.name.as_str(), "Tool failed: {e}"); format!("Error: {e}") @@ -261,6 +271,7 @@ impl Responder { trigger_body: &str, trigger_sender: &str, room_id: &str, + room_name: &str, is_dm: bool, is_spontaneous: bool, mistral: &Arc, @@ -268,6 +279,7 @@ impl Responder { response_ctx: &ResponseContext, conversation_registry: &ConversationRegistry, image_data_uri: Option<&str>, + context_hint: Option, ) -> Option { // Apply response delay if !self.config.behavior.instant_responses { @@ -287,20 +299,46 @@ impl Responder { let _ = room.typing_notice(true).await; - // Build the input message (with sender prefix for group rooms) - let input_text = if is_dm { + // Pre-response memory query (same as legacy path) + let memory_notes = self.load_memory_notes(response_ctx, trigger_body).await; + + // Build the input message with dynamic context header. + // Agent instructions are static (set at creation), so per-message context + // (timestamps, room, members, memory) is prepended to each user message. + let now = chrono::Utc::now(); + let epoch_ms = now.timestamp_millis(); + let ts_1h = (now - chrono::Duration::hours(1)).timestamp_millis(); + let ts_yesterday = (now - chrono::Duration::days(1)).timestamp_millis(); + let ts_last_week = (now - chrono::Duration::days(7)).timestamp_millis(); + + let mut context_header = format!( + "[context: date={}, epoch_ms={}, ts_1h_ago={}, ts_yesterday={}, ts_last_week={}, room={}, room_name={}]", + now.format("%Y-%m-%d"), + epoch_ms, + ts_1h, + ts_yesterday, + ts_last_week, + room_id, + room_name, + ); + + if let Some(ref notes) = memory_notes { + context_header.push('\n'); + context_header.push_str(notes); + } + + let user_msg = if is_dm { trigger_body.to_string() } else { format!("<{}> {}", response_ctx.matrix_user_id, trigger_body) }; - // TODO: multimodal via image_data_uri — Conversations API may support - // content parts in entries. For now, append image description request. + let input_text = format!("{context_header}\n{user_msg}"); let input = ConversationInput::Text(input_text); // Send through conversation registry let response = match conversation_registry - .send_message(room_id, input, is_dm, mistral) + .send_message(room_id, input, is_dm, mistral, context_hint.as_deref()) .await { Ok(r) => r, @@ -346,15 +384,23 @@ impl Responder { .await; let result_str = match result { - Ok(s) => s, + Ok(s) => { + let preview: String = s.chars().take(500).collect(); + info!( + tool = fc.name.as_str(), + id = call_id, + result_len = s.len(), + result_preview = preview.as_str(), + "Tool call result (conversations)" + ); + s + } Err(e) => { warn!(tool = fc.name.as_str(), "Tool failed: {e}"); format!("Error: {e}") } }; - - result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { tool_call_id: call_id.to_string(), result: result_str, diff --git a/src/conversations.rs b/src/conversations.rs index e84b41f..be9611a 100644 --- a/src/conversations.rs +++ b/src/conversations.rs @@ -69,12 +69,15 @@ impl ConversationRegistry { /// Get or create a conversation for a room. Returns the conversation ID. /// If a conversation doesn't exist yet, creates one with the first message. + /// `context_hint` is prepended to the first message on new conversations, + /// giving the agent recent conversation history for continuity after resets. pub async fn send_message( &self, room_id: &str, message: ConversationInput, is_dm: bool, mistral: &MistralClient, + context_hint: Option<&str>, ) -> Result { let mut mapping = self.mapping.lock().await; @@ -107,11 +110,25 @@ impl ConversationRegistry { Ok(response) } else { - // New conversation — create + // New conversation — create (with optional context hint for continuity) let agent_id = self.agent_id.lock().await.clone(); + let inputs = if let Some(hint) = context_hint { + // Prepend recent conversation history to the first message + match message { + ConversationInput::Text(text) => { + ConversationInput::Text(format!( + "[recent conversation for context]\n{hint}\n\n[current message]\n{text}" + )) + } + other => other, + } + } else { + message + }; + let req = CreateConversationRequest { - inputs: message, + inputs, model: if agent_id.is_none() { Some(self.model.clone()) } else { @@ -194,6 +211,16 @@ impl ConversationRegistry { } } + /// Reset ALL conversations (e.g., after agent recreation). + /// Clears both in-memory mappings and SQLite. + pub async fn reset_all(&self) { + let mut mapping = self.mapping.lock().await; + let count = mapping.len(); + mapping.clear(); + self.store.delete_all_conversations(); + info!(count, "Reset all conversations"); + } + /// Reset a room's conversation (e.g., after compaction). /// Removes the mapping so the next message creates a fresh conversation. pub async fn reset(&self, room_id: &str) { diff --git a/src/main.rs b/src/main.rs index ea9364d..758ba5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod conversations; mod matrix_utils; mod memory; mod persistence; +mod sdk; mod sync; mod tools; @@ -65,6 +66,10 @@ async fn main() -> anyhow::Result<()> { let mistral_api_key = std::env::var("SOL_MISTRAL_API_KEY") .map_err(|_| anyhow::anyhow!("SOL_MISTRAL_API_KEY not set"))?; + // Optional Gitea admin credentials for user impersonation + let gitea_admin_username = std::env::var("SOL_GITEA_ADMIN_USERNAME").ok(); + let gitea_admin_password = std::env::var("SOL_GITEA_ADMIN_PASSWORD").ok(); + let config = Arc::new(config); // Initialize Matrix client with E2EE and sqlite store @@ -131,13 +136,52 @@ async fn main() -> anyhow::Result<()> { } } + // Initialize persistent state database (needed by token store + agent/conversation registries) + let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) { + Ok(s) => (Arc::new(s), false), + Err(e) => { + error!("Failed to open state database at {}: {e}", config.matrix.db_path); + error!("Falling back to in-memory state — conversations will not survive restarts"); + (Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true) + } + }; + + // Initialize Vault client for secure token storage + let vault_client = Arc::new(sdk::vault::VaultClient::new( + &config.vault.url, + &config.vault.role, + &config.vault.mount, + )); + + // Initialize Gitea client if configured + let gitea_client: Option> = + if let (Some(gitea_config), Some(admin_user), Some(admin_pass)) = + (&config.services.gitea, &gitea_admin_username, &gitea_admin_password) + { + let token_store = Arc::new(sdk::tokens::TokenStore::new( + store.clone(), + vault_client.clone(), + )); + info!(url = gitea_config.url.as_str(), "Gitea integration enabled"); + Some(Arc::new(sdk::gitea::GiteaClient::new( + gitea_config.url.clone(), + admin_user.clone(), + admin_pass.clone(), + token_store, + ))) + } else { + info!("Gitea integration disabled (missing config or credentials)"); + None + }; + let tool_registry = Arc::new(ToolRegistry::new( os_client.clone(), matrix_client.clone(), config.clone(), + gitea_client, )); let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone())); - let evaluator = Arc::new(Evaluator::new(config.clone())); + let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone())); let responder = Arc::new(Responder::new( config.clone(), personality, @@ -148,22 +192,12 @@ async fn main() -> anyhow::Result<()> { // Start background flush task let _flush_handle = indexer.start_flush_task(); - // Initialize persistent state database - let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) { - Ok(s) => (Arc::new(s), false), - Err(e) => { - error!("Failed to open state database at {}: {e}", config.matrix.db_path); - error!("Falling back to in-memory state — conversations will not survive restarts"); - (Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true) - } - }; - // Initialize agent registry and conversation registry (with SQLite backing) let agent_registry = Arc::new(AgentRegistry::new(store.clone())); let conversation_registry = Arc::new(ConversationRegistry::new( config.mistral.default_model.clone(), config.agents.compaction_threshold, - store, + store.clone(), )); // Build shared state @@ -182,9 +216,10 @@ async fn main() -> anyhow::Result<()> { }); // Initialize orchestrator agent if conversations API is enabled + let mut agent_recreated = false; if config.agents.use_conversations_api { info!("Conversations API enabled — ensuring orchestrator agent exists"); - let agent_tools = tools::ToolRegistry::agent_tool_definitions(); + let agent_tools = tools::ToolRegistry::agent_tool_definitions(config.services.gitea.is_some()); match state .agent_registry .ensure_orchestrator( @@ -192,12 +227,20 @@ async fn main() -> anyhow::Result<()> { &config.agents.orchestrator_model, agent_tools, &state.mistral, + &[], // no domain agents yet — delegation section added when they are ) .await { - Ok(agent_id) => { - info!(agent_id = agent_id.as_str(), "Orchestrator agent ready"); + Ok((agent_id, recreated)) => { + info!(agent_id = agent_id.as_str(), recreated, "Orchestrator agent ready"); state.conversation_registry.set_agent_id(agent_id).await; + + if recreated { + // Agent was recreated (system prompt changed) — old conversations + // are bound to the stale agent and won't work. Reset everything. + state.conversation_registry.reset_all().await; + agent_recreated = true; + } } Err(e) => { error!("Failed to create orchestrator agent: {e}"); @@ -221,8 +264,8 @@ async fn main() -> anyhow::Result<()> { } }); - // If state recovery failed, sneeze into all rooms to signal the hiccup - if state_recovery_failed { + // If state recovery failed or agent was recreated, sneeze into all rooms + if state_recovery_failed || agent_recreated { info!("State recovery failed — sneezing into all rooms"); for room in matrix_client.joined_rooms() { let content = ruma::events::room::message::RoomMessageEventContent::text_plain( diff --git a/src/sync.rs b/src/sync.rs index 3327fb5..5548538 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -271,6 +271,29 @@ async fn handle_message( let members = matrix_utils::room_member_names(&room).await; let display_sender = sender_name.as_deref().unwrap_or(&sender); + // Build context hint for new conversations (last 50 messages for continuity) + let context_hint = if state.config.agents.use_conversations_api { + let conv_exists = state + .conversation_registry + .get_conversation_id(&room_id) + .await + .is_some(); + if !conv_exists && !context.is_empty() { + let hint_messages: Vec = context + .iter() + .rev() + .take(50) + .rev() + .map(|m| format!("{}: {}", m.sender, m.content)) + .collect(); + Some(hint_messages.join("\n")) + } else { + None + } + } else { + None + }; + let response = if state.config.agents.use_conversations_api { state .responder @@ -278,6 +301,7 @@ async fn handle_message( &body, display_sender, &room_id, + &room_name, is_dm, is_spontaneous, &state.mistral, @@ -285,6 +309,7 @@ async fn handle_message( &response_ctx, &state.conversation_registry, image_data_uri.as_deref(), + context_hint, ) .await } else {