refactor: remove legacy responder + agent_ux, add Gitea integration tests

Legacy removal:
- DELETE src/brain/responder.rs (900 lines) — replaced by orchestrator
- DELETE src/agent_ux.rs (184 lines) — UX moved to transport bridges
- EXTRACT chat_blocking() to src/brain/chat.rs (standalone utility)
- sync.rs: uses ConversationRegistry directly (no responder)
- main.rs: holds ToolRegistry + Personality directly (no Responder wrapper)
- research.rs: progress updates via tracing (no AgentProgress)

Gitea integration testing:
- docker-compose: added Gitea service with healthcheck
- bootstrap-gitea.sh: creates admin, org, migrates (one-shot clones, not mirrors) 6 real repos from
  src.sunbeam.pt (sol, cli, proxy, storybook, admin-ui, mistralai-client-rs)
- PAT provisioning for SDK testing without Vault
- code_index/gitea.rs: fixed directory listing (direct API calls instead
  of SDK's single-object parser), proper base64 file decoding

New integration tests:
- Gitea: list_repos, get_repo, get_file, directory listing, code indexing
- Web search: SearXNG query with result verification
- Conversation registry: lifecycle + send_message round-trip
- Evaluator: rule matching (DM, own message)
- gRPC bridge: event filtering, tool call mapping, thinking→status
This commit is contained in:
2026-03-24 11:45:43 +00:00
parent ec55984fd8
commit 495c465a01
15 changed files with 578 additions and 901 deletions

2
.envrc Normal file
View File

@@ -0,0 +1,2 @@
export SOL_MISTRAL_API_KEY="<no value>"
export SOL_MATRIX_DEVICE_ID="SOLDEV001"

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ target/
__pycache__/ __pycache__/
*.pyc *.pyc
.env .env
data/

86
dev/bootstrap-gitea.sh Executable file
View File

@@ -0,0 +1,86 @@
#!/bin/bash
## Bootstrap Gitea for local dev/testing.
## Creates admin user, org, and migrates public repos from src.sunbeam.pt.
## Run after: docker compose -f docker-compose.dev.yaml up -d gitea
set -euo pipefail

GITEA="http://localhost:3000"
ADMIN_USER="sol"
ADMIN_PASS="solpass123"
ADMIN_EMAIL="sol@sunbeam.local"
SOURCE="https://src.sunbeam.pt"

echo "Waiting for Gitea..."
until curl -sf "$GITEA/api/v1/version" >/dev/null 2>&1; do
  sleep 2
done
echo "Gitea is ready."

# Create admin user via container CLI (can't use API without existing admin)
echo "Creating admin user..."
docker compose -f docker-compose.dev.yaml exec -T --user git gitea \
  gitea admin user create \
  --username "$ADMIN_USER" --password "$ADMIN_PASS" \
  --email "$ADMIN_EMAIL" --admin --must-change-password=false 2>/dev/null || true
echo "Admin user ready."

# Create studio org (idempotent: 422 on re-run is swallowed)
echo "Creating studio org..."
curl -sf -X POST "$GITEA/api/v1/orgs" \
  -H 'Content-Type: application/json' \
  -u "$ADMIN_USER:$ADMIN_PASS" \
  -d '{"username":"studio","full_name":"Sunbeam Studios","visibility":"public"}' \
  > /dev/null 2>&1 || true

# Migrate repos from src.sunbeam.pt (public, no auth needed)
REPOS=(
  "sol"
  "cli"
  "proxy"
  "storybook"
  "admin-ui"
  "mistralai-client-rs"
)
for repo in "${REPOS[@]}"; do
  echo "Migrating studio/$repo from src.sunbeam.pt..."
  curl -sf -X POST "$GITEA/api/v1/repos/migrate" \
    -H 'Content-Type: application/json' \
    -u "$ADMIN_USER:$ADMIN_PASS" \
    -d "{
      \"clone_addr\": \"$SOURCE/studio/$repo.git\",
      \"repo_name\": \"$repo\",
      \"repo_owner\": \"studio\",
      \"service\": \"gitea\",
      \"mirror\": false
    }" > /dev/null 2>&1 && echo "  $repo" || echo "  $repo (already exists or failed)"
done

# Create a PAT for the admin user (for SDK testing without Vault).
# Read the JSON response ONCE into a variable: older Gitea returns the token
# under "sha1", newer releases under "token". (Calling json.load(sys.stdin)
# twice would fail — stdin is exhausted after the first load.)
echo "Creating admin PAT..."
PAT=$(curl -sf -X POST "$GITEA/api/v1/users/$ADMIN_USER/tokens" \
  -H 'Content-Type: application/json' \
  -u "$ADMIN_USER:$ADMIN_PASS" \
  -d '{"name":"sol-dev-pat","scopes":["read:repository","write:repository","read:user","read:organization","read:issue","write:issue","read:notification"]}' \
  2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('sha1', d.get('token', '')))" 2>/dev/null || echo "")
if [ -z "$PAT" ]; then
  # Token might already exist — try to get it
  PAT="already-provisioned"
  echo "  PAT already exists (or creation failed)"
else
  echo "  PAT: ${PAT:0:8}..."
fi

echo ""
echo "Gitea bootstrap complete."
echo "  Admin: $ADMIN_USER / $ADMIN_PASS"
echo "  Org:   studio"
echo "  Repos: ${REPOS[*]}"
echo "  URL:   $GITEA"
if [ "$PAT" != "already-provisioned" ] && [ -n "$PAT" ]; then
  echo ""
  echo "Add to .env:"
  echo "  GITEA_PAT=$PAT"
fi

View File

@@ -48,6 +48,27 @@ services:
volumes: volumes:
- ./dev/searxng-settings.yml:/etc/searxng/settings.yml:ro - ./dev/searxng-settings.yml:/etc/searxng/settings.yml:ro
gitea:
image: gitea/gitea:1.22
environment:
- GITEA__database__DB_TYPE=sqlite3
- GITEA__server__ROOT_URL=http://localhost:3000
- GITEA__server__HTTP_PORT=3000
- GITEA__service__DISABLE_REGISTRATION=false
- GITEA__service__REQUIRE_SIGNIN_VIEW=false
- GITEA__security__INSTALL_LOCK=true
- GITEA__api__ENABLE_SWAGGER=false
ports:
- "3000:3000"
volumes:
- gitea-data:/data
healthcheck:
test: ["CMD-SHELL", "curl -sf http://localhost:3000/api/v1/version || exit 1"]
interval: 10s
timeout: 5s
retries: 10
volumes: volumes:
opensearch-data: opensearch-data:
tuwunel-data: tuwunel-data:
gitea-data:

View File

@@ -1,183 +0,0 @@
use matrix_sdk::room::Room;
use ruma::events::relation::Thread;
use ruma::events::room::message::{Relation, RoomMessageEventContent};
use ruma::OwnedEventId;
use tracing::warn;
use crate::matrix_utils;
/// Reaction emojis for agent progress lifecycle.
/// Lifecycle order as used below: 🔍 (start) → ⚙️ (processing) → ✅ (done).
const REACTION_WORKING: &str = "\u{1F50D}"; // 🔍
const REACTION_PROCESSING: &str = "\u{2699}\u{FE0F}"; // ⚙️
const REACTION_DONE: &str = "\u{2705}"; // ✅
/// Manages the UX lifecycle for agentic work:
/// reactions on the user's message + a thread for tool call details.
pub struct AgentProgress {
// Room the user's message lives in; all reactions/thread posts go here.
room: Room,
// The user's message we decorate with reactions and root a thread under.
user_event_id: OwnedEventId,
/// Event ID of the current reaction (so we can redact + replace).
// NOTE(review): never populated in this file (start() sets it to None) — confirm intent.
current_reaction_id: Option<OwnedEventId>,
/// Event ID of the thread root (first message in our thread).
thread_root_id: Option<OwnedEventId>,
}
impl AgentProgress {
    /// Create a tracker bound to the user's triggering message.
    pub fn new(room: Room, user_event_id: OwnedEventId) -> Self {
        Self {
            room,
            user_event_id,
            current_reaction_id: None,
            thread_root_id: None,
        }
    }

