feat(grpc): dev mode, agent prefix, system prompt, error UX

- gRPC dev_mode config: disables JWT auth, uses fixed dev identity
- Agent prefix (agents.agent_prefix): dev agents use "dev-sol-orchestrator"
  to avoid colliding with production on shared Mistral accounts
- Coding sessions use instructions (system prompt + coding addendum)
  with mistral-medium-latest for personality adherence
- Conversations API: don't send both model + agent_id (422 fix)
- GrpcState carries system_prompt + orchestrator_agent_id
- Session.end() keeps session active for reuse (not "ended")
- User messages posted as m.notice, assistant as m.text (role detection)
- History loaded from Matrix room on session resume
- Docker Compose local dev stack: OpenSearch 3 + Tuwunel + SearXNG
- Dev config: localhost URLs, dev_mode, opensearch-init.sh for ML setup
This commit is contained in:
2026-03-23 17:07:50 +00:00
parent 71392cef9c
commit b8b76687a5
18 changed files with 1035 additions and 65 deletions

View File

@@ -3,10 +3,18 @@ use mistralai_client::v1::agents::{AgentTool, CompletionArgs, CreateAgentRequest
/// Domain agent definitions — each scoped to a subset of sunbeam-sdk tools.
/// These are created on startup via the Agents API and cached by the registry.
pub const ORCHESTRATOR_NAME: &str = "sol-orchestrator";
pub const ORCHESTRATOR_BASE_NAME: &str = "sol-orchestrator";
pub const ORCHESTRATOR_DESCRIPTION: &str =
"Sol — virtual librarian for Sunbeam Studios. Routes to domain agents or responds directly.";
pub fn orchestrator_name(prefix: &str) -> String {
if prefix.is_empty() {
ORCHESTRATOR_BASE_NAME.to_string()
} else {
format!("{prefix}-{ORCHESTRATOR_BASE_NAME}")
}
}
/// Build the orchestrator agent instructions.
/// The orchestrator carries Sol's personality. If domain agents are available,
/// a delegation section is appended describing them.
@@ -61,12 +69,13 @@ pub fn orchestrator_request(
model: &str,
tools: Vec<AgentTool>,
active_agents: &[(&str, &str)],
name: &str,
) -> CreateAgentRequest {
let instructions = orchestrator_instructions(system_prompt, active_agents);
CreateAgentRequest {
model: model.to_string(),
name: ORCHESTRATOR_NAME.to_string(),
name: name.to_string(),
description: Some(ORCHESTRATOR_DESCRIPTION.to_string()),
instructions: Some(instructions),
tools: if tools.is_empty() { None } else { Some(tools) },

View File

@@ -51,57 +51,55 @@ impl AgentRegistry {
tools: Vec<mistralai_client::v1::agents::AgentTool>,
mistral: &MistralClient,
active_agents: &[(&str, &str)],
agent_prefix: &str,
) -> Result<(String, bool), String> {
let agent_name = definitions::orchestrator_name(agent_prefix);
let mut agents = self.agents.lock().await;
let current_instructions = definitions::orchestrator_instructions(system_prompt, active_agents);
let current_hash = instructions_hash(&current_instructions);
// Check in-memory cache
if let Some(agent) = agents.get(definitions::ORCHESTRATOR_NAME) {
if let Some(agent) = agents.get(&agent_name) {
return Ok((agent.id.clone(), false));
}
// Check SQLite for persisted agent ID
if let Some((agent_id, stored_hash)) = self.store.get_agent(definitions::ORCHESTRATOR_NAME) {
if let Some((agent_id, stored_hash)) = self.store.get_agent(&agent_name) {
if stored_hash == current_hash {
// Instructions haven't changed — verify agent still exists on server
match mistral.get_agent_async(&agent_id).await {
Ok(agent) => {
info!(agent_id = agent.id.as_str(), "Restored orchestrator agent from database");
agents.insert(definitions::ORCHESTRATOR_NAME.to_string(), agent);
agents.insert(agent_name.clone(), agent);
return Ok((agent_id, false));
}
Err(_) => {
warn!("Persisted orchestrator agent {agent_id} no longer exists on server");
self.store.delete_agent(definitions::ORCHESTRATOR_NAME);
self.store.delete_agent(&agent_name);
}
}
} else {
// Instructions changed — delete old agent, will create new below
info!(
old_hash = stored_hash.as_str(),
new_hash = current_hash.as_str(),
"System prompt changed — recreating orchestrator agent"
);
// Try to delete old agent from Mistral (best-effort)
if let Err(e) = mistral.delete_agent_async(&agent_id).await {
warn!("Failed to delete old orchestrator agent: {}", e.message);
}
self.store.delete_agent(definitions::ORCHESTRATOR_NAME);
self.store.delete_agent(&agent_name);
}
}
// Check if it exists on the server by name (but skip reuse if hash changed)
let existing = self.find_by_name(definitions::ORCHESTRATOR_NAME, mistral).await;
// Check if it exists on the server by name
let existing = self.find_by_name(&agent_name, mistral).await;
if let Some(agent) = existing {
// Delete it — we need a fresh one with current instructions
info!(agent_id = agent.id.as_str(), "Deleting stale orchestrator agent from server");
let _ = mistral.delete_agent_async(&agent.id).await;
}
// Create new
let req = definitions::orchestrator_request(system_prompt, model, tools, active_agents);
let req = definitions::orchestrator_request(system_prompt, model, tools, active_agents, &agent_name);
let agent = mistral
.create_agent_async(&req)
.await
@@ -109,8 +107,8 @@ impl AgentRegistry {
let id = agent.id.clone();
info!(agent_id = id.as_str(), "Created orchestrator agent");
self.store.upsert_agent(definitions::ORCHESTRATOR_NAME, &id, model, &current_hash);
agents.insert(definitions::ORCHESTRATOR_NAME.to_string(), agent);
self.store.upsert_agent(&agent_name, &id, model, &current_hash);
agents.insert(agent_name, agent);
Ok((id, true))
}

View File

@@ -45,6 +45,9 @@ pub struct AgentsConfig {
/// Model for coding agent sessions (sunbeam code).
#[serde(default = "default_coding_model")]
pub coding_model: String,
/// Agent name prefix — set to "dev" in local dev to avoid colliding with production agents.
#[serde(default)]
pub agent_prefix: String,
}
impl Default for AgentsConfig {
@@ -59,6 +62,7 @@ impl Default for AgentsConfig {
research_max_agents: default_research_max_agents(),
research_max_depth: default_research_max_depth(),
coding_model: default_coding_model(),
agent_prefix: String::new(),
}
}
}
@@ -239,16 +243,19 @@ fn default_research_agent_model() -> String { "ministral-3b-latest".into() }
fn default_research_max_iterations() -> usize { 10 }
fn default_research_max_agents() -> usize { 25 }
fn default_research_max_depth() -> usize { 4 }
fn default_coding_model() -> String { "devstral-small-2506".into() }
fn default_coding_model() -> String { "mistral-medium-latest".into() }
#[derive(Debug, Clone, Deserialize)]
pub struct GrpcConfig {
/// Address to listen on (default: 0.0.0.0:50051).
#[serde(default = "default_grpc_addr")]
pub listen_addr: String,
/// JWKS URL for JWT validation (default: Hydra's .well-known endpoint).
/// JWKS URL for JWT validation. Required unless dev_mode is true.
#[serde(default)]
pub jwks_url: Option<String>,
/// Dev mode: disables JWT auth, uses a fixed dev identity.
#[serde(default)]
pub dev_mode: bool,
}
fn default_grpc_addr() -> String { "0.0.0.0:50051".into() }

View File

@@ -67,6 +67,11 @@ impl ConversationRegistry {
*id = Some(agent_id);
}
/// Get the current orchestrator agent ID, if set.
pub async fn get_agent_id(&self) -> Option<String> {
self.agent_id.lock().await.clone()
}
/// Get or create a conversation for a room. Returns the conversation ID.
/// If a conversation doesn't exist yet, creates one with the first message.
/// `context_hint` is prepended to the first message on new conversations,

View File

@@ -25,6 +25,8 @@ pub struct GrpcState {
pub store: Arc<Store>,
pub mistral: Arc<mistralai_client::v1::client::Client>,
pub matrix: matrix_sdk::Client,
pub system_prompt: String,
pub orchestrator_agent_id: String,
}
/// Start the gRPC server. Call from main.rs alongside the Matrix sync loop.
@@ -38,28 +40,31 @@ pub async fn start_server(state: Arc<GrpcState>) -> anyhow::Result<()> {
let addr = addr.parse()?;
let jwks_url = state
.config
.grpc
.as_ref()
.and_then(|g| g.jwks_url.clone())
.unwrap_or_else(|| {
"http://hydra-public.ory.svc.cluster.local:4444/.well-known/jwks.json".into()
});
// Initialize JWT validator (fetches JWKS from Hydra)
let jwt_validator = Arc::new(auth::JwtValidator::new(&jwks_url).await?);
let interceptor = auth::JwtInterceptor::new(jwt_validator);
let grpc_cfg = state.config.grpc.as_ref();
let dev_mode = grpc_cfg.map(|g| g.dev_mode).unwrap_or(false);
let jwks_url = grpc_cfg.and_then(|g| g.jwks_url.clone());
let svc = service::CodeAgentService::new(state);
let svc = CodeAgentServer::with_interceptor(svc, interceptor);
info!(%addr, "Starting gRPC server");
let mut builder = Server::builder();
Server::builder()
.add_service(svc)
.serve(addr)
.await?;
if dev_mode {
info!(%addr, "Starting gRPC server (dev mode — no auth)");
builder
.add_service(CodeAgentServer::new(svc))
.serve(addr)
.await?;
} else if let Some(ref url) = jwks_url {
info!(%addr, jwks_url = %url, "Starting gRPC server with JWT auth");
let jwt_validator = Arc::new(auth::JwtValidator::new(url).await?);
let interceptor = auth::JwtInterceptor::new(jwt_validator);
builder
.add_service(CodeAgentServer::with_interceptor(svc, interceptor))
.serve(addr)
.await?;
} else {
anyhow::bail!("gRPC requires either dev_mode = true or a jwks_url for JWT auth");
};
Ok(())
}

View File

@@ -31,10 +31,25 @@ impl CodeAgent for CodeAgentService {
&self,
request: Request<Streaming<ClientMessage>>,
) -> Result<Response<Self::SessionStream>, Status> {
let dev_mode = self
.state
.config
.grpc
.as_ref()
.map(|g| g.dev_mode)
.unwrap_or(false);
let claims = request
.extensions()
.get::<Claims>()
.cloned()
.or_else(|| {
dev_mode.then(|| Claims {
sub: "dev".into(),
email: Some("dev@sunbeam.local".into()),
exp: 0,
})
})
.ok_or_else(|| Status::unauthenticated("No valid authentication token"))?;
info!(
@@ -68,7 +83,7 @@ impl CodeAgent for CodeAgentService {
}
async fn run_session(
state: &GrpcState,
state: &Arc<GrpcState>,
claims: &Claims,
in_stream: &mut Streaming<ClientMessage>,
tx: &mpsc::Sender<Result<ServerMessage, Status>>,
@@ -85,18 +100,15 @@ async fn run_session(
};
// Create or resume session
let mut session = CodeSession::start(
Arc::new(GrpcState {
config: state.config.clone(),
tools: state.tools.clone(),
store: state.store.clone(),
mistral: state.mistral.clone(),
matrix: state.matrix.clone(),
}),
claims,
&start,
)
.await?;
let mut session = CodeSession::start(state.clone(), claims, &start).await?;
// Fetch history if resuming
let resumed = session.resumed();
let history = if resumed {
session.fetch_history(50).await
} else {
Vec::new()
};
// Send SessionReady
tx.send(Ok(ServerMessage {
@@ -104,6 +116,8 @@ async fn run_session(
session_id: session.session_id.clone(),
room_id: session.room_id.clone(),
model: session.model.clone(),
resumed,
history,
})),
}))
.await?;

View File

@@ -132,6 +132,98 @@ impl CodeSession {
})
}
/// Whether this session was resumed from a prior connection.
pub fn resumed(&self) -> bool {
self.conversation_id.is_some()
}
/// Fetch recent messages from the Matrix room for history display.
pub async fn fetch_history(&self, limit: usize) -> Vec<HistoryEntry> {
use matrix_sdk::room::MessagesOptions;
use matrix_sdk::ruma::events::AnySyncTimelineEvent;
use matrix_sdk::ruma::uint;
let Some(ref room) = self.room else {
return Vec::new();
};
let mut options = MessagesOptions::backward();
options.limit = uint!(50);
let messages = match room.messages(options).await {
Ok(m) => m,
Err(e) => {
warn!("Failed to fetch room history: {e}");
return Vec::new();
}
};
let sol_user = &self.state.config.matrix.user_id;
let mut entries = Vec::new();
// Messages come newest-first (backward), collect then reverse
for event in &messages.chunk {
let Ok(deserialized) = event.raw().deserialize() else {
continue;
};
if let AnySyncTimelineEvent::MessageLike(
matrix_sdk::ruma::events::AnySyncMessageLikeEvent::RoomMessage(msg),
) = deserialized
{
let original = match msg {
matrix_sdk::ruma::events::SyncMessageLikeEvent::Original(ref o) => o,
_ => continue,
};
use matrix_sdk::ruma::events::room::message::MessageType;
let (body, role) = match &original.content.msgtype {
MessageType::Text(t) => (t.body.clone(), "assistant"),
MessageType::Notice(t) => (t.body.clone(), "user"),
_ => continue,
};
entries.push(HistoryEntry {
role: role.into(),
content: body,
});
if entries.len() >= limit {
break;
}
}
}
entries.reverse(); // oldest first
entries
}
/// Build conversation instructions: Sol's personality + coding mode context.
fn build_instructions(&self) -> String {
let base = &self.state.system_prompt;
let coding_addendum = format!(
r#"
## coding mode
you are in a `sunbeam code` terminal session with a developer. you have direct access to their local filesystem through tools: file_read, file_write, search_replace, grep, bash, list_directory.
you also have access to server-side tools: search_archive, search_web, research, run_script, and gitea tools.
### how to work
- read before you edit. understand existing code before suggesting changes.
- use search_replace for targeted patches, file_write only for new files or complete rewrites.
- run tests after changes. use bash for builds, tests, git operations.
- keep changes minimal and focused. don't refactor what wasn't asked for.
- when uncertain, ask — you have an ask_user tool for that.
### project: {}
"#,
self.project_name
);
format!("{base}{coding_addendum}")
}
/// Build the per-message context header for coding mode.
fn build_context_header(&self) -> String {
let tc = TimeContext::now();
@@ -161,16 +253,16 @@ impl CodeSession {
let context_header = self.build_context_header();
let input_text = format!("{context_header}\n{text}");
// Post to Matrix room
// Post user message to Matrix room (as m.notice to distinguish from assistant)
if let Some(ref room) = self.room {
let content = RoomMessageEventContent::text_plain(text);
let content = RoomMessageEventContent::notice_plain(text);
let _ = room.send(content).await;
}
// Send status
let _ = client_tx.send(Ok(ServerMessage {
payload: Some(server_message::Payload::Status(Status {
message: "thinking...".into(),
message: "generating…".into(),
kind: StatusKind::Thinking.into(),
})),
})).await;
@@ -190,6 +282,7 @@ impl CodeSession {
.await
.map_err(|e| anyhow::anyhow!("append_conversation failed: {}", e.message))?
} else {
let instructions = self.build_instructions();
let req = CreateConversationRequest {
inputs: ConversationInput::Text(input_text),
model: Some(self.model.clone()),
@@ -197,7 +290,7 @@ impl CodeSession {
agent_version: None,
name: Some(format!("code-{}", self.project_name)),
description: None,
instructions: None,
instructions: Some(instructions),
completion_args: None,
tools: Some(self.build_tool_definitions()),
handoff_execution: None,
@@ -387,10 +480,10 @@ impl CodeSession {
tools
}
/// End the session.
/// Disconnect from the session (keeps it active for future reconnection).
pub fn end(&self) {
self.state.store.end_code_session(&self.session_id);
info!(session_id = self.session_id.as_str(), "Code session ended");
self.state.store.touch_code_session(&self.session_id);
info!(session_id = self.session_id.as_str(), "Code session disconnected (stays active for reuse)");
}
}

View File

@@ -256,6 +256,7 @@ async fn main() -> anyhow::Result<()> {
agent_tools,
&state.mistral,
&active_agents,
&config.agents.agent_prefix,
)
.await
{
@@ -295,12 +296,16 @@ async fn main() -> anyhow::Result<()> {
// Start gRPC server if configured
if config.grpc.is_some() {
let orchestrator_id = state.conversation_registry.get_agent_id().await
.unwrap_or_default();
let grpc_state = std::sync::Arc::new(grpc::GrpcState {
config: config.clone(),
tools: state.responder.tools(),
store: store.clone(),
mistral: state.mistral.clone(),
matrix: matrix_client.clone(),
system_prompt: system_prompt_text.clone(),
orchestrator_agent_id: orchestrator_id,
});
tokio::spawn(async move {
if let Err(e) = grpc::start_server(grpc_state).await {