per-message context headers, memory notes, conversation continuity
conversations API path now injects per-message context headers with live timestamps, room name, and memory notes. this replaces the template variables in agent instructions which were frozen at creation time. memory notes (topical + recent backfill) loaded before each response in the conversations path — was previously only in the legacy path. context hint seeds new conversations with recent room history after resets, so sol doesn't lose conversational continuity on sneeze. tool call results now logged with preview + length for debugging. reset_all() clears both in-memory and sqlite conversation state.
This commit is contained in:
@@ -146,7 +146,7 @@ impl Responder {
|
|||||||
messages.push(ChatMessage::new_user_message(&trigger));
|
messages.push(ChatMessage::new_user_message(&trigger));
|
||||||
}
|
}
|
||||||
|
|
||||||
let tool_defs = ToolRegistry::tool_definitions();
|
let tool_defs = ToolRegistry::tool_definitions(self.tools.has_gitea());
|
||||||
let model = Model::new(&self.config.mistral.default_model);
|
let model = Model::new(&self.config.mistral.default_model);
|
||||||
let max_iterations = self.config.mistral.max_tool_iterations;
|
let max_iterations = self.config.mistral.max_tool_iterations;
|
||||||
|
|
||||||
@@ -199,7 +199,17 @@ impl Responder {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let result_str = match result {
|
let result_str = match result {
|
||||||
Ok(s) => s,
|
Ok(s) => {
|
||||||
|
let preview: String = s.chars().take(500).collect();
|
||||||
|
info!(
|
||||||
|
tool = tc.function.name.as_str(),
|
||||||
|
id = call_id,
|
||||||
|
result_len = s.len(),
|
||||||
|
result_preview = preview.as_str(),
|
||||||
|
"Tool call result"
|
||||||
|
);
|
||||||
|
s
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(tool = tc.function.name.as_str(), "Tool failed: {e}");
|
warn!(tool = tc.function.name.as_str(), "Tool failed: {e}");
|
||||||
format!("Error: {e}")
|
format!("Error: {e}")
|
||||||
@@ -261,6 +271,7 @@ impl Responder {
|
|||||||
trigger_body: &str,
|
trigger_body: &str,
|
||||||
trigger_sender: &str,
|
trigger_sender: &str,
|
||||||
room_id: &str,
|
room_id: &str,
|
||||||
|
room_name: &str,
|
||||||
is_dm: bool,
|
is_dm: bool,
|
||||||
is_spontaneous: bool,
|
is_spontaneous: bool,
|
||||||
mistral: &Arc<mistralai_client::v1::client::Client>,
|
mistral: &Arc<mistralai_client::v1::client::Client>,
|
||||||
@@ -268,6 +279,7 @@ impl Responder {
|
|||||||
response_ctx: &ResponseContext,
|
response_ctx: &ResponseContext,
|
||||||
conversation_registry: &ConversationRegistry,
|
conversation_registry: &ConversationRegistry,
|
||||||
image_data_uri: Option<&str>,
|
image_data_uri: Option<&str>,
|
||||||
|
context_hint: Option<String>,
|
||||||
) -> Option<String> {
|
) -> Option<String> {
|
||||||
// Apply response delay
|
// Apply response delay
|
||||||
if !self.config.behavior.instant_responses {
|
if !self.config.behavior.instant_responses {
|
||||||
@@ -287,20 +299,46 @@ impl Responder {
|
|||||||
|
|
||||||
let _ = room.typing_notice(true).await;
|
let _ = room.typing_notice(true).await;
|
||||||
|
|
||||||
// Build the input message (with sender prefix for group rooms)
|
// Pre-response memory query (same as legacy path)
|
||||||
let input_text = if is_dm {
|
let memory_notes = self.load_memory_notes(response_ctx, trigger_body).await;
|
||||||
|
|
||||||
|
// Build the input message with dynamic context header.
|
||||||
|
// Agent instructions are static (set at creation), so per-message context
|
||||||
|
// (timestamps, room, members, memory) is prepended to each user message.
|
||||||
|
let now = chrono::Utc::now();
|
||||||
|
let epoch_ms = now.timestamp_millis();
|
||||||
|
let ts_1h = (now - chrono::Duration::hours(1)).timestamp_millis();
|
||||||
|
let ts_yesterday = (now - chrono::Duration::days(1)).timestamp_millis();
|
||||||
|
let ts_last_week = (now - chrono::Duration::days(7)).timestamp_millis();
|
||||||
|
|
||||||
|
let mut context_header = format!(
|
||||||
|
"[context: date={}, epoch_ms={}, ts_1h_ago={}, ts_yesterday={}, ts_last_week={}, room={}, room_name={}]",
|
||||||
|
now.format("%Y-%m-%d"),
|
||||||
|
epoch_ms,
|
||||||
|
ts_1h,
|
||||||
|
ts_yesterday,
|
||||||
|
ts_last_week,
|
||||||
|
room_id,
|
||||||
|
room_name,
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(ref notes) = memory_notes {
|
||||||
|
context_header.push('\n');
|
||||||
|
context_header.push_str(notes);
|
||||||
|
}
|
||||||
|
|
||||||
|
let user_msg = if is_dm {
|
||||||
trigger_body.to_string()
|
trigger_body.to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("<{}> {}", response_ctx.matrix_user_id, trigger_body)
|
format!("<{}> {}", response_ctx.matrix_user_id, trigger_body)
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: multimodal via image_data_uri — Conversations API may support
|
let input_text = format!("{context_header}\n{user_msg}");
|
||||||
// content parts in entries. For now, append image description request.
|
|
||||||
let input = ConversationInput::Text(input_text);
|
let input = ConversationInput::Text(input_text);
|
||||||
|
|
||||||
// Send through conversation registry
|
// Send through conversation registry
|
||||||
let response = match conversation_registry
|
let response = match conversation_registry
|
||||||
.send_message(room_id, input, is_dm, mistral)
|
.send_message(room_id, input, is_dm, mistral, context_hint.as_deref())
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
@@ -346,15 +384,23 @@ impl Responder {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let result_str = match result {
|
let result_str = match result {
|
||||||
Ok(s) => s,
|
Ok(s) => {
|
||||||
|
let preview: String = s.chars().take(500).collect();
|
||||||
|
info!(
|
||||||
|
tool = fc.name.as_str(),
|
||||||
|
id = call_id,
|
||||||
|
result_len = s.len(),
|
||||||
|
result_preview = preview.as_str(),
|
||||||
|
"Tool call result (conversations)"
|
||||||
|
);
|
||||||
|
s
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(tool = fc.name.as_str(), "Tool failed: {e}");
|
warn!(tool = fc.name.as_str(), "Tool failed: {e}");
|
||||||
format!("Error: {e}")
|
format!("Error: {e}")
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
|
result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry {
|
||||||
tool_call_id: call_id.to_string(),
|
tool_call_id: call_id.to_string(),
|
||||||
result: result_str,
|
result: result_str,
|
||||||
|
|||||||
@@ -69,12 +69,15 @@ impl ConversationRegistry {
|
|||||||
|
|
||||||
/// Get or create a conversation for a room. Returns the conversation ID.
|
/// Get or create a conversation for a room. Returns the conversation ID.
|
||||||
/// If a conversation doesn't exist yet, creates one with the first message.
|
/// If a conversation doesn't exist yet, creates one with the first message.
|
||||||
|
/// `context_hint` is prepended to the first message on new conversations,
|
||||||
|
/// giving the agent recent conversation history for continuity after resets.
|
||||||
pub async fn send_message(
|
pub async fn send_message(
|
||||||
&self,
|
&self,
|
||||||
room_id: &str,
|
room_id: &str,
|
||||||
message: ConversationInput,
|
message: ConversationInput,
|
||||||
is_dm: bool,
|
is_dm: bool,
|
||||||
mistral: &MistralClient,
|
mistral: &MistralClient,
|
||||||
|
context_hint: Option<&str>,
|
||||||
) -> Result<ConversationResponse, String> {
|
) -> Result<ConversationResponse, String> {
|
||||||
let mut mapping = self.mapping.lock().await;
|
let mut mapping = self.mapping.lock().await;
|
||||||
|
|
||||||
@@ -107,11 +110,25 @@ impl ConversationRegistry {
|
|||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
} else {
|
} else {
|
||||||
// New conversation — create
|
// New conversation — create (with optional context hint for continuity)
|
||||||
let agent_id = self.agent_id.lock().await.clone();
|
let agent_id = self.agent_id.lock().await.clone();
|
||||||
|
|
||||||
|
let inputs = if let Some(hint) = context_hint {
|
||||||
|
// Prepend recent conversation history to the first message
|
||||||
|
match message {
|
||||||
|
ConversationInput::Text(text) => {
|
||||||
|
ConversationInput::Text(format!(
|
||||||
|
"[recent conversation for context]\n{hint}\n\n[current message]\n{text}"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
other => other,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
message
|
||||||
|
};
|
||||||
|
|
||||||
let req = CreateConversationRequest {
|
let req = CreateConversationRequest {
|
||||||
inputs: message,
|
inputs,
|
||||||
model: if agent_id.is_none() {
|
model: if agent_id.is_none() {
|
||||||
Some(self.model.clone())
|
Some(self.model.clone())
|
||||||
} else {
|
} else {
|
||||||
@@ -194,6 +211,16 @@ impl ConversationRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reset ALL conversations (e.g., after agent recreation).
|
||||||
|
/// Clears both in-memory mappings and SQLite.
|
||||||
|
pub async fn reset_all(&self) {
|
||||||
|
let mut mapping = self.mapping.lock().await;
|
||||||
|
let count = mapping.len();
|
||||||
|
mapping.clear();
|
||||||
|
self.store.delete_all_conversations();
|
||||||
|
info!(count, "Reset all conversations");
|
||||||
|
}
|
||||||
|
|
||||||
/// Reset a room's conversation (e.g., after compaction).
|
/// Reset a room's conversation (e.g., after compaction).
|
||||||
/// Removes the mapping so the next message creates a fresh conversation.
|
/// Removes the mapping so the next message creates a fresh conversation.
|
||||||
pub async fn reset(&self, room_id: &str) {
|
pub async fn reset(&self, room_id: &str) {
|
||||||
|
|||||||
77
src/main.rs
77
src/main.rs
@@ -8,6 +8,7 @@ mod conversations;
|
|||||||
mod matrix_utils;
|
mod matrix_utils;
|
||||||
mod memory;
|
mod memory;
|
||||||
mod persistence;
|
mod persistence;
|
||||||
|
mod sdk;
|
||||||
mod sync;
|
mod sync;
|
||||||
mod tools;
|
mod tools;
|
||||||
|
|
||||||
@@ -65,6 +66,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let mistral_api_key = std::env::var("SOL_MISTRAL_API_KEY")
|
let mistral_api_key = std::env::var("SOL_MISTRAL_API_KEY")
|
||||||
.map_err(|_| anyhow::anyhow!("SOL_MISTRAL_API_KEY not set"))?;
|
.map_err(|_| anyhow::anyhow!("SOL_MISTRAL_API_KEY not set"))?;
|
||||||
|
|
||||||
|
// Optional Gitea admin credentials for user impersonation
|
||||||
|
let gitea_admin_username = std::env::var("SOL_GITEA_ADMIN_USERNAME").ok();
|
||||||
|
let gitea_admin_password = std::env::var("SOL_GITEA_ADMIN_PASSWORD").ok();
|
||||||
|
|
||||||
let config = Arc::new(config);
|
let config = Arc::new(config);
|
||||||
|
|
||||||
// Initialize Matrix client with E2EE and sqlite store
|
// Initialize Matrix client with E2EE and sqlite store
|
||||||
@@ -131,13 +136,52 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize persistent state database (needed by token store + agent/conversation registries)
|
||||||
|
let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) {
|
||||||
|
Ok(s) => (Arc::new(s), false),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to open state database at {}: {e}", config.matrix.db_path);
|
||||||
|
error!("Falling back to in-memory state — conversations will not survive restarts");
|
||||||
|
(Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Initialize Vault client for secure token storage
|
||||||
|
let vault_client = Arc::new(sdk::vault::VaultClient::new(
|
||||||
|
&config.vault.url,
|
||||||
|
&config.vault.role,
|
||||||
|
&config.vault.mount,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Initialize Gitea client if configured
|
||||||
|
let gitea_client: Option<Arc<sdk::gitea::GiteaClient>> =
|
||||||
|
if let (Some(gitea_config), Some(admin_user), Some(admin_pass)) =
|
||||||
|
(&config.services.gitea, &gitea_admin_username, &gitea_admin_password)
|
||||||
|
{
|
||||||
|
let token_store = Arc::new(sdk::tokens::TokenStore::new(
|
||||||
|
store.clone(),
|
||||||
|
vault_client.clone(),
|
||||||
|
));
|
||||||
|
info!(url = gitea_config.url.as_str(), "Gitea integration enabled");
|
||||||
|
Some(Arc::new(sdk::gitea::GiteaClient::new(
|
||||||
|
gitea_config.url.clone(),
|
||||||
|
admin_user.clone(),
|
||||||
|
admin_pass.clone(),
|
||||||
|
token_store,
|
||||||
|
)))
|
||||||
|
} else {
|
||||||
|
info!("Gitea integration disabled (missing config or credentials)");
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let tool_registry = Arc::new(ToolRegistry::new(
|
let tool_registry = Arc::new(ToolRegistry::new(
|
||||||
os_client.clone(),
|
os_client.clone(),
|
||||||
matrix_client.clone(),
|
matrix_client.clone(),
|
||||||
config.clone(),
|
config.clone(),
|
||||||
|
gitea_client,
|
||||||
));
|
));
|
||||||
let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone()));
|
let indexer = Arc::new(Indexer::new(os_client.clone(), config.clone()));
|
||||||
let evaluator = Arc::new(Evaluator::new(config.clone()));
|
let evaluator = Arc::new(Evaluator::new(config.clone(), system_prompt_text.clone()));
|
||||||
let responder = Arc::new(Responder::new(
|
let responder = Arc::new(Responder::new(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
personality,
|
personality,
|
||||||
@@ -148,22 +192,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// Start background flush task
|
// Start background flush task
|
||||||
let _flush_handle = indexer.start_flush_task();
|
let _flush_handle = indexer.start_flush_task();
|
||||||
|
|
||||||
// Initialize persistent state database
|
|
||||||
let (store, state_recovery_failed) = match persistence::Store::open(&config.matrix.db_path) {
|
|
||||||
Ok(s) => (Arc::new(s), false),
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to open state database at {}: {e}", config.matrix.db_path);
|
|
||||||
error!("Falling back to in-memory state — conversations will not survive restarts");
|
|
||||||
(Arc::new(persistence::Store::open_memory().expect("in-memory DB must work")), true)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Initialize agent registry and conversation registry (with SQLite backing)
|
// Initialize agent registry and conversation registry (with SQLite backing)
|
||||||
let agent_registry = Arc::new(AgentRegistry::new(store.clone()));
|
let agent_registry = Arc::new(AgentRegistry::new(store.clone()));
|
||||||
let conversation_registry = Arc::new(ConversationRegistry::new(
|
let conversation_registry = Arc::new(ConversationRegistry::new(
|
||||||
config.mistral.default_model.clone(),
|
config.mistral.default_model.clone(),
|
||||||
config.agents.compaction_threshold,
|
config.agents.compaction_threshold,
|
||||||
store,
|
store.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
// Build shared state
|
// Build shared state
|
||||||
@@ -182,9 +216,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// Initialize orchestrator agent if conversations API is enabled
|
// Initialize orchestrator agent if conversations API is enabled
|
||||||
|
let mut agent_recreated = false;
|
||||||
if config.agents.use_conversations_api {
|
if config.agents.use_conversations_api {
|
||||||
info!("Conversations API enabled — ensuring orchestrator agent exists");
|
info!("Conversations API enabled — ensuring orchestrator agent exists");
|
||||||
let agent_tools = tools::ToolRegistry::agent_tool_definitions();
|
let agent_tools = tools::ToolRegistry::agent_tool_definitions(config.services.gitea.is_some());
|
||||||
match state
|
match state
|
||||||
.agent_registry
|
.agent_registry
|
||||||
.ensure_orchestrator(
|
.ensure_orchestrator(
|
||||||
@@ -192,12 +227,20 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
&config.agents.orchestrator_model,
|
&config.agents.orchestrator_model,
|
||||||
agent_tools,
|
agent_tools,
|
||||||
&state.mistral,
|
&state.mistral,
|
||||||
|
&[], // no domain agents yet — delegation section added when they are
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(agent_id) => {
|
Ok((agent_id, recreated)) => {
|
||||||
info!(agent_id = agent_id.as_str(), "Orchestrator agent ready");
|
info!(agent_id = agent_id.as_str(), recreated, "Orchestrator agent ready");
|
||||||
state.conversation_registry.set_agent_id(agent_id).await;
|
state.conversation_registry.set_agent_id(agent_id).await;
|
||||||
|
|
||||||
|
if recreated {
|
||||||
|
// Agent was recreated (system prompt changed) — old conversations
|
||||||
|
// are bound to the stale agent and won't work. Reset everything.
|
||||||
|
state.conversation_registry.reset_all().await;
|
||||||
|
agent_recreated = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to create orchestrator agent: {e}");
|
error!("Failed to create orchestrator agent: {e}");
|
||||||
@@ -221,8 +264,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// If state recovery failed, sneeze into all rooms to signal the hiccup
|
// If state recovery failed or agent was recreated, sneeze into all rooms
|
||||||
if state_recovery_failed {
|
if state_recovery_failed || agent_recreated {
|
||||||
info!("State recovery failed — sneezing into all rooms");
|
info!("State recovery failed — sneezing into all rooms");
|
||||||
for room in matrix_client.joined_rooms() {
|
for room in matrix_client.joined_rooms() {
|
||||||
let content = ruma::events::room::message::RoomMessageEventContent::text_plain(
|
let content = ruma::events::room::message::RoomMessageEventContent::text_plain(
|
||||||
|
|||||||
25
src/sync.rs
25
src/sync.rs
@@ -271,6 +271,29 @@ async fn handle_message(
|
|||||||
let members = matrix_utils::room_member_names(&room).await;
|
let members = matrix_utils::room_member_names(&room).await;
|
||||||
let display_sender = sender_name.as_deref().unwrap_or(&sender);
|
let display_sender = sender_name.as_deref().unwrap_or(&sender);
|
||||||
|
|
||||||
|
// Build context hint for new conversations (last 50 messages for continuity)
|
||||||
|
let context_hint = if state.config.agents.use_conversations_api {
|
||||||
|
let conv_exists = state
|
||||||
|
.conversation_registry
|
||||||
|
.get_conversation_id(&room_id)
|
||||||
|
.await
|
||||||
|
.is_some();
|
||||||
|
if !conv_exists && !context.is_empty() {
|
||||||
|
let hint_messages: Vec<String> = context
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.take(50)
|
||||||
|
.rev()
|
||||||
|
.map(|m| format!("{}: {}", m.sender, m.content))
|
||||||
|
.collect();
|
||||||
|
Some(hint_messages.join("\n"))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let response = if state.config.agents.use_conversations_api {
|
let response = if state.config.agents.use_conversations_api {
|
||||||
state
|
state
|
||||||
.responder
|
.responder
|
||||||
@@ -278,6 +301,7 @@ async fn handle_message(
|
|||||||
&body,
|
&body,
|
||||||
display_sender,
|
display_sender,
|
||||||
&room_id,
|
&room_id,
|
||||||
|
&room_name,
|
||||||
is_dm,
|
is_dm,
|
||||||
is_spontaneous,
|
is_spontaneous,
|
||||||
&state.mistral,
|
&state.mistral,
|
||||||
@@ -285,6 +309,7 @@ async fn handle_message(
|
|||||||
&response_ctx,
|
&response_ctx,
|
||||||
&state.conversation_registry,
|
&state.conversation_registry,
|
||||||
image_data_uri.as_deref(),
|
image_data_uri.as_deref(),
|
||||||
|
context_hint,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user