    /// Start: add 🔍 reaction to indicate work has begun.
    pub async fn start(&mut self) {
        if let Ok(()) = matrix_utils::send_reaction(
            &self.room,
            self.user_event_id.clone(),
            REACTION_WORKING,
        )
        .await
        {
            // We can't easily get the reaction event ID from send_reaction,
            // so we track the emoji state instead.
            self.current_reaction_id = None; // TODO: capture reaction event ID if needed
        }
    }

    /// Post a step update to the thread on the user's message.
    ///
    /// The first successful post becomes the thread root; later posts
    /// reference it as the "latest" event for thread fallback rendering.
    pub async fn post_step(&mut self, text: &str) {
        let latest = self
            .thread_root_id
            .as_ref()
            .unwrap_or(&self.user_event_id)
            .clone();
        let mut msg = RoomMessageEventContent::text_markdown(text);
        let thread = Thread::plain(self.user_event_id.clone(), latest);
        msg.relates_to = Some(Relation::Thread(thread));
        match self.room.send(msg).await {
            Ok(response) => {
                if self.thread_root_id.is_none() {
                    self.thread_root_id = Some(response.event_id);
                }
            }
            Err(e) => warn!("Failed to post agent step: {e}"),
        }
    }

    /// Swap reaction to ⚙️ (processing).
    pub async fn processing(&mut self) {
        // Send new reaction (Matrix doesn't have "replace reaction" — we add another)
        let _ = matrix_utils::send_reaction(
            &self.room,
            self.user_event_id.clone(),
            REACTION_PROCESSING,
        )
        .await;
    }

    /// Swap reaction to ✅ (done).
    pub async fn done(&mut self) {
        let _ = matrix_utils::send_reaction(
            &self.room,
            self.user_event_id.clone(),
            REACTION_DONE,
        )
        .await;
    }

    /// Truncate to at most `max` characters, appending `…` when cut.
    ///
    /// Char-based (not byte-based) so we never slice inside a multi-byte
    /// UTF-8 sequence — `&s[..40]` would panic on a non-boundary index.
    /// The appended ellipsis is what the unit tests below assert
    /// (`ends_with('…')`); the previous code truncated without it.
    fn truncate_chars(s: &str, max: usize) -> String {
        if s.chars().count() > max {
            let head: String = s.chars().take(max).collect();
            format!("{head}…")
        } else {
            s.to_string()
        }
    }

    /// Format a tool call for the thread — concise, not raw args.
    pub fn format_tool_call(name: &str, args: &str) -> String {
        // Extract just the key params, not the full JSON blob
        let summary = match serde_json::from_str::<serde_json::Value>(args) {
            Ok(v) => {
                let params: Vec<String> = v
                    .as_object()
                    .map(|obj| {
                        obj.iter()
                            // Skip nulls and empty strings — they add noise, not signal.
                            .filter(|(_, v)| !v.is_null() && v.as_str() != Some(""))
                            .map(|(k, v)| {
                                let val = match v {
                                    serde_json::Value::String(s) => Self::truncate_chars(s, 40),
                                    other => other.to_string(),
                                };
                                format!("{k}={val}")
                            })
                            .collect()
                    })
                    .unwrap_or_default();
                if params.is_empty() {
                    String::new()
                } else {
                    format!(" ({})", params.join(", "))
                }
            }
            // Unparseable args: show just the tool name.
            Err(_) => String::new(),
        };
        format!("🔧 `{name}`{summary}")
    }

