feat(code): CodeSession + agent loop + Matrix room bridge

phase 2 server core:
- CodeSession: create/resume sessions, Matrix room per project,
  Mistral conversation lifecycle, tool dispatch loop
- agent loop: user input → Mistral → tool calls → route (client
  via gRPC / server via ToolRegistry) → collect results → respond
- Matrix bridge: all messages posted to project room, accessible
  from any Matrix client
- code_sessions SQLite table (Postgres-compatible schema)
- coding mode context injection (project path, git info, prompt.md)
This commit is contained in:
2026-03-23 11:46:22 +00:00
parent 35b6246fa7
commit abfad337c5
4 changed files with 638 additions and 66 deletions

View File

@@ -1,6 +1,7 @@
pub mod auth;
pub mod router;
pub mod service;
pub mod session;
mod proto {
tonic::include_proto!("sunbeam.code.v1");

View File

@@ -10,6 +10,7 @@ use tracing::{error, info, warn};
use super::auth::Claims;
use super::proto::code_agent_server::CodeAgent;
use super::proto::*;
use super::session::CodeSession;
use super::GrpcState;
pub struct CodeAgentService {
@@ -30,7 +31,6 @@ impl CodeAgent for CodeAgentService {
&self,
request: Request<Streaming<ClientMessage>>,
) -> Result<Response<Self::SessionStream>, Status> {
// Extract JWT claims from the request extensions (set by auth middleware)
let claims = request
.extensions()
.get::<Claims>()
@@ -40,18 +40,16 @@ impl CodeAgent for CodeAgentService {
info!(
user = claims.sub.as_str(),
email = claims.email.as_deref().unwrap_or("?"),
"New coding session"
"New coding session request"
);
let mut in_stream = request.into_inner();
let state = self.state.clone();
// Channel for sending server messages to the client
let (tx, rx) = mpsc::channel::<Result<ServerMessage, Status>>(32);
let (tx, rx) = mpsc::channel::<Result<ServerMessage, Status>>(64);
// Spawn the session handler
tokio::spawn(async move {
if let Err(e) = handle_session(&state, &claims, &mut in_stream, &tx).await {
if let Err(e) = run_session(&state, &claims, &mut in_stream, &tx).await {
error!(user = claims.sub.as_str(), "Session error: {e}");
let _ = tx
.send(Ok(ServerMessage {
@@ -69,14 +67,13 @@ impl CodeAgent for CodeAgentService {
}
}
/// Handle a single coding session (runs in a spawned task).
async fn handle_session(
async fn run_session(
state: &GrpcState,
claims: &Claims,
in_stream: &mut Streaming<ClientMessage>,
tx: &mpsc::Sender<Result<ServerMessage, Status>>,
) -> anyhow::Result<()> {
// Wait for the first message — must be StartSession
// Wait for StartSession
let first = in_stream
.message()
.await?
@@ -87,32 +84,26 @@ async fn handle_session(
_ => anyhow::bail!("First message must be StartSession"),
};
info!(
user = claims.sub.as_str(),
project = start.project_path.as_str(),
model = start.model.as_str(),
client_tools = start.client_tools.len(),
"Session started"
);
// Create or resume session
let mut session = CodeSession::start(
Arc::new(GrpcState {
config: state.config.clone(),
tools: state.tools.clone(),
store: state.store.clone(),
mistral: state.mistral.clone(),
matrix: state.matrix.clone(),
}),
claims,
&start,
)
.await?;
// TODO Phase 2: Create/find Matrix room for this project
// TODO Phase 2: Create Mistral conversation
// TODO Phase 2: Enter agent loop
// For now, send SessionReady and echo back
// Send SessionReady
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Ready(SessionReady {
session_id: uuid::Uuid::new_v4().to_string(),
room_id: String::new(), // TODO: Matrix room
model: if start.model.is_empty() {
state
.config
.agents
.coding_model
.clone()
} else {
start.model.clone()
},
session_id: session.session_id.clone(),
room_id: session.room_id.clone(),
model: session.model.clone(),
})),
}))
.await?;
@@ -121,41 +112,19 @@ async fn handle_session(
while let Some(msg) = in_stream.message().await? {
match msg.payload {
Some(client_message::Payload::Input(input)) => {
info!(
user = claims.sub.as_str(),
text_len = input.text.len(),
"User input received"
);
// TODO Phase 2: Send to Mistral, handle tool calls, stream response
// For now, echo back as a simple acknowledgment
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Done(TextDone {
full_text: format!("[stub] received: {}", input.text),
input_tokens: 0,
output_tokens: 0,
})),
}))
.await?;
}
Some(client_message::Payload::ToolResult(result)) => {
info!(
call_id = result.call_id.as_str(),
is_error = result.is_error,
"Tool result received"
);
// TODO Phase 2: Feed back to Mistral
}
Some(client_message::Payload::Approval(approval)) => {
info!(
call_id = approval.call_id.as_str(),
approved = approval.approved,
"Tool approval received"
);
// TODO Phase 2: Execute or skip tool
if let Err(e) = session.chat(&input.text, tx, in_stream).await {
error!("Chat error: {e}");
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Error(Error {
message: e.to_string(),
fatal: false,
})),
}))
.await?;
}
}
Some(client_message::Payload::End(_)) => {
info!(user = claims.sub.as_str(), "Session ended by client");
session.end();
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::End(SessionEnd {
summary: "Session ended.".into(),
@@ -167,7 +136,8 @@ async fn handle_session(
Some(client_message::Payload::Start(_)) => {
warn!("Received duplicate StartSession — ignoring");
}
None => continue,
// ToolResult and Approval are handled inside session.chat()
_ => continue,
}
}

474
src/grpc/session.rs Normal file
View File

@@ -0,0 +1,474 @@
use std::sync::Arc;
use matrix_sdk::room::Room;
use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest;
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use mistralai_client::v1::client::Client as MistralClient;
use mistralai_client::v1::conversations::{
AppendConversationRequest, ConversationInput, ConversationResponse,
CreateConversationRequest, FunctionResultEntry, ConversationEntry,
};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use super::auth::Claims;
use super::proto::*;
use super::router;
use super::GrpcState;
use crate::context::ResponseContext;
use crate::time_context::TimeContext;
/// A live coding session — manages the Matrix room, Mistral conversation,
/// and tool dispatch between client and server.
pub struct CodeSession {
pub session_id: String,
pub room_id: String,
pub conversation_id: Option<String>,
pub project_name: String,
pub project_path: String,
pub model: String,
pub user_id: String,
pub prompt_md: String,
state: Arc<GrpcState>,
room: Option<Room>,
}
impl CodeSession {
/// Create or resume a coding session.
pub async fn start(
state: Arc<GrpcState>,
claims: &Claims,
start: &StartSession,
) -> anyhow::Result<Self> {
let project_name = extract_project_name(&start.project_path);
let user_id = claims.sub.clone();
let model = if start.model.is_empty() {
state.config.agents.coding_model.clone()
} else {
start.model.clone()
};
// Check for existing session for this user + project
if let Some((session_id, room_id, conv_id)) =
state.store.find_code_session(&user_id, &project_name)
{
info!(
session_id = session_id.as_str(),
room_id = room_id.as_str(),
"Resuming existing code session"
);
let room = state.matrix.get_room(
<&matrix_sdk::ruma::RoomId>::try_from(room_id.as_str())
.map_err(|e| anyhow::anyhow!("Invalid room ID: {e}"))?,
);
state.store.touch_code_session(&session_id);
return Ok(Self {
session_id,
room_id,
conversation_id: if conv_id.is_empty() { None } else { Some(conv_id) },
project_name,
project_path: start.project_path.clone(),
model,
user_id,
prompt_md: start.prompt_md.clone(),
state,
room,
});
}
// Create new session
let session_id = uuid::Uuid::new_v4().to_string();
// Create private Matrix room for this project
let room_name = format!("code: {project_name}");
let room_id = create_project_room(&state.matrix, &room_name, &claims.email)
.await
.unwrap_or_else(|e| {
warn!("Failed to create Matrix room: {e}");
format!("!code-{session_id}:local") // fallback ID
});
let room = state.matrix.get_room(
<&matrix_sdk::ruma::RoomId>::try_from(room_id.as_str()).ok()
.unwrap_or_else(|| {
// This shouldn't happen but handle gracefully
warn!("Invalid room ID {room_id}, session will work without Matrix bridge");
<&matrix_sdk::ruma::RoomId>::try_from("!invalid:local").unwrap()
}),
);
state.store.create_code_session(
&session_id,
&user_id,
&room_id,
&start.project_path,
&project_name,
&model,
);
info!(
session_id = session_id.as_str(),
room_id = room_id.as_str(),
project = project_name.as_str(),
model = model.as_str(),
"Created new code session"
);
Ok(Self {
session_id,
room_id,
conversation_id: None,
project_name,
project_path: start.project_path.clone(),
model,
user_id,
prompt_md: start.prompt_md.clone(),
state,
room,
})
}
/// Build the per-message context header for coding mode.
fn build_context_header(&self) -> String {
let tc = TimeContext::now();
format!(
"{}\n[project: {} | path: {} | model: {}]\n{}",
tc.message_line(),
self.project_name,
self.project_path,
self.model,
if self.prompt_md.is_empty() {
String::new()
} else {
format!("## project instructions\n{}\n", self.prompt_md)
},
)
}
/// Send a user message and run the agent loop.
/// Returns tool calls that need client execution via the tx channel.
/// Server-side tools are executed inline.
pub async fn chat(
&mut self,
text: &str,
client_tx: &mpsc::Sender<Result<ServerMessage, tonic::Status>>,
client_rx: &mut tonic::Streaming<ClientMessage>,
) -> anyhow::Result<()> {
let context_header = self.build_context_header();
let input_text = format!("{context_header}\n{text}");
// Post to Matrix room
if let Some(ref room) = self.room {
let content = RoomMessageEventContent::text_plain(text);
let _ = room.send(content).await;
}
// Send status
let _ = client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: "thinking...".into(),
kind: StatusKind::Thinking.into(),
})),
})).await;
// Create or append to Mistral conversation
let response = if let Some(ref conv_id) = self.conversation_id {
let req = AppendConversationRequest {
inputs: ConversationInput::Text(input_text),
completion_args: None,
handoff_execution: None,
store: Some(true),
tool_confirmations: None,
stream: false,
};
self.state.mistral
.append_conversation_async(conv_id, &req)
.await
.map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))?
} else {
let req = CreateConversationRequest {
inputs: ConversationInput::Text(input_text),
model: Some(self.model.clone()),
agent_id: None,
agent_version: None,
name: Some(format!("code-{}", self.project_name)),
description: None,
instructions: None,
completion_args: None,
tools: Some(self.build_tool_definitions()),
handoff_execution: None,
metadata: None,
store: Some(true),
stream: false,
};
let resp = self.state.mistral
.create_conversation_async(&req)
.await
.map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?;
self.conversation_id = Some(resp.conversation_id.clone());
self.state.store.set_code_session_conversation(
&self.session_id,
&resp.conversation_id,
);
info!(
conversation_id = resp.conversation_id.as_str(),
"Created Mistral conversation for code session"
);
resp
};
// Tool call loop
let max_iterations = self.state.config.mistral.max_tool_iterations;
let mut current_response = response;
for _iteration in 0..max_iterations {
let calls = current_response.function_calls();
if calls.is_empty() {
break;
}
let mut result_entries = Vec::new();
for fc in &calls {
let call_id = fc.tool_call_id.as_deref().unwrap_or("unknown");
let is_local = router::is_client_tool(&fc.name);
if is_local {
// Send to client for execution
let needs_approval = true; // TODO: check tool permissions from config
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::ToolCall(ToolCall {
call_id: call_id.into(),
name: fc.name.clone(),
args_json: fc.arguments.clone(),
is_local: true,
needs_approval,
})),
})).await?;
// Wait for client to send back the result
let result = wait_for_tool_result(client_rx, call_id).await?;
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
tool_call_id: call_id.to_string(),
result: result.result,
id: None,
object: None,
created_at: None,
completed_at: None,
}));
} else {
// Execute server-side
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: format!("executing {}...", fc.name),
kind: StatusKind::ToolRunning.into(),
})),
})).await?;
let response_ctx = ResponseContext {
matrix_user_id: format!("@{}:sunbeam.pt", crate::context::localpart(&self.user_id)),
user_id: self.user_id.clone(),
display_name: None,
is_dm: true,
is_reply: false,
room_id: self.room_id.clone(),
};
let result = match self.state.tools
.execute(&fc.name, &fc.arguments, &response_ctx)
.await
{
Ok(s) => s,
Err(e) => format!("Error: {e}"),
};
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: format!("{} done", fc.name),
kind: StatusKind::ToolDone.into(),
})),
})).await?;
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
tool_call_id: call_id.to_string(),
result,
id: None,
object: None,
created_at: None,
completed_at: None,
}));
}
}
// Send results back to Mistral
let conv_id = self.conversation_id.as_ref().unwrap();
let req = AppendConversationRequest {
inputs: ConversationInput::Entries(result_entries),
completion_args: None,
handoff_execution: None,
store: Some(true),
tool_confirmations: None,
stream: false,
};
current_response = self.state.mistral
.append_conversation_async(conv_id, &req)
.await
.map_err(|e| anyhow::anyhow!("append_conversation (tool results) failed: {}", e.message))?;
}
// Extract final text
if let Some(text) = current_response.assistant_text() {
// Post to Matrix room
if let Some(ref room) = self.room {
let content = RoomMessageEventContent::text_markdown(&text);
let _ = room.send(content).await;
}
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Done(TextDone {
full_text: text,
input_tokens: current_response.usage.prompt_tokens,
output_tokens: current_response.usage.completion_tokens,
})),
})).await?;
}
self.state.store.touch_code_session(&self.session_id);
Ok(())
}
/// Build tool definitions for the Mistral conversation.
/// Union of server-side tools + client-side tool schemas.
fn build_tool_definitions(&self) -> Vec<mistralai_client::v1::agents::AgentTool> {
let mut tools = crate::tools::ToolRegistry::agent_tool_definitions(
self.state.config.services.gitea.is_some(),
self.state.config.services.kratos.is_some(),
);
// Add client-side tool definitions
let client_tools = vec![
("file_read", "Read a file's contents. Use path for the file path, and optional start_line/end_line for a range."),
("file_write", "Create or overwrite a file. Use path for the file path and content for the file contents."),
("search_replace", "Patch a file using SEARCH/REPLACE blocks. Use path for the file and diff for the SEARCH/REPLACE content."),
("grep", "Search files recursively with regex. Use pattern for the regex and optional path for the search root."),
("bash", "Execute a shell command. Use command for the command string."),
("list_directory", "List files and directories. Use path for the directory (default: project root) and optional depth."),
];
for (name, desc) in client_tools {
tools.push(mistralai_client::v1::agents::AgentTool::function(
name.into(),
desc.into(),
serde_json::json!({
"type": "object",
"properties": {
"path": { "type": "string", "description": "File or directory path" },
"content": { "type": "string", "description": "File content (for write)" },
"diff": { "type": "string", "description": "SEARCH/REPLACE blocks (for search_replace)" },
"pattern": { "type": "string", "description": "Regex pattern (for grep)" },
"command": { "type": "string", "description": "Shell command (for bash)" },
"start_line": { "type": "integer", "description": "Start line (for file_read)" },
"end_line": { "type": "integer", "description": "End line (for file_read)" },
"depth": { "type": "integer", "description": "Directory depth (for list_directory)" }
}
}),
));
}
tools
}
/// End the session.
pub fn end(&self) {
self.state.store.end_code_session(&self.session_id);
info!(session_id = self.session_id.as_str(), "Code session ended");
}
}
/// Wait for a ToolResult message from the client stream.
async fn wait_for_tool_result(
stream: &mut tonic::Streaming<ClientMessage>,
expected_call_id: &str,
) -> anyhow::Result<ToolResult> {
// Read messages until we get the matching ToolResult
while let Some(msg) = stream.message().await? {
match msg.payload {
Some(client_message::Payload::ToolResult(result)) => {
if result.call_id == expected_call_id {
return Ok(result);
}
warn!(
expected = expected_call_id,
got = result.call_id.as_str(),
"Received tool result for wrong call ID"
);
}
Some(client_message::Payload::Approval(approval)) => {
if approval.call_id == expected_call_id && !approval.approved {
return Ok(ToolResult {
call_id: expected_call_id.into(),
result: "Tool execution denied by user.".into(),
is_error: true,
});
}
}
_ => {
debug!("Ignoring non-tool-result message while waiting for tool result");
}
}
}
anyhow::bail!("Stream closed while waiting for tool result {expected_call_id}")
}
/// Create a private Matrix room for a coding project.
async fn create_project_room(
client: &matrix_sdk::Client,
name: &str,
invite_email: &Option<String>,
) -> anyhow::Result<String> {
use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest;
use matrix_sdk::ruma::api::client::room::Visibility;
let mut request = CreateRoomRequest::new();
request.name = Some(name.into());
request.visibility = Visibility::Private;
request.is_direct = true;
let response = client.create_room(request).await?;
let room_id = response.room_id().to_string();
info!(room_id = room_id.as_str(), name, "Created project room");
Ok(room_id)
}
/// Extract a project name from a path (last directory component).
fn extract_project_name(path: &str) -> String {
std::path::Path::new(path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_project_name() {
assert_eq!(extract_project_name("/Users/sienna/Development/sunbeam/sol"), "sol");
assert_eq!(extract_project_name("/home/user/project"), "project");
assert_eq!(extract_project_name("relative/path"), "path");
assert_eq!(extract_project_name("/"), "unknown");
}
}

View File

@@ -83,6 +83,19 @@ impl Store {
PRIMARY KEY (localpart, service)
);
CREATE TABLE IF NOT EXISTS code_sessions (
session_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
conversation_id TEXT,
project_path TEXT NOT NULL,
project_name TEXT NOT NULL,
model TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
last_active TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS research_sessions (
session_id TEXT PRIMARY KEY,
room_id TEXT NOT NULL,
@@ -244,6 +257,120 @@ impl Store {
}
}
// =========================================================================
// Code Sessions (sunbeam code)
// =========================================================================
/// Find an active code session for a user + project.
pub fn find_code_session(
&self,
user_id: &str,
project_name: &str,
) -> Option<(String, String, String)> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT session_id, room_id, conversation_id FROM code_sessions
WHERE user_id = ?1 AND project_name = ?2 AND status = 'active'
ORDER BY last_active DESC LIMIT 1",
params![user_id, project_name],
|row| Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
)),
)
.ok()
}
/// Create a new code session.
pub fn create_code_session(
&self,
session_id: &str,
user_id: &str,
room_id: &str,
project_path: &str,
project_name: &str,
model: &str,
) {
let conn = self.conn.lock().unwrap();
if let Err(e) = conn.execute(
"INSERT INTO code_sessions (session_id, user_id, room_id, project_path, project_name, model)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![session_id, user_id, room_id, project_path, project_name, model],
) {
warn!("Failed to create code session: {e}");
}
}
/// Update the conversation_id for a code session.
pub fn set_code_session_conversation(
&self,
session_id: &str,
conversation_id: &str,
) {
let conn = self.conn.lock().unwrap();
if let Err(e) = conn.execute(
"UPDATE code_sessions SET conversation_id = ?1, last_active = datetime('now')
WHERE session_id = ?2",
params![conversation_id, session_id],
) {
warn!("Failed to update code session conversation: {e}");
}
}
/// Touch the last_active timestamp.
pub fn touch_code_session(&self, session_id: &str) {
let conn = self.conn.lock().unwrap();
if let Err(e) = conn.execute(
"UPDATE code_sessions SET last_active = datetime('now') WHERE session_id = ?1",
params![session_id],
) {
warn!("Failed to touch code session: {e}");
}
}
/// End a code session.
pub fn end_code_session(&self, session_id: &str) {
let conn = self.conn.lock().unwrap();
if let Err(e) = conn.execute(
"UPDATE code_sessions SET status = 'ended' WHERE session_id = ?1",
params![session_id],
) {
warn!("Failed to end code session: {e}");
}
}
/// Check if a room is a code session room.
pub fn is_code_room(&self, room_id: &str) -> bool {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT 1 FROM code_sessions WHERE room_id = ?1 AND status = 'active' LIMIT 1",
params![room_id],
|_| Ok(()),
)
.is_ok()
}
/// Get project context for a code room.
pub fn get_code_room_context(
&self,
room_id: &str,
) -> Option<(String, String, String)> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT project_name, project_path, model FROM code_sessions
WHERE room_id = ?1 AND status = 'active'
ORDER BY last_active DESC LIMIT 1",
params![room_id],
|row| Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
)),
)
.ok()
}
// =========================================================================
// Service Users (OIDC → service username mapping)
// =========================================================================