diff --git a/Cargo.toml b/Cargo.toml index b1aab33..492b84b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ serde_json = "1" toml = "0.8" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-appender = "0.2" rand = "0.8" regex = "1" anyhow = "1" diff --git a/Procfile.dev b/Procfile.dev new file mode 100644 index 0000000..04b0461 --- /dev/null +++ b/Procfile.dev @@ -0,0 +1 @@ +sol: cargo run diff --git a/src/grpc/service.rs b/src/grpc/service.rs index 399a0db..c368418 100644 --- a/src/grpc/service.rs +++ b/src/grpc/service.rs @@ -223,41 +223,25 @@ async fn run_session( })) .await?; - // Check if orchestrator is available - let has_orch = state.orchestrator.is_some(); - info!(has_orchestrator = has_orch, "Checking orchestrator availability"); - let orchestrator = state.orchestrator.as_ref().cloned(); + let orchestrator = state.orchestrator.as_ref() + .ok_or_else(|| anyhow::anyhow!("Orchestrator not initialized"))? + .clone(); // Main message loop while let Some(msg) = in_stream.message().await? { match msg.payload { Some(client_message::Payload::Input(input)) => { - if let Some(ref orch) = orchestrator { - // Orchestrator path: delegate tool loop, bridge forwards events - if let Err(e) = session_chat_via_orchestrator( - &mut session, &input.text, orch, tx, in_stream, - ).await { - error!("Chat error: {e}"); - tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Error(Error { - message: e.to_string(), - fatal: false, - })), - })) - .await?; - } - } else { - // Fallback: inline tool loop (legacy) - if let Err(e) = session.chat(&input.text, tx, in_stream).await { - error!("Chat error: {e}"); - tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Error(Error { - message: e.to_string(), - fatal: false, - })), - })) - .await?; - } + if let Err(e) = session_chat_via_orchestrator( + &mut session, &input.text, &orchestrator, tx, in_stream, + ).await { + error!("Chat error: {e}"); + tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Error(Error { + message: e.to_string(), + fatal: false, + })), + })) + .await?; } } Some(client_message::Payload::IndexSymbols(idx)) => { diff --git a/src/grpc/session.rs b/src/grpc/session.rs index db9db28..c86c530 100644 --- a/src/grpc/session.rs +++ b/src/grpc/session.rs @@ -1,21 +1,16 @@ use std::sync::Arc; use matrix_sdk::room::Room; -use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest; use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; -use mistralai_client::v1::client::Client as MistralClient; use mistralai_client::v1::conversations::{ AppendConversationRequest, ConversationInput, ConversationResponse, - CreateConversationRequest, FunctionResultEntry, ConversationEntry, + CreateConversationRequest, }; -use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; +use tracing::{info, warn}; use super::auth::Claims; use super::proto::*; -use super::router; use super::GrpcState; -use crate::context::ResponseContext; use crate::time_context::TimeContext; /// A live coding session — manages the Matrix room, Mistral conversation, @@ -265,6 +260,45 @@ you also have access to server-side tools: search_archive, search_web, research, header } + /// Create a fresh Mistral conversation, replacing any existing one. + async fn create_fresh_conversation( + &mut self, + input_text: String, + ) -> anyhow::Result { + let instructions = self.build_instructions(); + let req = CreateConversationRequest { + inputs: ConversationInput::Text(input_text), + model: Some(self.model.clone()), + agent_id: None, + agent_version: None, + name: Some(format!("code-{}", self.project_name)), + description: None, + instructions: Some(instructions), + completion_args: None, + tools: Some(self.build_tool_definitions()), + handoff_execution: None, + metadata: None, + store: Some(true), + stream: false, + }; + let resp = self.state.mistral + .create_conversation_async(&req) + .await + .map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?; + + self.conversation_id = Some(resp.conversation_id.clone()); + self.state.store.set_code_session_conversation( + &self.session_id, + &resp.conversation_id, + ); + + info!( + conversation_id = resp.conversation_id.as_str(), + "Created Mistral conversation for code session" + ); + Ok(resp) + } + fn git_branch(&self) -> String { // Use the git branch from StartSession, fall back to "mainline" if self.project_path.is_empty() { @@ -283,204 +317,6 @@ you also have access to server-side tools: search_archive, search_web, research, } } - /// Send a user message and run the agent loop. - /// Returns tool calls that need client execution via the tx channel. - /// Server-side tools are executed inline. - pub async fn chat( - &mut self, - text: &str, - client_tx: &mpsc::Sender>, - client_rx: &mut tonic::Streaming, - ) -> anyhow::Result<()> { - let context_header = self.build_context_header(text).await; - let input_text = format!("{context_header}\n{text}"); - - // Post user message to Matrix room (as m.notice to distinguish from assistant) - if let Some(ref room) = self.room { - let content = RoomMessageEventContent::notice_plain(text); - let _ = room.send(content).await; - } - - // Send status - let _ = client_tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Status(Status { - message: "generating…".into(), - kind: StatusKind::Thinking.into(), - })), - })).await; - - // Create or append to Mistral conversation - let response = if let Some(ref conv_id) = self.conversation_id { - let req = AppendConversationRequest { - inputs: ConversationInput::Text(input_text), - completion_args: None, - handoff_execution: None, - store: Some(true), - tool_confirmations: None, - stream: false, - }; - self.state.mistral - .append_conversation_async(conv_id, &req) - .await - .map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))? - } else { - let instructions = self.build_instructions(); - let req = CreateConversationRequest { - inputs: ConversationInput::Text(input_text), - model: Some(self.model.clone()), - agent_id: None, - agent_version: None, - name: Some(format!("code-{}", self.project_name)), - description: None, - instructions: Some(instructions), - completion_args: None, - tools: Some(self.build_tool_definitions()), - handoff_execution: None, - metadata: None, - store: Some(true), - stream: false, - }; - let resp = self.state.mistral - .create_conversation_async(&req) - .await - .map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?; - - self.conversation_id = Some(resp.conversation_id.clone()); - self.state.store.set_code_session_conversation( - &self.session_id, - &resp.conversation_id, - ); - - info!( - conversation_id = resp.conversation_id.as_str(), - "Created Mistral conversation for code session" - ); - resp - }; - - // Tool call loop - let max_iterations = self.state.config.mistral.max_tool_iterations; - let mut current_response = response; - - for _iteration in 0..max_iterations { - let calls = current_response.function_calls(); - if calls.is_empty() { - break; - } - - let mut result_entries = Vec::new(); - - for fc in &calls { - let call_id = fc.tool_call_id.as_deref().unwrap_or("unknown"); - let is_local = router::is_client_tool(&fc.name); - - if is_local { - // Send to client for execution - let needs_approval = true; // TODO: check tool permissions from config - - client_tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::ToolCall(ToolCall { - call_id: call_id.into(), - name: fc.name.clone(), - args_json: fc.arguments.clone(), - is_local: true, - needs_approval, - })), - })).await?; - - // Wait for client to send back the result - let result = wait_for_tool_result(client_rx, call_id).await?; - - result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { - tool_call_id: call_id.to_string(), - result: result.result, - id: None, - object: None, - created_at: None, - completed_at: None, - })); - } else { - // Execute server-side - client_tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Status(Status { - message: format!("executing {}...", fc.name), - kind: StatusKind::ToolRunning.into(), - })), - })).await?; - - let response_ctx = ResponseContext { - matrix_user_id: format!("@{}:sunbeam.pt", crate::context::localpart(&self.user_id)), - user_id: self.user_id.clone(), - display_name: None, - is_dm: true, - is_reply: false, - room_id: self.room_id.clone(), - }; - - let result = match self.state.tools - .execute(&fc.name, &fc.arguments, &response_ctx) - .await - { - Ok(s) => s, - Err(e) => format!("Error: {e}"), - }; - - client_tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Status(Status { - message: format!("{} done", fc.name), - kind: StatusKind::ToolDone.into(), - })), - })).await?; - - result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { - tool_call_id: call_id.to_string(), - result, - id: None, - object: None, - created_at: None, - completed_at: None, - })); - } - } - - // Send results back to Mistral - let conv_id = self.conversation_id.as_ref().unwrap(); - let req = AppendConversationRequest { - inputs: ConversationInput::Entries(result_entries), - completion_args: None, - handoff_execution: None, - store: Some(true), - tool_confirmations: None, - stream: false, - }; - - current_response = self.state.mistral - .append_conversation_async(conv_id, &req) - .await - .map_err(|e| anyhow::anyhow!("append_conversation (tool results) failed: {}", e.message))?; - } - - // Extract final text - if let Some(text) = current_response.assistant_text() { - // Post to Matrix room - if let Some(ref room) = self.room { - let content = RoomMessageEventContent::text_markdown(&text); - let _ = room.send(content).await; - } - - client_tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Done(TextDone { - full_text: text, - input_tokens: current_response.usage.prompt_tokens, - output_tokens: current_response.usage.completion_tokens, - })), - })).await?; - } - - self.state.store.touch_code_session(&self.session_id); - Ok(()) - } - /// Build tool definitions for the Mistral conversation. /// Union of server-side tools + client-side tool schemas. fn build_tool_definitions(&self) -> Vec { @@ -557,58 +393,40 @@ you also have access to server-side tools: search_archive, search_web, research, pub async fn create_or_append_conversation( &mut self, text: &str, - ) -> anyhow::Result { + ) -> anyhow::Result { let context_header = self.build_context_header(text).await; let input_text = format!("{context_header}\n{text}"); if let Some(ref conv_id) = self.conversation_id { let req = AppendConversationRequest { - inputs: ConversationInput::Text(input_text), + inputs: ConversationInput::Text(input_text.clone()), completion_args: None, handoff_execution: None, store: Some(true), tool_confirmations: None, stream: false, }; - self.state + match self.state .mistral .append_conversation_async(conv_id, &req) .await - .map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message)) + { + Ok(resp) => Ok(resp), + Err(e) if e.message.contains("function calls and responses") + || e.message.contains("invalid_request_error") => + { + warn!( + conversation_id = conv_id.as_str(), + error = e.message.as_str(), + "Conversation corrupted — creating fresh conversation" + ); + self.conversation_id = None; + self.create_fresh_conversation(input_text).await + } + Err(e) => Err(anyhow::anyhow!("append_conversation failed: {}", e.message)), + } } else { - let instructions = self.build_instructions(); - let req = CreateConversationRequest { - inputs: ConversationInput::Text(input_text), - model: Some(self.model.clone()), - agent_id: None, - agent_version: None, - name: Some(format!("code-{}", self.project_name)), - description: None, - instructions: Some(instructions), - completion_args: None, - tools: Some(self.build_tool_definitions()), - handoff_execution: None, - metadata: None, - store: Some(true), - stream: false, - }; - let resp = self.state - .mistral - .create_conversation_async(&req) - .await - .map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?; - - self.conversation_id = Some(resp.conversation_id.clone()); - self.state.store.set_code_session_conversation( - &self.session_id, - &resp.conversation_id, - ); - - info!( - conversation_id = resp.conversation_id.as_str(), - "Created Mistral conversation for code session" - ); - Ok(resp) + self.create_fresh_conversation(input_text).await } } @@ -641,41 +459,6 @@ you also have access to server-side tools: search_archive, search_web, research, } /// Wait for a ToolResult message from the client stream. -async fn wait_for_tool_result( - stream: &mut tonic::Streaming, - expected_call_id: &str, -) -> anyhow::Result { - // Read messages until we get the matching ToolResult - while let Some(msg) = stream.message().await? { - match msg.payload { - Some(client_message::Payload::ToolResult(result)) => { - if result.call_id == expected_call_id { - return Ok(result); - } - warn!( - expected = expected_call_id, - got = result.call_id.as_str(), - "Received tool result for wrong call ID" - ); - } - Some(client_message::Payload::Approval(approval)) => { - if approval.call_id == expected_call_id && !approval.approved { - return Ok(ToolResult { - call_id: expected_call_id.into(), - result: "Tool execution denied by user.".into(), - is_error: true, - }); - } - } - _ => { - debug!("Ignoring non-tool-result message while waiting for tool result"); - } - } - } - - anyhow::bail!("Stream closed while waiting for tool result {expected_call_id}") -} - /// Create a private Matrix room for a coding project. async fn create_project_room( client: &matrix_sdk::Client, diff --git a/src/main.rs b/src/main.rs index 21861ab..7f171cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,13 +44,39 @@ use tools::ToolRegistry; #[tokio::main] async fn main() -> anyhow::Result<()> { - // Initialize tracing - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info")), - ) - .init(); + // Initialize tracing — optionally write to a rotating log file + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info")); + + let _log_guard = if let Ok(log_path) = std::env::var("SOL_LOG_FILE") { + let log_dir = std::path::Path::new(&log_path) + .parent() + .unwrap_or(std::path::Path::new(".")); + let log_name = std::path::Path::new(&log_path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("sol.log"); + + let file_appender = tracing_appender::rolling::Builder::new() + .max_log_files(3) + .rotation(tracing_appender::rolling::Rotation::NEVER) + .filename_prefix(log_name) + .build(log_dir) + .expect("Failed to create log file appender"); + + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_writer(non_blocking) + .with_ansi(false) + .init(); + Some(guard) + } else { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .init(); + None + }; // Load config let config_path =