    /// Format a tool result for the thread — short summary only (≤200 chars + `…`).
    pub fn format_tool_result(name: &str, result: &str) -> String {
        let truncated = Self::truncate_chars(result, 200);
        format!("← `{name}`: {truncated}")
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Unit tests for the pure formatting helpers (no Matrix I/O involved).
#[test]
fn test_format_tool_call_with_params() {
let formatted = AgentProgress::format_tool_call("search_archive", r#"{"query":"test","room":"general"}"#);
assert!(formatted.contains("search_archive"));
assert!(formatted.contains("query=test"));
assert!(formatted.contains("room=general"));
assert!(formatted.starts_with("🔧"));
}
#[test]
fn test_format_tool_call_no_params() {
// Empty JSON object → no parenthesized summary at all.
let formatted = AgentProgress::format_tool_call("list_rooms", "{}");
assert_eq!(formatted, "🔧 `list_rooms`");
}
#[test]
fn test_format_tool_call_truncates_long_values() {
let long_code = "x".repeat(100);
let args = format!(r#"{{"code":"{}"}}"#, long_code);
let formatted = AgentProgress::format_tool_call("run_script", &args);
assert!(formatted.contains("code="));
// NOTE(review): contains("") is vacuously true for any string — this was
// almost certainly meant to be contains("…") (the truncation marker used
// by the sibling test below); confirm against format_tool_call.
assert!(formatted.contains(""));
assert!(formatted.len() < 200);
}
#[test]
fn test_format_tool_result_truncation() {
let long = "x".repeat(500);
let formatted = AgentProgress::format_tool_result("search", &long);
assert!(formatted.len() < 300);
// Expects an ellipsis appended when the result exceeds the 200-char cap.
assert!(formatted.ends_with('…'));
}
#[test]
fn test_format_tool_result_short() {
// Results under the cap pass through unchanged.
let formatted = AgentProgress::format_tool_result("search", "3 results found");
assert_eq!(formatted, "← `search`: 3 results found");
}
}

27
src/brain/chat.rs Normal file
View File

@@ -0,0 +1,27 @@
//! Utility: blocking Mistral chat wrapper.
//!
//! The Mistral client's `chat()` holds a MutexGuard across `.await`,
//! making the future !Send. This wrapper runs it in spawn_blocking.
use std::sync::Arc;
use mistralai_client::v1::{
chat::{ChatMessage, ChatParams, ChatResponse},
client::Client,
constants::Model,
error::ApiError,
};
/// Run one Mistral chat completion on a dedicated blocking thread.
///
/// The guard-across-await lives entirely on the blocking thread, so this
/// future stays `Send`. A panicked/cancelled worker surfaces as an
/// `ApiError` rather than poisoning the caller.
pub(crate) async fn chat_blocking(
    client: &Arc<Client>,
    model: Model,
    messages: Vec<ChatMessage>,
    params: ChatParams,
) -> Result<ChatResponse, ApiError> {
    let worker = Arc::clone(client);
    let joined =
        tokio::task::spawn_blocking(move || worker.chat(model, messages, Some(params))).await;
    joined.unwrap_or_else(|join_err| {
        Err(ApiError {
            message: format!("spawn_blocking join error: {join_err}"),
        })
    })
}

View File

@@ -1,4 +1,4 @@
pub mod chat;
pub mod conversation; pub mod conversation;
pub mod evaluator; pub mod evaluator;
pub mod personality; pub mod personality;
pub mod responder;

View File

@@ -1,619 +0,0 @@
use std::sync::Arc;
use mistralai_client::v1::{
chat::{ChatMessage, ChatParams, ChatResponse, ChatResponseChoiceFinishReason},
constants::Model,
conversations::{ConversationEntry, ConversationInput, FunctionResultEntry},
error::ApiError,
tool::ToolChoice,
};
use rand::Rng;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info, warn};
use matrix_sdk::room::Room;
use opensearch::OpenSearch;
use crate::agent_ux::AgentProgress;
use crate::brain::conversation::ContextMessage;
use crate::brain::personality::Personality;
use crate::config::Config;
use crate::context::ResponseContext;
use crate::conversations::ConversationRegistry;
use crate::memory;
use crate::time_context::TimeContext;
use crate::tools::ToolRegistry;
/// Run a Mistral chat completion on a blocking thread.
///
/// The mistral client's `chat_async` holds a `std::sync::MutexGuard` across an
/// `.await` point, making the future !Send. We use the synchronous `chat()`
/// method via `spawn_blocking` instead; a join failure (worker panic or
/// cancellation) is mapped onto `ApiError`.
pub(crate) async fn chat_blocking(
    client: &Arc<mistralai_client::v1::client::Client>,
    model: Model,
    messages: Vec<ChatMessage>,
    params: ChatParams,
) -> Result<ChatResponse, ApiError> {
    let owned_client = Arc::clone(client);
    let handle = tokio::task::spawn_blocking(move || owned_client.chat(model, messages, Some(params)));
    match handle.await {
        Ok(chat_result) => chat_result,
        Err(e) => Err(ApiError {
            message: format!("spawn_blocking join error: {e}"),
        }),
    }
}
/// Legacy response generator: owns the prompt-building state (personality),
/// tool execution (ToolRegistry), and memory search (OpenSearch) used by
/// both the chat-completions and Conversations-API paths below.
pub struct Responder {
// Runtime settings: response delays, Matrix identity, Mistral model/iteration caps.
config: Arc<Config>,
// Builds the system prompt per room/member/memory context.
personality: Arc<Personality>,
// Executes model-requested tool calls; shared with the gRPC server via tools().
tools: Arc<ToolRegistry>,
// Backing store for per-user memory notes (see load_memory_notes).
opensearch: OpenSearch,
}
impl Responder {
/// Assemble a responder from shared config, personality, tools, and an
/// OpenSearch client (moved in; the Arc fields are cheap to clone).
pub fn new(
config: Arc<Config>,
personality: Arc<Personality>,
tools: Arc<ToolRegistry>,
opensearch: OpenSearch,
) -> Self {
Self {
config,
personality,
tools,
opensearch,
}
}
/// Get a reference to the tool registry (for sharing with gRPC server).
pub fn tools(&self) -> Arc<ToolRegistry> {
self.tools.clone()
}
/// Legacy chat-completions path: builds the full prompt locally (system
/// prompt + timestamped context + trigger message, optionally multimodal),
/// then loops model calls, executing tool calls inline, up to
/// `max_tool_iterations`. Returns `None` on API error, empty output, or
/// when the iteration budget is exhausted.
pub async fn generate_response(
&self,
context: &[ContextMessage],
trigger_body: &str,
trigger_sender: &str,
room_name: &str,
members: &[String],
is_spontaneous: bool,
mistral: &Arc<mistralai_client::v1::client::Client>,
room: &Room,
response_ctx: &ResponseContext,
image_data_uri: Option<&str>,
) -> Option<String> {
// Apply response delay (skip if instant_responses is enabled)
// Delay happens BEFORE typing indicator — Sol "notices" the message first
if !self.config.behavior.instant_responses {
let delay = if is_spontaneous {
rand::thread_rng().gen_range(
self.config.behavior.spontaneous_delay_min_ms
..=self.config.behavior.spontaneous_delay_max_ms,
)
} else {
rand::thread_rng().gen_range(
self.config.behavior.response_delay_min_ms
..=self.config.behavior.response_delay_max_ms,
)
};
debug!(delay_ms = delay, is_spontaneous, "Applying response delay");
sleep(Duration::from_millis(delay)).await;
}
// Start typing AFTER the delay — Sol has decided to respond
let _ = room.typing_notice(true).await;
// Pre-response memory query
let memory_notes = self
.load_memory_notes(response_ctx, trigger_body)
.await;
let system_prompt = self.personality.build_system_prompt(
room_name,
members,
memory_notes.as_deref(),
response_ctx.is_dm,
);
let mut messages = vec![ChatMessage::new_system_message(&system_prompt)];
// Add context messages with timestamps so the model has time awareness
for msg in context {
let ts = chrono::DateTime::from_timestamp_millis(msg.timestamp)
.map(|d| d.format("%H:%M").to_string())
.unwrap_or_default();
// Our own prior messages become assistant turns (no timestamp prefix).
if msg.sender == self.config.matrix.user_id {
messages.push(ChatMessage::new_assistant_message(&msg.content, None));
} else {
let user_msg = format!("[{}] {}: {}", ts, msg.sender, msg.content);
messages.push(ChatMessage::new_user_message(&user_msg));
}
}
// Add the triggering message (multimodal if image attached)
if let Some(data_uri) = image_data_uri {
use mistralai_client::v1::chat::{ContentPart, ImageUrl};
let mut parts = vec![];
if !trigger_body.is_empty() {
parts.push(ContentPart::Text {
text: format!("{trigger_sender}: {trigger_body}"),
});
}
parts.push(ContentPart::ImageUrl {
image_url: ImageUrl {
url: data_uri.to_string(),
detail: None,
},
});
messages.push(ChatMessage::new_user_message_with_images(parts));
} else {
let trigger = format!("{trigger_sender}: {trigger_body}");
messages.push(ChatMessage::new_user_message(&trigger));
}
let tool_defs = ToolRegistry::tool_definitions(self.tools.has_gitea(), self.tools.has_kratos());
let model = Model::new(&self.config.mistral.default_model);
let max_iterations = self.config.mistral.max_tool_iterations;
// The final iteration runs with tools disabled, forcing a text answer.
for iteration in 0..=max_iterations {
let params = ChatParams {
tools: if iteration < max_iterations {
Some(tool_defs.clone())
} else {
None
},
tool_choice: if iteration < max_iterations {
Some(ToolChoice::Auto)
} else {
None
},
..Default::default()
};
let response = match chat_blocking(mistral, model.clone(), messages.clone(), params).await {
Ok(r) => r,
Err(e) => {
let _ = room.typing_notice(false).await;
error!("Mistral chat failed: {e}");
return None;
}
};
// NOTE(review): assumes the API always returns at least one choice;
// `[0]` would panic on an empty `choices` array.
let choice = &response.choices[0];
if choice.finish_reason == ChatResponseChoiceFinishReason::ToolCalls {
if let Some(tool_calls) = &choice.message.tool_calls {
// Add assistant message with tool calls
messages.push(ChatMessage::new_assistant_message(
&choice.message.content.text(),
Some(tool_calls.clone()),
));
for tc in tool_calls {
let call_id = tc.id.as_deref().unwrap_or("unknown");
info!(
tool = tc.function.name.as_str(),
id = call_id,
args = tc.function.arguments.as_str(),
"Executing tool call"
);
let result = self
.tools
.execute(&tc.function.name, &tc.function.arguments, response_ctx)
.await;
// Tool failures are fed back to the model as text, not propagated.
let result_str = match result {
Ok(s) => {
let preview: String = s.chars().take(500).collect();
info!(
tool = tc.function.name.as_str(),
id = call_id,
result_len = s.len(),
result_preview = preview.as_str(),
"Tool call result"
);
s
}
Err(e) => {
warn!(tool = tc.function.name.as_str(), "Tool failed: {e}");
format!("Error: {e}")
}
};
messages.push(ChatMessage::new_tool_message(
&result_str,
call_id,
Some(&tc.function.name),
));
}
debug!(iteration, "Tool iteration complete, continuing");
continue;
}
}
// Final text response — strip own name prefix if present
let mut text = choice.message.content.text().trim().to_string();
// Strip "sol:" or "sol 💕:" or similar prefixes the model sometimes adds
let lower = text.to_lowercase();
for prefix in &["sol:", "sol 💕:", "sol💕:"] {
if lower.starts_with(prefix) {
text = text[prefix.len()..].trim().to_string();
break;
}
}
if text.is_empty() {
info!("Generated empty response, skipping send");
let _ = room.typing_notice(false).await;
return None;
}
let preview: String = text.chars().take(120).collect();
let _ = room.typing_notice(false).await;
info!(
response_len = text.len(),
response_preview = preview.as_str(),
is_spontaneous,
tool_iterations = iteration,
"Generated response"
);
return Some(text);
}
let _ = room.typing_notice(false).await;
warn!("Exceeded max tool iterations");
None
}
/// Generate a response using the Mistral Conversations API.
/// This path routes through the ConversationRegistry for persistent state,
/// agent handoffs, and function calling with UX feedback (reactions + threads).
pub async fn generate_response_conversations(
&self,
trigger_body: &str,
trigger_sender: &str,
room_id: &str,
room_name: &str,
is_dm: bool,
is_spontaneous: bool,
mistral: &Arc<mistralai_client::v1::client::Client>,
room: &Room,
response_ctx: &ResponseContext,
conversation_registry: &ConversationRegistry,
image_data_uri: Option<&str>,
context_hint: Option<String>,
event_id: ruma::OwnedEventId,
) -> Option<String> {
// Apply response delay
if !self.config.behavior.instant_responses {
let delay = if is_spontaneous {
rand::thread_rng().gen_range(
self.config.behavior.spontaneous_delay_min_ms
..=self.config.behavior.spontaneous_delay_max_ms,
)
} else {
rand::thread_rng().gen_range(
self.config.behavior.response_delay_min_ms
..=self.config.behavior.response_delay_max_ms,
)
};
sleep(Duration::from_millis(delay)).await;
}
let _ = room.typing_notice(true).await;
// Pre-response memory query (same as legacy path)
let memory_notes = self.load_memory_notes(response_ctx, trigger_body).await;
// Build the input message with dynamic context.
// Agent instructions are static (set at creation), so per-message context
// (timestamps, room, members, memory) is prepended to each user message.
let tc = TimeContext::now();
let mut context_header = format!(
"{}\n[room: {} ({})]",
tc.message_line(),
room_name,
room_id,
);
if let Some(ref notes) = memory_notes {
context_header.push('\n');
context_header.push_str(notes);
}
// In DMs the speaker is unambiguous; in rooms, tag the sender.
let user_msg = if is_dm {
trigger_body.to_string()
} else {
format!("<{}> {}", response_ctx.matrix_user_id, trigger_body)
};
let input_text = format!("{context_header}\n{user_msg}");
let input = ConversationInput::Text(input_text);
// Send through conversation registry
let response = match conversation_registry
.send_message(room_id, input, is_dm, mistral, context_hint.as_deref())
.await
{
Ok(r) => r,
Err(e) => {
error!("Conversation API failed: {e}");
let _ = room.typing_notice(false).await;
return None;
}
};
// Check for function calls — execute locally and send results back
let function_calls = response.function_calls();
if !function_calls.is_empty() {
// Agent UX: react with 🔍 and post tool details in a thread
let mut progress = crate::agent_ux::AgentProgress::new(
room.clone(),
event_id.clone(),
);
progress.start().await;
let max_iterations = self.config.mistral.max_tool_iterations;
let mut current_response = response;
// Each iteration: run all pending calls, post results back, re-poll.
for iteration in 0..max_iterations {
let calls = current_response.function_calls();
if calls.is_empty() {
break;
}
let mut result_entries = Vec::new();
for fc in &calls {
let call_id = fc.tool_call_id.as_deref().unwrap_or("unknown");
info!(
tool = fc.name.as_str(),
id = call_id,
args = fc.arguments.as_str(),
"Executing tool call (conversations)"
);
// Post tool call to thread
progress
.post_step(&crate::agent_ux::AgentProgress::format_tool_call(
&fc.name,
&fc.arguments,
))
.await;
// "research" gets a dedicated path with room/event context.
let result = if fc.name == "research" {
self.tools
.execute_research(
&fc.arguments,
response_ctx,
room,
&event_id,
0, // depth 0 — orchestrator level
)
.await
} else {
self.tools
.execute(&fc.name, &fc.arguments, response_ctx)
.await
};
let result_str = match result {
Ok(s) => {
let preview: String = s.chars().take(500).collect();
info!(
tool = fc.name.as_str(),
id = call_id,
result_len = s.len(),
result_preview = preview.as_str(),
"Tool call result (conversations)"
);
s
}
Err(e) => {
warn!(tool = fc.name.as_str(), "Tool failed: {e}");
format!("Error: {e}")
}
};
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
tool_call_id: call_id.to_string(),
result: result_str,
id: None,
object: None,
created_at: None,
completed_at: None,
}));
}
// Send function results back to conversation
current_response = match conversation_registry
.send_function_result(room_id, result_entries, mistral)
.await
{
Ok(r) => r,
Err(e) => {
error!("Failed to send function results: {e}");
let _ = room.typing_notice(false).await;
return None;
}
};
debug!(iteration, "Tool iteration complete (conversations)");
}
// Done with tool calls
progress.done().await;
// Extract final text from the last response
if let Some(text) = current_response.assistant_text() {
let text = strip_sol_prefix(&text);
if text.is_empty() {
let _ = room.typing_notice(false).await;
return None;
}
let _ = room.typing_notice(false).await;
info!(
response_len = text.len(),
"Generated response (conversations + tools)"
);
return Some(text);
}
let _ = room.typing_notice(false).await;
return None;
}
// Simple response — no tools involved
if let Some(text) = response.assistant_text() {
let text = strip_sol_prefix(&text);
if text.is_empty() {
let _ = room.typing_notice(false).await;
return None;
}
let _ = room.typing_notice(false).await;
info!(
response_len = text.len(),
is_spontaneous,
"Generated response (conversations)"
);
return Some(text);
}
let _ = room.typing_notice(false).await;
None
}
/// Query OpenSearch for up to 5 memory notes about the user: topical
/// matches first, backfilled with recent entries (deduped by id) when
/// fewer than 3 hit. Returns `None` when nothing is found.
async fn load_memory_notes(
&self,
ctx: &ResponseContext,
trigger_body: &str,
) -> Option<String> {
let index = &self.config.opensearch.memory_index;
let user_id = &ctx.user_id;
// Search for topically relevant memories
let mut memories = memory::store::query(
&self.opensearch,
index,
user_id,
trigger_body,
5,
)
.await
.unwrap_or_default();
// Backfill with recent memories if we have fewer than 3
if memories.len() < 3 {
let remaining = 5 - memories.len();
if let Ok(recent) = memory::store::get_recent(
&self.opensearch,
index,
user_id,
remaining,
)
.await
{
let existing_ids: std::collections::HashSet<String> =
memories.iter().map(|m| m.id.clone()).collect();
for doc in recent {
if !existing_ids.contains(&doc.id) && memories.len() < 5 {
memories.push(doc);
}
}
}
}
if memories.is_empty() {
return None;
}
let display = ctx
.display_name
.as_deref()
.unwrap_or(&ctx.matrix_user_id);
Some(format_memory_notes(display, &memories))
}
}
/// Strip "sol:" or "sol 💕:" prefixes the model sometimes adds.
///
/// The match is ASCII-case-insensitive ("Sol:", "SOL:" are stripped too,
/// matching the previous `to_lowercase` behavior for these prefixes).
/// Uses a checked subslice (`str::get`) instead of indexing a byte offset
/// derived from a *different* (lowercased) string, so a leading non-ASCII
/// character with a length-changing case mapping can never panic on a
/// non-char-boundary slice.
fn strip_sol_prefix(text: &str) -> String {
    let trimmed = text.trim();
    for prefix in &["sol:", "sol 💕:", "sol💕:"] {
        // `get` returns None when prefix.len() is not a char boundary,
        // in which case the prefix cannot match anyway.
        if let Some(head) = trimmed.get(..prefix.len()) {
            if head.eq_ignore_ascii_case(prefix) {
                return trimmed[prefix.len()..].trim().to_string();
            }
        }
    }
    trimmed.to_string()
}
/// Format memory documents into a notes block for the system prompt.
///
/// Output: a header paragraph followed by one `- [category] content`
/// bullet per memory, newline-joined.
pub(crate) fn format_memory_notes(
    display_name: &str,
    memories: &[memory::schema::MemoryDocument],
) -> String {
    let header = format!(
        "## notes about {display_name}\n\n\
         these are your private notes about the person you're talking to.\n\
         use them to inform your responses but don't mention that you have notes.\n"
    );
    std::iter::once(header)
        .chain(
            memories
                .iter()
                .map(|doc| format!("- [{}] {}", doc.category, doc.content)),
        )
        .collect::<Vec<_>>()
        .join("\n")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::schema::MemoryDocument;
// Fixture: a MemoryDocument with fixed timestamps and user, varying only
// id/content/category per test case.
fn make_mem(id: &str, content: &str, category: &str) -> MemoryDocument {
MemoryDocument {
id: id.into(),
user_id: "sienna@sunbeam.pt".into(),
content: content.into(),
category: category.into(),
created_at: 1710000000000,
updated_at: 1710000000000,
source: "auto".into(),
}
}
#[test]
fn test_format_memory_notes_basic() {
let memories = vec![
make_mem("a", "prefers terse answers", "preference"),
make_mem("b", "working on drive UI", "fact"),
];
let result = format_memory_notes("sienna", &memories);
assert!(result.contains("## notes about sienna"));
assert!(result.contains("don't mention that you have notes"));
assert!(result.contains("- [preference] prefers terse answers"));
assert!(result.contains("- [fact] working on drive UI"));
}
#[test]
fn test_format_memory_notes_single() {
let memories = vec![make_mem("x", "birthday is march 12", "context")];
let result = format_memory_notes("lonni", &memories);
assert!(result.contains("## notes about lonni"));
assert!(result.contains("- [context] birthday is march 12"));
}
#[test]
fn test_format_memory_notes_uses_display_name() {
// The header must use the caller-supplied display name verbatim.
let memories = vec![make_mem("a", "test", "general")];
let result = format_memory_notes("Amber", &memories);
assert!(result.contains("## notes about Amber"));
}
}

