From abfad337c569088e6d46e455e93d3ebc25b457c0 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 23 Mar 2026 11:46:22 +0000 Subject: [PATCH] feat(code): CodeSession + agent loop + Matrix room bridge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit phase 2 server core: - CodeSession: create/resume sessions, Matrix room per project, Mistral conversation lifecycle, tool dispatch loop - agent loop: user input → Mistral → tool calls → route (client via gRPC / server via ToolRegistry) → collect results → respond - Matrix bridge: all messages posted to project room, accessible from any Matrix client - code_sessions SQLite table (Postgres-compatible schema) - coding mode context injection (project path, git info, prompt.md) --- src/grpc/mod.rs | 1 + src/grpc/service.rs | 102 ++++------ src/grpc/session.rs | 474 ++++++++++++++++++++++++++++++++++++++++++++ src/persistence.rs | 127 ++++++++++++ 4 files changed, 638 insertions(+), 66 deletions(-) create mode 100644 src/grpc/session.rs diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index 42d52c7..fab9ba4 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -1,6 +1,7 @@ pub mod auth; pub mod router; pub mod service; +pub mod session; mod proto { tonic::include_proto!("sunbeam.code.v1"); diff --git a/src/grpc/service.rs b/src/grpc/service.rs index 6d358ac..a17a6d0 100644 --- a/src/grpc/service.rs +++ b/src/grpc/service.rs @@ -10,6 +10,7 @@ use tracing::{error, info, warn}; use super::auth::Claims; use super::proto::code_agent_server::CodeAgent; use super::proto::*; +use super::session::CodeSession; use super::GrpcState; pub struct CodeAgentService { @@ -30,7 +31,6 @@ impl CodeAgent for CodeAgentService { &self, request: Request>, ) -> Result, Status> { - // Extract JWT claims from the request extensions (set by auth middleware) let claims = request .extensions() .get::() @@ -40,18 +40,16 @@ impl CodeAgent for CodeAgentService { info!( user = claims.sub.as_str(), email = claims.email.as_deref().unwrap_or("?"), - "New coding session" + "New coding session request" ); let mut in_stream = request.into_inner(); let state = self.state.clone(); - // Channel for sending server messages to the client - let (tx, rx) = mpsc::channel::>(32); + let (tx, rx) = mpsc::channel::>(64); - // Spawn the session handler tokio::spawn(async move { - if let Err(e) = handle_session(&state, &claims, &mut in_stream, &tx).await { + if let Err(e) = run_session(&state, &claims, &mut in_stream, &tx).await { error!(user = claims.sub.as_str(), "Session error: {e}"); let _ = tx .send(Ok(ServerMessage { @@ -69,14 +67,13 @@ impl CodeAgent for CodeAgentService { } } -/// Handle a single coding session (runs in a spawned task). -async fn handle_session( +async fn run_session( state: &GrpcState, claims: &Claims, in_stream: &mut Streaming, tx: &mpsc::Sender>, ) -> anyhow::Result<()> { - // Wait for the first message — must be StartSession + // Wait for StartSession let first = in_stream .message() .await? @@ -87,32 +84,26 @@ async fn handle_session( _ => anyhow::bail!("First message must be StartSession"), }; - info!( - user = claims.sub.as_str(), - project = start.project_path.as_str(), - model = start.model.as_str(), - client_tools = start.client_tools.len(), - "Session started" - ); + // Create or resume session + let mut session = CodeSession::start( + Arc::new(GrpcState { + config: state.config.clone(), + tools: state.tools.clone(), + store: state.store.clone(), + mistral: state.mistral.clone(), + matrix: state.matrix.clone(), + }), + claims, + &start, + ) + .await?; - // TODO Phase 2: Create/find Matrix room for this project - // TODO Phase 2: Create Mistral conversation - // TODO Phase 2: Enter agent loop - - // For now, send SessionReady and echo back + // Send SessionReady tx.send(Ok(ServerMessage { payload: Some(server_message::Payload::Ready(SessionReady { - session_id: uuid::Uuid::new_v4().to_string(), - room_id: String::new(), // TODO: Matrix room - model: if start.model.is_empty() { - state - .config - .agents - .coding_model - .clone() - } else { - start.model.clone() - }, + session_id: session.session_id.clone(), + room_id: session.room_id.clone(), + model: session.model.clone(), })), })) .await?; @@ -121,41 +112,19 @@ async fn handle_session( while let Some(msg) = in_stream.message().await? { match msg.payload { Some(client_message::Payload::Input(input)) => { - info!( - user = claims.sub.as_str(), - text_len = input.text.len(), - "User input received" - ); - - // TODO Phase 2: Send to Mistral, handle tool calls, stream response - // For now, echo back as a simple acknowledgment - tx.send(Ok(ServerMessage { - payload: Some(server_message::Payload::Done(TextDone { - full_text: format!("[stub] received: {}", input.text), - input_tokens: 0, - output_tokens: 0, - })), - })) - .await?; - } - Some(client_message::Payload::ToolResult(result)) => { - info!( - call_id = result.call_id.as_str(), - is_error = result.is_error, - "Tool result received" - ); - // TODO Phase 2: Feed back to Mistral - } - Some(client_message::Payload::Approval(approval)) => { - info!( - call_id = approval.call_id.as_str(), - approved = approval.approved, - "Tool approval received" - ); - // TODO Phase 2: Execute or skip tool + if let Err(e) = session.chat(&input.text, tx, in_stream).await { + error!("Chat error: {e}"); + tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Error(Error { + message: e.to_string(), + fatal: false, + })), + })) + .await?; + } } Some(client_message::Payload::End(_)) => { - info!(user = claims.sub.as_str(), "Session ended by client"); + session.end(); tx.send(Ok(ServerMessage { payload: Some(server_message::Payload::End(SessionEnd { summary: "Session ended.".into(), @@ -167,7 +136,8 @@ async fn handle_session( Some(client_message::Payload::Start(_)) => { warn!("Received duplicate StartSession — ignoring"); } - None => continue, + // ToolResult and Approval are handled inside session.chat() + _ => continue, } } diff --git a/src/grpc/session.rs b/src/grpc/session.rs new file mode 100644 index 0000000..73623e3 --- /dev/null +++ b/src/grpc/session.rs @@ -0,0 +1,474 @@ +use std::sync::Arc; + +use matrix_sdk::room::Room; +use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest; +use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; +use mistralai_client::v1::client::Client as MistralClient; +use mistralai_client::v1::conversations::{ + AppendConversationRequest, ConversationInput, ConversationResponse, + CreateConversationRequest, FunctionResultEntry, ConversationEntry, +}; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +use super::auth::Claims; +use super::proto::*; +use super::router; +use super::GrpcState; +use crate::context::ResponseContext; +use crate::time_context::TimeContext; + +/// A live coding session — manages the Matrix room, Mistral conversation, +/// and tool dispatch between client and server. +pub struct CodeSession { + pub session_id: String, + pub room_id: String, + pub conversation_id: Option, + pub project_name: String, + pub project_path: String, + pub model: String, + pub user_id: String, + pub prompt_md: String, + state: Arc, + room: Option, +} + +impl CodeSession { + /// Create or resume a coding session. + pub async fn start( + state: Arc, + claims: &Claims, + start: &StartSession, + ) -> anyhow::Result { + let project_name = extract_project_name(&start.project_path); + let user_id = claims.sub.clone(); + + let model = if start.model.is_empty() { + state.config.agents.coding_model.clone() + } else { + start.model.clone() + }; + + // Check for existing session for this user + project + if let Some((session_id, room_id, conv_id)) = + state.store.find_code_session(&user_id, &project_name) + { + info!( + session_id = session_id.as_str(), + room_id = room_id.as_str(), + "Resuming existing code session" + ); + + let room = state.matrix.get_room( + <&matrix_sdk::ruma::RoomId>::try_from(room_id.as_str()) + .map_err(|e| anyhow::anyhow!("Invalid room ID: {e}"))?, + ); + + state.store.touch_code_session(&session_id); + + return Ok(Self { + session_id, + room_id, + conversation_id: if conv_id.is_empty() { None } else { Some(conv_id) }, + project_name, + project_path: start.project_path.clone(), + model, + user_id, + prompt_md: start.prompt_md.clone(), + state, + room, + }); + } + + // Create new session + let session_id = uuid::Uuid::new_v4().to_string(); + + // Create private Matrix room for this project + let room_name = format!("code: {project_name}"); + let room_id = create_project_room(&state.matrix, &room_name, &claims.email) + .await + .unwrap_or_else(|e| { + warn!("Failed to create Matrix room: {e}"); + format!("!code-{session_id}:local") // fallback ID + }); + + let room = state.matrix.get_room( + <&matrix_sdk::ruma::RoomId>::try_from(room_id.as_str()).ok() + .unwrap_or_else(|| { + // This shouldn't happen but handle gracefully + warn!("Invalid room ID {room_id}, session will work without Matrix bridge"); + <&matrix_sdk::ruma::RoomId>::try_from("!invalid:local").unwrap() + }), + ); + + state.store.create_code_session( + &session_id, + &user_id, + &room_id, + &start.project_path, + &project_name, + &model, + ); + + info!( + session_id = session_id.as_str(), + room_id = room_id.as_str(), + project = project_name.as_str(), + model = model.as_str(), + "Created new code session" + ); + + Ok(Self { + session_id, + room_id, + conversation_id: None, + project_name, + project_path: start.project_path.clone(), + model, + user_id, + prompt_md: start.prompt_md.clone(), + state, + room, + }) + } + + /// Build the per-message context header for coding mode. + fn build_context_header(&self) -> String { + let tc = TimeContext::now(); + format!( + "{}\n[project: {} | path: {} | model: {}]\n{}", + tc.message_line(), + self.project_name, + self.project_path, + self.model, + if self.prompt_md.is_empty() { + String::new() + } else { + format!("## project instructions\n{}\n", self.prompt_md) + }, + ) + } + + /// Send a user message and run the agent loop. + /// Returns tool calls that need client execution via the tx channel. + /// Server-side tools are executed inline. + pub async fn chat( + &mut self, + text: &str, + client_tx: &mpsc::Sender>, + client_rx: &mut tonic::Streaming, + ) -> anyhow::Result<()> { + let context_header = self.build_context_header(); + let input_text = format!("{context_header}\n{text}"); + + // Post to Matrix room + if let Some(ref room) = self.room { + let content = RoomMessageEventContent::text_plain(text); + let _ = room.send(content).await; + } + + // Send status + let _ = client_tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Status(Status { + message: "thinking...".into(), + kind: StatusKind::Thinking.into(), + })), + })).await; + + // Create or append to Mistral conversation + let response = if let Some(ref conv_id) = self.conversation_id { + let req = AppendConversationRequest { + inputs: ConversationInput::Text(input_text), + completion_args: None, + handoff_execution: None, + store: Some(true), + tool_confirmations: None, + stream: false, + }; + self.state.mistral + .append_conversation_async(conv_id, &req) + .await + .map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))? + } else { + let req = CreateConversationRequest { + inputs: ConversationInput::Text(input_text), + model: Some(self.model.clone()), + agent_id: None, + agent_version: None, + name: Some(format!("code-{}", self.project_name)), + description: None, + instructions: None, + completion_args: None, + tools: Some(self.build_tool_definitions()), + handoff_execution: None, + metadata: None, + store: Some(true), + stream: false, + }; + let resp = self.state.mistral + .create_conversation_async(&req) + .await + .map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?; + + self.conversation_id = Some(resp.conversation_id.clone()); + self.state.store.set_code_session_conversation( + &self.session_id, + &resp.conversation_id, + ); + + info!( + conversation_id = resp.conversation_id.as_str(), + "Created Mistral conversation for code session" + ); + resp + }; + + // Tool call loop + let max_iterations = self.state.config.mistral.max_tool_iterations; + let mut current_response = response; + + for _iteration in 0..max_iterations { + let calls = current_response.function_calls(); + if calls.is_empty() { + break; + } + + let mut result_entries = Vec::new(); + + for fc in &calls { + let call_id = fc.tool_call_id.as_deref().unwrap_or("unknown"); + let is_local = router::is_client_tool(&fc.name); + + if is_local { + // Send to client for execution + let needs_approval = true; // TODO: check tool permissions from config + + client_tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::ToolCall(ToolCall { + call_id: call_id.into(), + name: fc.name.clone(), + args_json: fc.arguments.clone(), + is_local: true, + needs_approval, + })), + })).await?; + + // Wait for client to send back the result + let result = wait_for_tool_result(client_rx, call_id).await?; + + result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { + tool_call_id: call_id.to_string(), + result: result.result, + id: None, + object: None, + created_at: None, + completed_at: None, + })); + } else { + // Execute server-side + client_tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Status(Status { + message: format!("executing {}...", fc.name), + kind: StatusKind::ToolRunning.into(), + })), + })).await?; + + let response_ctx = ResponseContext { + matrix_user_id: format!("@{}:sunbeam.pt", crate::context::localpart(&self.user_id)), + user_id: self.user_id.clone(), + display_name: None, + is_dm: true, + is_reply: false, + room_id: self.room_id.clone(), + }; + + let result = match self.state.tools + .execute(&fc.name, &fc.arguments, &response_ctx) + .await + { + Ok(s) => s, + Err(e) => format!("Error: {e}"), + }; + + client_tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Status(Status { + message: format!("{} done", fc.name), + kind: StatusKind::ToolDone.into(), + })), + })).await?; + + result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { + tool_call_id: call_id.to_string(), + result, + id: None, + object: None, + created_at: None, + completed_at: None, + })); + } + } + + // Send results back to Mistral + let conv_id = self.conversation_id.as_ref().unwrap(); + let req = AppendConversationRequest { + inputs: ConversationInput::Entries(result_entries), + completion_args: None, + handoff_execution: None, + store: Some(true), + tool_confirmations: None, + stream: false, + }; + + current_response = self.state.mistral + .append_conversation_async(conv_id, &req) + .await + .map_err(|e| anyhow::anyhow!("append_conversation (tool results) failed: {}", e.message))?; + } + + // Extract final text + if let Some(text) = current_response.assistant_text() { + // Post to Matrix room + if let Some(ref room) = self.room { + let content = RoomMessageEventContent::text_markdown(&text); + let _ = room.send(content).await; + } + + client_tx.send(Ok(ServerMessage { + payload: Some(server_message::Payload::Done(TextDone { + full_text: text, + input_tokens: current_response.usage.prompt_tokens, + output_tokens: current_response.usage.completion_tokens, + })), + })).await?; + } + + self.state.store.touch_code_session(&self.session_id); + Ok(()) + } + + /// Build tool definitions for the Mistral conversation. + /// Union of server-side tools + client-side tool schemas. + fn build_tool_definitions(&self) -> Vec { + let mut tools = crate::tools::ToolRegistry::agent_tool_definitions( + self.state.config.services.gitea.is_some(), + self.state.config.services.kratos.is_some(), + ); + + // Add client-side tool definitions + let client_tools = vec![ + ("file_read", "Read a file's contents. Use path for the file path, and optional start_line/end_line for a range."), + ("file_write", "Create or overwrite a file. Use path for the file path and content for the file contents."), + ("search_replace", "Patch a file using SEARCH/REPLACE blocks. Use path for the file and diff for the SEARCH/REPLACE content."), + ("grep", "Search files recursively with regex. Use pattern for the regex and optional path for the search root."), + ("bash", "Execute a shell command. Use command for the command string."), + ("list_directory", "List files and directories. Use path for the directory (default: project root) and optional depth."), + ]; + + for (name, desc) in client_tools { + tools.push(mistralai_client::v1::agents::AgentTool::function( + name.into(), + desc.into(), + serde_json::json!({ + "type": "object", + "properties": { + "path": { "type": "string", "description": "File or directory path" }, + "content": { "type": "string", "description": "File content (for write)" }, + "diff": { "type": "string", "description": "SEARCH/REPLACE blocks (for search_replace)" }, + "pattern": { "type": "string", "description": "Regex pattern (for grep)" }, + "command": { "type": "string", "description": "Shell command (for bash)" }, + "start_line": { "type": "integer", "description": "Start line (for file_read)" }, + "end_line": { "type": "integer", "description": "End line (for file_read)" }, + "depth": { "type": "integer", "description": "Directory depth (for list_directory)" } + } + }), + )); + } + + tools + } + + /// End the session. + pub fn end(&self) { + self.state.store.end_code_session(&self.session_id); + info!(session_id = self.session_id.as_str(), "Code session ended"); + } +} + +/// Wait for a ToolResult message from the client stream. +async fn wait_for_tool_result( + stream: &mut tonic::Streaming, + expected_call_id: &str, +) -> anyhow::Result { + // Read messages until we get the matching ToolResult + while let Some(msg) = stream.message().await? { + match msg.payload { + Some(client_message::Payload::ToolResult(result)) => { + if result.call_id == expected_call_id { + return Ok(result); + } + warn!( + expected = expected_call_id, + got = result.call_id.as_str(), + "Received tool result for wrong call ID" + ); + } + Some(client_message::Payload::Approval(approval)) => { + if approval.call_id == expected_call_id && !approval.approved { + return Ok(ToolResult { + call_id: expected_call_id.into(), + result: "Tool execution denied by user.".into(), + is_error: true, + }); + } + } + _ => { + debug!("Ignoring non-tool-result message while waiting for tool result"); + } + } + } + + anyhow::bail!("Stream closed while waiting for tool result {expected_call_id}") +} + +/// Create a private Matrix room for a coding project. +async fn create_project_room( + client: &matrix_sdk::Client, + name: &str, + invite_email: &Option, +) -> anyhow::Result { + use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest; + use matrix_sdk::ruma::api::client::room::Visibility; + + let mut request = CreateRoomRequest::new(); + request.name = Some(name.into()); + request.visibility = Visibility::Private; + request.is_direct = true; + + let response = client.create_room(request).await?; + let room_id = response.room_id().to_string(); + + info!(room_id = room_id.as_str(), name, "Created project room"); + Ok(room_id) +} + +/// Extract a project name from a path (last directory component). +fn extract_project_name(path: &str) -> String { + std::path::Path::new(path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_project_name() { + assert_eq!(extract_project_name("/Users/sienna/Development/sunbeam/sol"), "sol"); + assert_eq!(extract_project_name("/home/user/project"), "project"); + assert_eq!(extract_project_name("relative/path"), "path"); + assert_eq!(extract_project_name("/"), "unknown"); + } +} diff --git a/src/persistence.rs b/src/persistence.rs index be0e6ba..06ec6ce 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -83,6 +83,19 @@ impl Store { PRIMARY KEY (localpart, service) ); + CREATE TABLE IF NOT EXISTS code_sessions ( + session_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + conversation_id TEXT, + project_path TEXT NOT NULL, + project_name TEXT NOT NULL, + model TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + created_at TEXT NOT NULL DEFAULT (datetime('now')), + last_active TEXT NOT NULL DEFAULT (datetime('now')) + ); + CREATE TABLE IF NOT EXISTS research_sessions ( session_id TEXT PRIMARY KEY, room_id TEXT NOT NULL, @@ -244,6 +257,120 @@ impl Store { } } + // ========================================================================= + // Code Sessions (sunbeam code) + // ========================================================================= + + /// Find an active code session for a user + project. + pub fn find_code_session( + &self, + user_id: &str, + project_name: &str, + ) -> Option<(String, String, String)> { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT session_id, room_id, conversation_id FROM code_sessions + WHERE user_id = ?1 AND project_name = ?2 AND status = 'active' + ORDER BY last_active DESC LIMIT 1", + params![user_id, project_name], + |row| Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )), + ) + .ok() + } + + /// Create a new code session. + pub fn create_code_session( + &self, + session_id: &str, + user_id: &str, + room_id: &str, + project_path: &str, + project_name: &str, + model: &str, + ) { + let conn = self.conn.lock().unwrap(); + if let Err(e) = conn.execute( + "INSERT INTO code_sessions (session_id, user_id, room_id, project_path, project_name, model) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![session_id, user_id, room_id, project_path, project_name, model], + ) { + warn!("Failed to create code session: {e}"); + } + } + + /// Update the conversation_id for a code session. + pub fn set_code_session_conversation( + &self, + session_id: &str, + conversation_id: &str, + ) { + let conn = self.conn.lock().unwrap(); + if let Err(e) = conn.execute( + "UPDATE code_sessions SET conversation_id = ?1, last_active = datetime('now') + WHERE session_id = ?2", + params![conversation_id, session_id], + ) { + warn!("Failed to update code session conversation: {e}"); + } + } + + /// Touch the last_active timestamp. + pub fn touch_code_session(&self, session_id: &str) { + let conn = self.conn.lock().unwrap(); + if let Err(e) = conn.execute( + "UPDATE code_sessions SET last_active = datetime('now') WHERE session_id = ?1", + params![session_id], + ) { + warn!("Failed to touch code session: {e}"); + } + } + + /// End a code session. + pub fn end_code_session(&self, session_id: &str) { + let conn = self.conn.lock().unwrap(); + if let Err(e) = conn.execute( + "UPDATE code_sessions SET status = 'ended' WHERE session_id = ?1", + params![session_id], + ) { + warn!("Failed to end code session: {e}"); + } + } + + /// Check if a room is a code session room. + pub fn is_code_room(&self, room_id: &str) -> bool { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT 1 FROM code_sessions WHERE room_id = ?1 AND status = 'active' LIMIT 1", + params![room_id], + |_| Ok(()), + ) + .is_ok() + } + + /// Get project context for a code room. + pub fn get_code_room_context( + &self, + room_id: &str, + ) -> Option<(String, String, String)> { + let conn = self.conn.lock().unwrap(); + conn.query_row( + "SELECT project_name, project_path, model FROM code_sessions + WHERE room_id = ?1 AND status = 'active' + ORDER BY last_active DESC LIMIT 1", + params![room_id], + |row| Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )), + ) + .ok() + } + // ========================================================================= // Service Users (OIDC → service username mapping) // =========================================================================