refactor: remove legacy chat path, fix corrupted conversation recovery

- Delete CodeSession::chat() — the legacy inline tool loop that
  duplicated the orchestrator's conversation + tool dispatch logic
- Delete wait_for_tool_result() — only used by the legacy path
- Make orchestrator mandatory in run_session (no more if/else fallback)
- Unify conversation creation into create_fresh_conversation()
- Add corrupted conversation recovery to create_or_append_conversation:
  detects "function calls and responses" errors from Mistral (caused by
  disconnecting mid-tool-call) and auto-creates a fresh conversation
- Add tracing-appender for optional rotating log file (SOL_LOG_FILE env)
- Add Procfile.dev for overmind process management
This commit is contained in:
2026-03-24 19:49:07 +00:00
parent d58bbfce66
commit 6a2aafdccc
5 changed files with 109 additions and 314 deletions

View File

@@ -22,6 +22,7 @@ serde_json = "1"
toml = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
rand = "0.8"
regex = "1"
anyhow = "1"

1
Procfile.dev Normal file
View File

@@ -0,0 +1 @@
sol: cargo run

View File

@@ -223,41 +223,25 @@ async fn run_session(
}))
.await?;
// Check if orchestrator is available
let has_orch = state.orchestrator.is_some();
info!(has_orchestrator = has_orch, "Checking orchestrator availability");
let orchestrator = state.orchestrator.as_ref().cloned();
let orchestrator = state.orchestrator.as_ref()
.ok_or_else(|| anyhow::anyhow!("Orchestrator not initialized"))?
.clone();
// Main message loop
while let Some(msg) = in_stream.message().await? {
match msg.payload {
Some(client_message::Payload::Input(input)) => {
if let Some(ref orch) = orchestrator {
// Orchestrator path: delegate tool loop, bridge forwards events
if let Err(e) = session_chat_via_orchestrator(
&mut session, &input.text, orch, tx, in_stream,
).await {
error!("Chat error: {e}");
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Error(Error {
message: e.to_string(),
fatal: false,
})),
}))
.await?;
}
} else {
// Fallback: inline tool loop (legacy)
if let Err(e) = session.chat(&input.text, tx, in_stream).await {
error!("Chat error: {e}");
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Error(Error {
message: e.to_string(),
fatal: false,
})),
}))
.await?;
}
if let Err(e) = session_chat_via_orchestrator(
&mut session, &input.text, &orchestrator, tx, in_stream,
).await {
error!("Chat error: {e}");
tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Error(Error {
message: e.to_string(),
fatal: false,
})),
}))
.await?;
}
}
Some(client_message::Payload::IndexSymbols(idx)) => {

View File

@@ -1,21 +1,16 @@
use std::sync::Arc;
use matrix_sdk::room::Room;
use matrix_sdk::ruma::api::client::room::create_room::v3::Request as CreateRoomRequest;
use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
use mistralai_client::v1::client::Client as MistralClient;
use mistralai_client::v1::conversations::{
AppendConversationRequest, ConversationInput, ConversationResponse,
CreateConversationRequest, FunctionResultEntry, ConversationEntry,
CreateConversationRequest,
};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use tracing::{info, warn};
use super::auth::Claims;
use super::proto::*;
use super::router;
use super::GrpcState;
use crate::context::ResponseContext;
use crate::time_context::TimeContext;
/// A live coding session — manages the Matrix room, Mistral conversation,
@@ -265,6 +260,45 @@ you also have access to server-side tools: search_archive, search_web, research,
header
}
/// Create a fresh Mistral conversation, replacing any existing one.
///
/// Builds instructions and tool schemas from the current session state,
/// creates the conversation via the Mistral API, and records the new
/// conversation id both in memory and in the persistent store so that
/// subsequent messages append to it.
async fn create_fresh_conversation(
    &mut self,
    input_text: String,
) -> anyhow::Result<ConversationResponse> {
    // Assemble the creation request up front; instructions and the tool
    // definition union are rebuilt from current session state every time.
    let request = CreateConversationRequest {
        inputs: ConversationInput::Text(input_text),
        model: Some(self.model.clone()),
        agent_id: None,
        agent_version: None,
        name: Some(format!("code-{}", self.project_name)),
        description: None,
        instructions: Some(self.build_instructions()),
        completion_args: None,
        tools: Some(self.build_tool_definitions()),
        handoff_execution: None,
        metadata: None,
        store: Some(true),
        stream: false,
    };

    let response = self
        .state
        .mistral
        .create_conversation_async(&request)
        .await
        .map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?;

    // Persist the new conversation id so the session survives reconnects.
    let conv_id = response.conversation_id.clone();
    self.state.store.set_code_session_conversation(&self.session_id, &conv_id);
    self.conversation_id = Some(conv_id);
    info!(
        conversation_id = response.conversation_id.as_str(),
        "Created Mistral conversation for code session"
    );
    Ok(response)
}
fn git_branch(&self) -> String {
// Use the git branch from StartSession, fall back to "mainline"
if self.project_path.is_empty() {
@@ -283,204 +317,6 @@ you also have access to server-side tools: search_archive, search_web, research,
}
}
/// Send a user message and run the agent loop.
/// Returns tool calls that need client execution via the tx channel.
/// Server-side tools are executed inline.
///
/// Flow: post the user text to the Matrix room, create/append the Mistral
/// conversation, then loop: for each function call in the response, either
/// forward it to the client (blocking on `wait_for_tool_result`) or execute
/// it server-side, append all results back to the conversation, and repeat
/// until the model stops calling tools or `max_tool_iterations` is reached.
/// Finally the assistant text (if any) is posted to Matrix and sent to the
/// client as a `Done` message.
pub async fn chat(
&mut self,
text: &str,
client_tx: &mpsc::Sender<Result<ServerMessage, tonic::Status>>,
client_rx: &mut tonic::Streaming<ClientMessage>,
) -> anyhow::Result<()> {
// Prepend the per-message context header (time/project context) to the
// raw user text before sending it to the model.
let context_header = self.build_context_header(text).await;
let input_text = format!("{context_header}\n{text}");
// Post user message to Matrix room (as m.notice to distinguish from assistant)
if let Some(ref room) = self.room {
let content = RoomMessageEventContent::notice_plain(text);
let _ = room.send(content).await;
}
// Send status
let _ = client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: "generating…".into(),
kind: StatusKind::Thinking.into(),
})),
})).await;
// Create or append to Mistral conversation
let response = if let Some(ref conv_id) = self.conversation_id {
let req = AppendConversationRequest {
inputs: ConversationInput::Text(input_text),
completion_args: None,
handoff_execution: None,
store: Some(true),
tool_confirmations: None,
stream: false,
};
self.state.mistral
.append_conversation_async(conv_id, &req)
.await
.map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))?
} else {
// First message of the session: create the conversation with full
// instructions and the union of server- and client-side tools.
let instructions = self.build_instructions();
let req = CreateConversationRequest {
inputs: ConversationInput::Text(input_text),
model: Some(self.model.clone()),
agent_id: None,
agent_version: None,
name: Some(format!("code-{}", self.project_name)),
description: None,
instructions: Some(instructions),
completion_args: None,
tools: Some(self.build_tool_definitions()),
handoff_execution: None,
metadata: None,
store: Some(true),
stream: false,
};
let resp = self.state.mistral
.create_conversation_async(&req)
.await
.map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?;
self.conversation_id = Some(resp.conversation_id.clone());
// Persist the conversation id so the session can be resumed.
self.state.store.set_code_session_conversation(
&self.session_id,
&resp.conversation_id,
);
info!(
conversation_id = resp.conversation_id.as_str(),
"Created Mistral conversation for code session"
);
resp
};
// Tool call loop
// Bounded by config to prevent a model that keeps emitting tool calls
// from looping forever.
let max_iterations = self.state.config.mistral.max_tool_iterations;
let mut current_response = response;
for _iteration in 0..max_iterations {
let calls = current_response.function_calls();
if calls.is_empty() {
break;
}
// One FunctionResult entry per call; all are appended in one request.
let mut result_entries = Vec::new();
for fc in &calls {
let call_id = fc.tool_call_id.as_deref().unwrap_or("unknown");
let is_local = router::is_client_tool(&fc.name);
if is_local {
// Send to client for execution
let needs_approval = true; // TODO: check tool permissions from config
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::ToolCall(ToolCall {
call_id: call_id.into(),
name: fc.name.clone(),
args_json: fc.arguments.clone(),
is_local: true,
needs_approval,
})),
})).await?;
// Wait for client to send back the result
// NOTE: this blocks the loop on the client stream; a disconnect
// here leaves the conversation with an unanswered tool call.
let result = wait_for_tool_result(client_rx, call_id).await?;
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
tool_call_id: call_id.to_string(),
result: result.result,
id: None,
object: None,
created_at: None,
completed_at: None,
}));
} else {
// Execute server-side
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: format!("executing {}...", fc.name),
kind: StatusKind::ToolRunning.into(),
})),
})).await?;
let response_ctx = ResponseContext {
matrix_user_id: format!("@{}:sunbeam.pt", crate::context::localpart(&self.user_id)),
user_id: self.user_id.clone(),
display_name: None,
is_dm: true,
is_reply: false,
room_id: self.room_id.clone(),
};
// Tool failures are reported back to the model as text rather
// than aborting the chat.
let result = match self.state.tools
.execute(&fc.name, &fc.arguments, &response_ctx)
.await
{
Ok(s) => s,
Err(e) => format!("Error: {e}"),
};
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: format!("{} done", fc.name),
kind: StatusKind::ToolDone.into(),
})),
})).await?;
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
tool_call_id: call_id.to_string(),
result,
id: None,
object: None,
created_at: None,
completed_at: None,
}));
}
}
// Send results back to Mistral
// Safe unwrap: conversation_id was set above in the create branch.
let conv_id = self.conversation_id.as_ref().unwrap();
let req = AppendConversationRequest {
inputs: ConversationInput::Entries(result_entries),
completion_args: None,
handoff_execution: None,
store: Some(true),
tool_confirmations: None,
stream: false,
};
current_response = self.state.mistral
.append_conversation_async(conv_id, &req)
.await
.map_err(|e| anyhow::anyhow!("append_conversation (tool results) failed: {}", e.message))?;
}
// Extract final text
if let Some(text) = current_response.assistant_text() {
// Post to Matrix room
if let Some(ref room) = self.room {
let content = RoomMessageEventContent::text_markdown(&text);
let _ = room.send(content).await;
}
client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Done(TextDone {
full_text: text,
input_tokens: current_response.usage.prompt_tokens,
output_tokens: current_response.usage.completion_tokens,
})),
})).await?;
}
// Refresh the session's last-activity timestamp.
self.state.store.touch_code_session(&self.session_id);
Ok(())
}
/// Build tool definitions for the Mistral conversation.
/// Union of server-side tools + client-side tool schemas.
fn build_tool_definitions(&self) -> Vec<mistralai_client::v1::agents::AgentTool> {
@@ -557,58 +393,40 @@ you also have access to server-side tools: search_archive, search_web, research,
pub async fn create_or_append_conversation(
&mut self,
text: &str,
) -> anyhow::Result<mistralai_client::v1::conversations::ConversationResponse> {
) -> anyhow::Result<ConversationResponse> {
let context_header = self.build_context_header(text).await;
let input_text = format!("{context_header}\n{text}");
if let Some(ref conv_id) = self.conversation_id {
let req = AppendConversationRequest {
inputs: ConversationInput::Text(input_text),
inputs: ConversationInput::Text(input_text.clone()),
completion_args: None,
handoff_execution: None,
store: Some(true),
tool_confirmations: None,
stream: false,
};
self.state
match self.state
.mistral
.append_conversation_async(conv_id, &req)
.await
.map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))
{
Ok(resp) => Ok(resp),
Err(e) if e.message.contains("function calls and responses")
|| e.message.contains("invalid_request_error") =>
{
warn!(
conversation_id = conv_id.as_str(),
error = e.message.as_str(),
"Conversation corrupted — creating fresh conversation"
);
self.conversation_id = None;
self.create_fresh_conversation(input_text).await
}
Err(e) => Err(anyhow::anyhow!("append_conversation failed: {}", e.message)),
}
} else {
let instructions = self.build_instructions();
let req = CreateConversationRequest {
inputs: ConversationInput::Text(input_text),
model: Some(self.model.clone()),
agent_id: None,
agent_version: None,
name: Some(format!("code-{}", self.project_name)),
description: None,
instructions: Some(instructions),
completion_args: None,
tools: Some(self.build_tool_definitions()),
handoff_execution: None,
metadata: None,
store: Some(true),
stream: false,
};
let resp = self.state
.mistral
.create_conversation_async(&req)
.await
.map_err(|e| anyhow::anyhow!("create_conversation failed: {}", e.message))?;
self.conversation_id = Some(resp.conversation_id.clone());
self.state.store.set_code_session_conversation(
&self.session_id,
&resp.conversation_id,
);
info!(
conversation_id = resp.conversation_id.as_str(),
"Created Mistral conversation for code session"
);
Ok(resp)
self.create_fresh_conversation(input_text).await
}
}
@@ -641,41 +459,6 @@ you also have access to server-side tools: search_archive, search_web, research,
}
/// Wait for a ToolResult message from the client stream.
///
/// Drains client messages until a `ToolResult` whose `call_id` matches
/// `expected_call_id` arrives. A denial (`Approval` with `approved == false`)
/// for the same call id is converted into an error-flavoured `ToolResult`.
/// All other messages are logged and skipped. Fails if the stream closes
/// before a matching result is seen.
async fn wait_for_tool_result(
    stream: &mut tonic::Streaming<ClientMessage>,
    expected_call_id: &str,
) -> anyhow::Result<ToolResult> {
    while let Some(msg) = stream.message().await? {
        let Some(payload) = msg.payload else {
            debug!("Ignoring non-tool-result message while waiting for tool result");
            continue;
        };
        match payload {
            // The result we are waiting for.
            client_message::Payload::ToolResult(result) if result.call_id == expected_call_id => {
                return Ok(result);
            }
            // A result for some other call — log and keep waiting.
            client_message::Payload::ToolResult(result) => {
                warn!(
                    expected = expected_call_id,
                    got = result.call_id.as_str(),
                    "Received tool result for wrong call ID"
                );
            }
            // A denial for this call short-circuits with a synthetic error result.
            client_message::Payload::Approval(approval) => {
                if approval.call_id == expected_call_id && !approval.approved {
                    return Ok(ToolResult {
                        call_id: expected_call_id.into(),
                        result: "Tool execution denied by user.".into(),
                        is_error: true,
                    });
                }
            }
            _ => {
                debug!("Ignoring non-tool-result message while waiting for tool result");
            }
        }
    }
    anyhow::bail!("Stream closed while waiting for tool result {expected_call_id}")
}
/// Create a private Matrix room for a coding project.
async fn create_project_room(
client: &matrix_sdk::Client,

View File

@@ -44,13 +44,39 @@ use tools::ToolRegistry;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info")),
)
.init();
// Initialize tracing — optionally write to a rotating log file
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("sol=info"));
let _log_guard = if let Ok(log_path) = std::env::var("SOL_LOG_FILE") {
let log_dir = std::path::Path::new(&log_path)
.parent()
.unwrap_or(std::path::Path::new("."));
let log_name = std::path::Path::new(&log_path)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("sol.log");
let file_appender = tracing_appender::rolling::Builder::new()
.max_log_files(3)
.rotation(tracing_appender::rolling::Rotation::NEVER)
.filename_prefix(log_name)
.build(log_dir)
.expect("Failed to create log file appender");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_writer(non_blocking)
.with_ansi(false)
.init();
Some(guard)
} else {
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.init();
None
};
// Load config
let config_path =