View File

@@ -76,29 +76,34 @@ pub async fn index_repo(
let mut count = 0u32; let mut count = 0u32;
let mut dirs_to_visit = vec![String::new()]; // start at repo root let mut dirs_to_visit = vec![String::new()]; // start at repo root
// Build base URL for direct API calls (SDK's get_file can't handle directory listings)
let base_url = &gitea.base_url;
let token = gitea.ensure_token(localpart).await
.map_err(|e| anyhow::anyhow!("Failed to get Gitea token: {e}"))?;
while let Some(dir_path) = dirs_to_visit.pop() { while let Some(dir_path) = dirs_to_visit.pop() {
let entries = match gitea // Call Gitea contents API directly — returns array for directories
.get_file(localpart, owner, repo, &dir_path, Some(branch)) let url = format!("{base_url}/api/v1/repos/{owner}/{repo}/contents/{dir_path}?ref={branch}");
let response = match reqwest::Client::new()
.get(&url)
.header("Authorization", format!("token {token}"))
.send()
.await .await
{ {
Ok(content) => content, Ok(r) => r,
Err(e) => { Err(e) => {
debug!(owner, repo, path = dir_path.as_str(), "Failed to list directory: {e}"); debug!(owner, repo, path = dir_path.as_str(), "Failed to list directory: {e}");
continue; continue;
} }
}; };
// get_file returns a JSON string — parse as array of entries let items: Vec<serde_json::Value> = match response.json().await {
let entries_json: serde_json::Value = Ok(v) => v,
serde_json::from_str(&serde_json::to_string(&entries).unwrap_or_default()) Err(e) => {
.unwrap_or_default(); debug!(owner, repo, path = dir_path.as_str(), "Failed to parse directory: {e}");
continue;
// If it's a single file response (not a directory listing), skip }
if !entries_json.is_array() { };
continue;
}
let items = entries_json.as_array().unwrap();
for item in items { for item in items {
let name = item["name"].as_str().unwrap_or(""); let name = item["name"].as_str().unwrap_or("");
let path = item["path"].as_str().unwrap_or(""); let path = item["path"].as_str().unwrap_or("");
@@ -132,7 +137,7 @@ pub async fn index_repo(
} }
// Fetch file content // Fetch file content
let content = match fetch_file_content(gitea, localpart, owner, repo, path, branch).await { let content = match fetch_file_content(base_url, &token, owner, repo, path, branch).await {
Some(c) => c, Some(c) => c,
None => continue, None => continue,
}; };
@@ -174,25 +179,26 @@ pub async fn index_repo(
/// Fetch and decode a file's content from Gitea (base64-encoded API response). /// Fetch and decode a file's content from Gitea (base64-encoded API response).
async fn fetch_file_content( async fn fetch_file_content(
gitea: &GiteaClient, base_url: &str,
localpart: &str, token: &str,
owner: &str, owner: &str,
repo: &str, repo: &str,
path: &str, path: &str,
branch: &str, branch: &str,
) -> Option<String> { ) -> Option<String> {
let response = gitea let url = format!("{base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={branch}");
.get_file(localpart, owner, repo, path, Some(branch)) let response = reqwest::Client::new()
.get(&url)
.header("Authorization", format!("token {token}"))
.send()
.await .await
.ok()?; .ok()?;
// The response is a JSON string — parse it let json: serde_json::Value = response.json().await.ok()?;
let json_str = serde_json::to_string(&response).ok()?;
let json: serde_json::Value = serde_json::from_str(&json_str).ok()?;
// Content is base64-encoded // Content is base64-encoded
let encoded = json["content"].as_str()?; let encoded = json["content"].as_str()?;
let cleaned = encoded.replace('\n', ""); // Gitea adds newlines in base64 let cleaned = encoded.replace('\n', "");
let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &cleaned).ok()?; let decoded = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &cleaned).ok()?;
String::from_utf8(decoded).ok() String::from_utf8(decoded).ok()
} }

View File

@@ -934,7 +934,7 @@ mod code_index_tests {
use crate::code_index::indexer::CodeIndexer; use crate::code_index::indexer::CodeIndexer;
use crate::breadcrumbs; use crate::breadcrumbs;
fn os_client() -> Option<opensearch::OpenSearch> { pub(super) fn os_client() -> Option<opensearch::OpenSearch> {
use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder}; use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
let url = url::Url::parse("http://localhost:9200").ok()?; let url = url::Url::parse("http://localhost:9200").ok()?;
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
@@ -943,13 +943,13 @@ mod code_index_tests {
Some(opensearch::OpenSearch::new(transport)) Some(opensearch::OpenSearch::new(transport))
} }
async fn setup_test_index(client: &opensearch::OpenSearch) -> String { pub(super) async fn setup_test_index(client: &opensearch::OpenSearch) -> String {
let index = format!("sol_code_test_{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap()); let index = format!("sol_code_test_{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
schema::create_index_if_not_exists(client, &index).await.unwrap(); schema::create_index_if_not_exists(client, &index).await.unwrap();
index index
} }
async fn refresh_index(client: &opensearch::OpenSearch, index: &str) { pub(super) async fn refresh_index(client: &opensearch::OpenSearch, index: &str) {
let _ = client let _ = client
.indices() .indices()
.refresh(opensearch::indices::IndicesRefreshParts::Index(&[index])) .refresh(opensearch::indices::IndicesRefreshParts::Index(&[index]))
@@ -957,7 +957,7 @@ mod code_index_tests {
.await; .await;
} }
async fn cleanup_index(client: &opensearch::OpenSearch, index: &str) { pub(super) async fn cleanup_index(client: &opensearch::OpenSearch, index: &str) {
let _ = client let _ = client
.indices() .indices()
.delete(opensearch::indices::IndicesDeleteParts::Index(&[index])) .delete(opensearch::indices::IndicesDeleteParts::Index(&[index]))
@@ -1275,3 +1275,358 @@ mod code_index_tests {
cleanup_index(&client, &index).await; cleanup_index(&client, &index).await;
} }
} }
// ══════════════════════════════════════════════════════════════════════════
// Gitea SDK + devtools integration tests (requires local Gitea)
// ══════════════════════════════════════════════════════════════════════════
mod gitea_tests {
    use super::*;
    use std::sync::Arc;

    /// Load `KEY=VALUE` pairs from the crate-root `.env` into the process env.
    ///
    /// Follows dotenv convention: a variable already present in the real
    /// environment is never overwritten by the file, and surrounding single
    /// or double quotes are stripped from values (`KEY="x"` yields `x`).
    fn load_env() {
        let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env");
        if let Ok(contents) = std::fs::read_to_string(&env_path) {
            for line in contents.lines() {
                let line = line.trim();
                if line.is_empty() || line.starts_with('#') { continue; }
                if let Some((k, v)) = line.split_once('=') {
                    let key = k.trim();
                    // Strip optional surrounding quotes (common .env style).
                    let val = v.trim().trim_matches('"').trim_matches('\'');
                    // The real environment takes precedence over the file.
                    if std::env::var_os(key).is_none() {
                        std::env::set_var(key, val);
                    }
                }
            }
        }
    }

    /// True when a Gitea instance answers at `GITEA_URL`.
    /// Probed with `curl -sf` so an unreachable host simply skips the tests.
    fn gitea_available() -> bool {
        load_env();
        let url = std::env::var("GITEA_URL").unwrap_or_default();
        if url.is_empty() { return false; }
        std::process::Command::new("curl")
            .args(["-sf", &format!("{url}/api/v1/version")])
            .output()
            .map(|o| o.status.success())
            .unwrap_or(false)
    }

    /// Build a [`GiteaClient`] from env config, or `None` when Gitea is
    /// unreachable or any required variable is missing.
    fn gitea_client() -> Option<Arc<crate::sdk::gitea::GiteaClient>> {
        if !gitea_available() { return None; }
        let url = std::env::var("GITEA_URL").ok()?;
        let user = std::env::var("GITEA_ADMIN_USERNAME").ok()?;
        let pass = std::env::var("GITEA_ADMIN_PASSWORD").ok()?;
        let store = Arc::new(Store::open_memory().unwrap());
        // Create a minimal vault client (won't be used — admin uses basic auth)
        let vault = Arc::new(crate::sdk::vault::VaultClient::new(
            "http://localhost:8200".into(), "test".into(), "secret".into(),
        ));
        let token_store = Arc::new(crate::sdk::tokens::TokenStore::new(store, vault));
        Some(Arc::new(crate::sdk::gitea::GiteaClient::new(
            url, user, pass, token_store,
        )))
    }

    /// Listing repos in the bootstrapped `studio` org must find `sol`.
    #[tokio::test]
    async fn test_list_repos() {
        let Some(gitea) = gitea_client() else {
            eprintln!("Skipping: Gitea not available");
            return;
        };
        let repos = gitea.list_repos("sol", None, Some("studio"), Some(50)).await;
        assert!(repos.is_ok(), "list_repos should succeed: {:?}", repos.err());
        let repos = repos.unwrap();
        assert!(!repos.is_empty(), "Should find repos in studio org");
        // Should find sol repo
        let sol = repos.iter().find(|r| r.full_name.contains("sol"));
        assert!(sol.is_some(), "Should find studio/sol repo");
    }

    /// A single-repo lookup must return metadata with a default branch.
    #[tokio::test]
    async fn test_get_repo() {
        let Some(gitea) = gitea_client() else {
            eprintln!("Skipping: Gitea not available");
            return;
        };
        let repo = gitea.get_repo("sol", "studio", "sol").await;
        assert!(repo.is_ok(), "get_repo should succeed: {:?}", repo.err());
        let repo = repo.unwrap();
        assert!(!repo.default_branch.is_empty(), "Should have a default branch");
    }

    /// Directory listing smoke test — only checks the call doesn't panic.
    #[tokio::test]
    async fn test_get_file_directory() {
        let Some(gitea) = gitea_client() else {
            eprintln!("Skipping: Gitea not available");
            return;
        };
        // List repo root — SDK returns parse error for directory listings (known issue),
        // but the API call itself should succeed
        let result = gitea.get_file("sol", "studio", "sol", "", None).await;
        // Directory listing returns an array, SDK expects single object — may error
        // Just verify we can call it without panic
        let _ = result;
    }

    /// Fetching a known file (Cargo.toml) must succeed and decode.
    #[tokio::test]
    async fn test_get_file_content() {
        let Some(gitea) = gitea_client() else {
            eprintln!("Skipping: Gitea not available");
            return;
        };
        let result = gitea.get_file("sol", "studio", "sol", "Cargo.toml", None).await;
        assert!(result.is_ok(), "Should get Cargo.toml: {:?}", result.err());
    }

    /// End-to-end: index a small Rust repo from Gitea into OpenSearch and
    /// verify its symbols are searchable. Requires both services locally.
    #[tokio::test]
    async fn test_gitea_code_indexing() {
        let Some(gitea) = gitea_client() else {
            eprintln!("Skipping: Gitea not available");
            return;
        };
        let Some(os) = super::code_index_tests::os_client() else {
            eprintln!("Skipping: OpenSearch not available");
            return;
        };
        let index = super::code_index_tests::setup_test_index(&os).await;
        let mut indexer = crate::code_index::indexer::CodeIndexer::new(
            os.clone(), index.clone(), String::new(), 50,
        );
        // Index the mistralai-client-rs repo (small, Rust)
        let result = crate::code_index::gitea::index_repo(
            &gitea, &mut indexer, "sol", "studio", "mistralai-client-rs", "main",
        ).await;
        assert!(result.is_ok(), "Indexing should succeed: {:?}", result.err());
        let count = result.unwrap();
        indexer.flush().await;
        // Should have found symbols
        assert!(count > 0, "Should extract symbols from Rust repo, got 0");
        // Verify we can search them
        super::code_index_tests::refresh_index(&os, &index).await;
        let search_result = crate::tools::code_search::search_code(
            &os, &index,
            r#"{"query": "Client"}"#,
            Some("mistralai-client-rs"), None,
        ).await.unwrap();
        assert!(!search_result.contains("No code results"), "Should find Client in results: {search_result}");
        super::code_index_tests::cleanup_index(&os, &index).await;
    }
}
// ══════════════════════════════════════════════════════════════════════════
// Web search + conversation registry tests
// ══════════════════════════════════════════════════════════════════════════
mod service_tests {
    use super::*;

    /// Load `KEY=VALUE` pairs from the crate-root `.env` into the process env.
    /// Dotenv convention: a variable already set in the real environment is
    /// never overwritten by the file.
    fn load_env() {
        let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env");
        if let Ok(contents) = std::fs::read_to_string(&env_path) {
            for line in contents.lines() {
                let line = line.trim();
                if line.is_empty() || line.starts_with('#') { continue; }
                if let Some((k, v)) = line.split_once('=') {
                    let key = k.trim();
                    if std::env::var_os(key).is_none() {
                        std::env::set_var(key, v.trim());
                    }
                }
            }
        }
    }

    /// Query a local SearXNG instance through the web_search tool and verify
    /// the result mentions the query topic. Skips when SearXNG is down.
    #[tokio::test]
    async fn test_web_search() {
        // Requires SearXNG at localhost:8888
        let result = reqwest::get("http://localhost:8888/search?q=test&format=json").await;
        if result.is_err() {
            eprintln!("Skipping: SearXNG not available");
            return;
        }
        let tool_result = crate::tools::web_search::search(
            "http://localhost:8888",
            r#"{"query": "rust programming language", "limit": 3}"#,
        ).await;
        assert!(tool_result.is_ok(), "Web search should succeed: {:?}", tool_result.err());
        let text = tool_result.unwrap();
        assert!(!text.is_empty(), "Should return results");
        assert!(text.to_lowercase().contains("rust"), "Should mention Rust in results");
    }

    /// A fresh registry must report no conversation for an unseen room key.
    #[tokio::test]
    async fn test_conversation_registry_lifecycle() {
        let store = Arc::new(Store::open_memory().unwrap());
        let registry = crate::conversations::ConversationRegistry::new(
            "mistral-medium-latest".into(),
            118000,
            store,
        );
        // No conversation should exist yet
        let conv_id = registry.get_conversation_id("test-room").await;
        assert!(conv_id.is_none(), "Should have no conversation initially");
    }

    /// Round-trip through the Conversations API: first send_message must
    /// create and persist a conversation id. Skips without an API key.
    #[tokio::test]
    async fn test_conversation_send_message() {
        load_env();
        let api_key = match std::env::var("SOL_MISTRAL_API_KEY") {
            Ok(k) => k,
            Err(_) => { eprintln!("Skipping: no API key"); return; }
        };
        let mistral = Arc::new(
            mistralai_client::v1::client::Client::new(Some(api_key), None, None, None).unwrap(),
        );
        let store = Arc::new(Store::open_memory().unwrap());
        let registry = crate::conversations::ConversationRegistry::new(
            "mistral-medium-latest".into(),
            118000,
            store,
        );
        // Unique key per run so reruns never collide with persisted state.
        let conv_key = format!("test-{}", uuid::Uuid::new_v4());
        let input = mistralai_client::v1::conversations::ConversationInput::Text("say hi".into());
        let result = registry.send_message(&conv_key, input, true, &mistral, None).await;
        assert!(result.is_ok(), "send_message should succeed: {:?}", result.err());
        // Conversation should now exist
        let conv_id = registry.get_conversation_id(&conv_key).await;
        assert!(conv_id.is_some(), "Conversation should be stored after first message");
    }

    /// Rule evaluation: a DM triggers a response rule, and the bot's own
    /// messages are matched (as Ignored) rather than falling through.
    #[test]
    fn test_evaluator_rule_matching() {
        let config = test_config();
        let evaluator = crate::brain::evaluator::Evaluator::new(
            config,
            "you are sol.".into(),
        );
        // DM should trigger MustRespond
        let engagement = evaluator.evaluate_rules(
            "@alice:sunbeam.pt",
            "hey sol",
            true, // DM
        );
        assert!(engagement.is_some(), "DM should trigger a rule");
        // Own message should be Ignored
        let engagement = evaluator.evaluate_rules(
            "@test:localhost", // matches config user_id
            "hello",
            false,
        );
        assert!(engagement.is_some(), "Own message should be Ignored");
    }
}
// ══════════════════════════════════════════════════════════════════════════
// gRPC bridge unit tests
// ══════════════════════════════════════════════════════════════════════════
mod bridge_tests {
    use crate::grpc::bridge;
    use crate::orchestrator::event::*;

    /// Per-await deadline so a bridge regression fails the test quickly
    /// instead of hanging the whole suite on an unbounded `recv().await`.
    const RECV_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    /// Thinking → Status and Done → TextDone must both be forwarded.
    #[tokio::test]
    async fn test_bridge_thinking_event() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let (event_tx, event_rx) = tokio::sync::broadcast::channel(16);
        let rid = RequestId::new();
        let rid2 = rid.clone();
        let handle = tokio::spawn(async move {
            bridge::bridge_events_to_grpc(rid2, event_rx, tx).await;
        });
        // Send Thinking + Done
        let _ = event_tx.send(OrchestratorEvent::Thinking { request_id: rid.clone() });
        let _ = event_tx.send(OrchestratorEvent::Done {
            request_id: rid.clone(),
            text: "hello".into(),
            usage: TokenUsage::default(),
        });
        // Collect messages (bounded wait — see RECV_TIMEOUT)
        let mut msgs = Vec::new();
        while let Ok(Some(Ok(msg))) = tokio::time::timeout(RECV_TIMEOUT, rx.recv()).await {
            msgs.push(msg);
            if msgs.len() >= 2 { break; }
        }
        assert_eq!(msgs.len(), 2, "Should get Status + TextDone");
        let _ = tokio::time::timeout(RECV_TIMEOUT, handle).await;
    }

    /// A client-side tool call must be mapped to a ToolCall gRPC message.
    #[tokio::test]
    async fn test_bridge_tool_call_client() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let (event_tx, event_rx) = tokio::sync::broadcast::channel(16);
        let rid = RequestId::new();
        let rid2 = rid.clone();
        let handle = tokio::spawn(async move {
            bridge::bridge_events_to_grpc(rid2, event_rx, tx).await;
        });
        let _ = event_tx.send(OrchestratorEvent::ToolCallDetected {
            request_id: rid.clone(),
            call_id: "c1".into(),
            name: "file_read".into(),
            args: "{}".into(),
            side: ToolSide::Client,
        });
        let _ = event_tx.send(OrchestratorEvent::Done {
            request_id: rid.clone(),
            text: "done".into(),
            usage: TokenUsage::default(),
        });
        let mut msgs = Vec::new();
        while let Ok(Some(Ok(msg))) = tokio::time::timeout(RECV_TIMEOUT, rx.recv()).await {
            msgs.push(msg);
            if msgs.len() >= 2 { break; }
        }
        // First message should be ToolCall
        assert!(!msgs.is_empty(), "Should forward at least the ToolCall message");
        let _ = tokio::time::timeout(RECV_TIMEOUT, handle).await;
    }

    /// Events carrying a foreign request id must be dropped; only our own
    /// request's events reach the gRPC channel.
    #[tokio::test]
    async fn test_bridge_filters_by_request_id() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let (event_tx, event_rx) = tokio::sync::broadcast::channel(16);
        let rid = RequestId::new();
        let other_rid = RequestId::new();
        let rid2 = rid.clone();
        let handle = tokio::spawn(async move {
            bridge::bridge_events_to_grpc(rid2, event_rx, tx).await;
        });
        // Send event for different request — should be filtered out
        let _ = event_tx.send(OrchestratorEvent::Thinking { request_id: other_rid });
        // Send Done for our request — should be forwarded
        let _ = event_tx.send(OrchestratorEvent::Done {
            request_id: rid.clone(),
            text: "hi".into(),
            usage: TokenUsage::default(),
        });
        let msg = tokio::time::timeout(RECV_TIMEOUT, rx.recv())
            .await
            .expect("bridge should forward Done before the timeout");
        assert!(msg.is_some(), "Should get Done message (filtered correctly)");
        let _ = tokio::time::timeout(RECV_TIMEOUT, handle).await;
    }
}

View File

@@ -1,4 +1,3 @@
mod agent_ux;
mod agents; mod agents;
mod archive; mod archive;
mod brain; mod brain;
@@ -39,7 +38,6 @@ use conversations::ConversationRegistry;
use memory::schema::create_index_if_not_exists as create_memory_index; use memory::schema::create_index_if_not_exists as create_memory_index;
use brain::evaluator::Evaluator; use brain::evaluator::Evaluator;
use brain::personality::Personality; use brain::personality::Personality;
use brain::responder::Responder;
use config::Config; use config::Config;
use sync::AppState; use sync::AppState;
use tools::ToolRegistry; use tools::ToolRegistry;
@@ -212,12 +210,8 @@ async fn main() -> anyhow::Result<()> {
)); ));
let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone())); let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone()));
let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone())); let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone()));
let responder = Arc::new(Responder::new( let tools = tool_registry; // already Arc<ToolRegistry>
config.clone(), // personality is already Arc<Personality>
personality,
tool_registry,
os_client.clone(),
));
// Start background flush task // Start background flush task
let _flush_handle = indexer.start_flush_task(); let _flush_handle = indexer.start_flush_task();
@@ -235,7 +229,8 @@ async fn main() -> anyhow::Result<()> {
config: config.clone(), config: config.clone(),
indexer, indexer,
evaluator, evaluator,
responder, tools: tools.clone(),
personality,
conversations, conversations,
agent_registry, agent_registry,
conversation_registry, conversation_registry,
@@ -313,14 +308,14 @@ async fn main() -> anyhow::Result<()> {
.unwrap_or_default(); .unwrap_or_default();
let orch = Arc::new(orchestrator::Orchestrator::new( let orch = Arc::new(orchestrator::Orchestrator::new(
config.clone(), config.clone(),
state.responder.tools(), tools.clone(),
state.mistral.clone(), state.mistral.clone(),
state.conversation_registry.clone(), state.conversation_registry.clone(),
system_prompt_text.clone(), system_prompt_text.clone(),
)); ));
let grpc_state = std::sync::Arc::new(grpc::GrpcState { let grpc_state = std::sync::Arc::new(grpc::GrpcState {
config: config.clone(), config: config.clone(),
tools: state.responder.tools(), tools: tools.clone(),
store: store.clone(), store: store.clone(),
mistral: state.mistral.clone(), mistral: state.mistral.clone(),
matrix: Some(matrix_client.clone()), matrix: Some(matrix_client.clone()),

View File

@@ -10,7 +10,7 @@ use tracing::{debug, warn};
use crate::config::Config; use crate::config::Config;
use crate::context::ResponseContext; use crate::context::ResponseContext;
use crate::brain::responder::chat_blocking; use crate::brain::chat::chat_blocking;
use super::store; use super::store;

View File

@@ -20,7 +20,7 @@ const TOKEN_SCOPES: &[&str] = &[
]; ];
pub struct GiteaClient { pub struct GiteaClient {
base_url: String, pub base_url: String,
admin_username: String, admin_username: String,
admin_password: String, admin_password: String,
http: HttpClient, http: HttpClient,

View File

@@ -19,7 +19,8 @@ use crate::archive::indexer::Indexer;
use crate::archive::schema::ArchiveDocument; use crate::archive::schema::ArchiveDocument;
use crate::brain::conversation::{ContextMessage, ConversationManager}; use crate::brain::conversation::{ContextMessage, ConversationManager};
use crate::brain::evaluator::{Engagement, Evaluator}; use crate::brain::evaluator::{Engagement, Evaluator};
use crate::brain::responder::Responder; use crate::brain::personality::Personality;
use crate::tools::ToolRegistry;
use crate::config::Config; use crate::config::Config;
use crate::context::{self, ResponseContext}; use crate::context::{self, ResponseContext};
use crate::conversations::ConversationRegistry; use crate::conversations::ConversationRegistry;
@@ -30,7 +31,8 @@ pub struct AppState {
pub config: Arc<Config>, pub config: Arc<Config>,
pub indexer: Arc<Indexer>, pub indexer: Arc<Indexer>,
pub evaluator: Arc<Evaluator>, pub evaluator: Arc<Evaluator>,
pub responder: Arc<Responder>, pub tools: Arc<ToolRegistry>,
pub personality: Arc<Personality>,
pub conversations: Arc<Mutex<ConversationManager>>, pub conversations: Arc<Mutex<ConversationManager>>,
pub mistral: Arc<mistralai_client::v1::client::Client>, pub mistral: Arc<mistralai_client::v1::client::Client>,
pub opensearch: OpenSearch, pub opensearch: OpenSearch,
@@ -365,41 +367,36 @@ async fn handle_message(
None None
}; };
let response = if state.config.agents.use_conversations_api { // Generate response via ConversationRegistry (Conversations API path).
state // The legacy manual chat path has been removed — Conversations API is now mandatory.
.responder let input_text = {
.generate_response_conversations( let tc = crate::time_context::TimeContext::now();
&body, let mut header = format!("{}\n[room: {} ({})]", tc.message_line(), room_name, room_id);
display_sender, // TODO: inject memory notes + breadcrumbs here (like the orchestrator does)
&room_id, let user_msg = if is_dm {
&room_name, body.clone()
is_dm, } else {
is_spontaneous, format!("<{}> {}", response_ctx.matrix_user_id, body)
&state.mistral, };
&room, format!("{header}\n{user_msg}")
&response_ctx, };
&state.conversation_registry,
image_data_uri.as_deref(), let input = mistralai_client::v1::conversations::ConversationInput::Text(input_text);
context_hint, let conv_result = state
event.event_id.clone().into(), .conversation_registry
) .send_message(&room_id, input, is_dm, &state.mistral, context_hint.as_deref())
.await .await;
} else {
state let response = match conv_result {
.responder Ok(conv_response) => {
.generate_response( // Simple path: extract text (no tool loop for Matrix — tools handled by orchestrator)
&context, // TODO: wire full orchestrator + Matrix bridge for tool support
&body, conv_response.assistant_text()
display_sender, }
&room_name, Err(e) => {
&members, error!("Conversation API failed: {e}");
is_spontaneous, None
&state.mistral, }
&room,
&response_ctx,
image_data_uri.as_deref(),
)
.await
}; };
if let Some(text) = response { if let Some(text) = response {

View File

@@ -11,7 +11,7 @@ use serde_json::json;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::agent_ux::AgentProgress; // AgentProgress removed — research thread UX moved to Matrix bridge (future)
use crate::config::Config; use crate::config::Config;
use crate::context::ResponseContext; use crate::context::ResponseContext;
use crate::persistence::Store; use crate::persistence::Store;
@@ -142,29 +142,18 @@ pub async fn execute(
let (tx, mut rx) = mpsc::channel::<ProgressUpdate>(64); let (tx, mut rx) = mpsc::channel::<ProgressUpdate>(64);
// Spawn thread updater // Spawn thread updater
let thread_room = room.clone(); let _thread_room = room.clone();
let thread_event_id = event_id.clone(); let _thread_event_id = event_id.clone();
let agent_count = tasks.len(); let agent_count = tasks.len();
// Progress updates: drain channel (UX moved to orchestrator events / Matrix bridge)
let updater = tokio::spawn(async move { let updater = tokio::spawn(async move {
let mut progress = AgentProgress::new(thread_room, thread_event_id); info!(agent_count, "Research session started");
progress
.post_step(&format!("🔬 researching with {} agents...", agent_count))
.await;
while let Some(update) = rx.recv().await { while let Some(update) = rx.recv().await {
let msg = match update { match update {
ProgressUpdate::AgentStarted { focus } => { ProgressUpdate::AgentStarted { focus } => debug!(focus = focus.as_str(), "Agent started"),
format!("🔎 {focus}") ProgressUpdate::AgentDone { focus, .. } => debug!(focus = focus.as_str(), "Agent done"),
} ProgressUpdate::AgentFailed { focus, error } => warn!(focus = focus.as_str(), error = error.as_str(), "Agent failed"),
ProgressUpdate::AgentDone { focus, summary } => { }
let short: String = summary.chars().take(100).collect();
format!("{focus}: {short}")
}
ProgressUpdate::AgentFailed { focus, error } => {
format!("{focus}: {error}")
}
};
progress.post_step(&msg).await;
} }
}); });