Files
wfe/wfe-server/src/config.rs
Sienna Meridian Satterwhite cbbeaf6d67 feat(wfe-server): headless workflow server with gRPC, webhooks, and OIDC auth
Single-binary server exposing the WFE engine over gRPC (13 RPCs) with
HTTP webhook support (GitHub, Gitea, generic events).

Features:
- gRPC API: workflow CRUD, lifecycle event streaming, log streaming,
  log search via OpenSearch
- HTTP webhooks: HMAC-SHA256 verified GitHub/Gitea webhooks with
  configurable triggers that auto-start workflows
- OIDC/JWT auth: discovers JWKS from issuer, validates with asymmetric
  algorithm allowlist to prevent algorithm confusion attacks
- Static bearer token auth with constant-time comparison
- Lifecycle event broadcasting via tokio::broadcast
- Log streaming: real-time stdout/stderr via LogSink trait, history
  replay, follow mode
- Log search: full-text search via OpenSearch with workflow/step/stream
  filters
- Layered config: CLI flags > env vars > TOML file
- Fail-closed on OIDC discovery failure, fail-loud on config parse errors
- 2MB webhook payload size limit
- Blocked sensitive env var injection (PATH, LD_PRELOAD, etc.)
2026-04-01 14:37:25 +01:00

364 lines
11 KiB
Rust

use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use clap::Parser;
use serde::Deserialize;
/// WFE workflow server.
#[derive(Parser, Debug)]
#[command(name = "wfe-server", version, about)]
pub struct Cli {
/// Config file path.
#[arg(short, long, default_value = "wfe-server.toml")]
pub config: PathBuf,
/// gRPC listen address.
#[arg(long, env = "WFE_GRPC_ADDR")]
pub grpc_addr: Option<SocketAddr>,
/// HTTP listen address (webhooks).
#[arg(long, env = "WFE_HTTP_ADDR")]
pub http_addr: Option<SocketAddr>,
/// Persistence backend: sqlite or postgres.
#[arg(long, env = "WFE_PERSISTENCE")]
pub persistence: Option<String>,
/// Database URL or path.
#[arg(long, env = "WFE_DB_URL")]
pub db_url: Option<String>,
/// Queue backend: memory or valkey.
#[arg(long, env = "WFE_QUEUE")]
pub queue: Option<String>,
/// Queue URL (for valkey).
#[arg(long, env = "WFE_QUEUE_URL")]
pub queue_url: Option<String>,
/// OpenSearch URL (enables log + workflow search).
#[arg(long, env = "WFE_SEARCH_URL")]
pub search_url: Option<String>,
/// Directory to auto-load YAML workflow definitions from.
#[arg(long, env = "WFE_WORKFLOWS_DIR")]
pub workflows_dir: Option<PathBuf>,
/// Comma-separated bearer tokens for API auth.
#[arg(long, env = "WFE_AUTH_TOKENS")]
pub auth_tokens: Option<String>,
}
/// Server configuration (deserialized from TOML).
#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct ServerConfig {
pub grpc_addr: SocketAddr,
pub http_addr: SocketAddr,
pub persistence: PersistenceConfig,
pub queue: QueueConfig,
pub search: Option<SearchConfig>,
pub auth: AuthConfig,
pub webhook: WebhookConfig,
pub workflows_dir: Option<PathBuf>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
grpc_addr: "0.0.0.0:50051".parse().unwrap(),
http_addr: "0.0.0.0:8080".parse().unwrap(),
persistence: PersistenceConfig::default(),
queue: QueueConfig::default(),
search: None,
auth: AuthConfig::default(),
webhook: WebhookConfig::default(),
workflows_dir: None,
}
}
}
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "backend")]
pub enum PersistenceConfig {
#[serde(rename = "sqlite")]
Sqlite { path: String },
#[serde(rename = "postgres")]
Postgres { url: String },
}
impl Default for PersistenceConfig {
fn default() -> Self {
Self::Sqlite {
path: "wfe.db".to_string(),
}
}
}
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "backend")]
pub enum QueueConfig {
#[serde(rename = "memory")]
InMemory,
#[serde(rename = "valkey")]
Valkey { url: String },
}
impl Default for QueueConfig {
fn default() -> Self {
Self::InMemory
}
}
#[derive(Debug, Deserialize, Clone)]
pub struct SearchConfig {
pub url: String,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct AuthConfig {
/// Static bearer tokens (simple auth, no OIDC needed).
#[serde(default)]
pub tokens: Vec<String>,
/// OIDC issuer URL (e.g., https://auth.example.com/realms/myapp).
/// Enables JWT validation via OIDC discovery + JWKS.
#[serde(default)]
pub oidc_issuer: Option<String>,
/// Expected JWT audience claim.
#[serde(default)]
pub oidc_audience: Option<String>,
/// Webhook HMAC secrets per source.
#[serde(default)]
pub webhook_secrets: HashMap<String, String>,
}
#[derive(Debug, Deserialize, Clone, Default)]
pub struct WebhookConfig {
#[serde(default)]
pub triggers: Vec<WebhookTrigger>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct WebhookTrigger {
pub source: String,
pub event: String,
#[serde(default)]
pub match_ref: Option<String>,
pub workflow_id: String,
pub version: u32,
#[serde(default)]
pub data_mapping: HashMap<String, String>,
}
/// Load configuration with layered overrides: CLI > env > file.
pub fn load(cli: &Cli) -> ServerConfig {
let mut config = if cli.config.exists() {
let content = std::fs::read_to_string(&cli.config)
.unwrap_or_else(|e| panic!("failed to read config file {}: {e}", cli.config.display()));
toml::from_str(&content)
.unwrap_or_else(|e| panic!("failed to parse config file {}: {e}", cli.config.display()))
} else {
ServerConfig::default()
};
if let Some(addr) = cli.grpc_addr {
config.grpc_addr = addr;
}
if let Some(addr) = cli.http_addr {
config.http_addr = addr;
}
if let Some(ref dir) = cli.workflows_dir {
config.workflows_dir = Some(dir.clone());
}
// Persistence override.
if let Some(ref backend) = cli.persistence {
let url = cli
.db_url
.clone()
.unwrap_or_else(|| "wfe.db".to_string());
config.persistence = match backend.as_str() {
"postgres" => PersistenceConfig::Postgres { url },
_ => PersistenceConfig::Sqlite { path: url },
};
} else if let Some(ref url) = cli.db_url {
// Infer backend from URL.
if url.starts_with("postgres") {
config.persistence = PersistenceConfig::Postgres { url: url.clone() };
} else {
config.persistence = PersistenceConfig::Sqlite { path: url.clone() };
}
}
// Queue override.
if let Some(ref backend) = cli.queue {
config.queue = match backend.as_str() {
"valkey" | "redis" => {
let url = cli
.queue_url
.clone()
.unwrap_or_else(|| "redis://127.0.0.1:6379".to_string());
QueueConfig::Valkey { url }
}
_ => QueueConfig::InMemory,
};
}
// Search override.
if let Some(ref url) = cli.search_url {
config.search = Some(SearchConfig { url: url.clone() });
}
// Auth tokens override.
if let Some(ref tokens) = cli.auth_tokens {
config.auth.tokens = tokens
.split(',')
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty())
.collect();
}
config
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_config() {
let config = ServerConfig::default();
assert_eq!(config.grpc_addr, "0.0.0.0:50051".parse().unwrap());
assert_eq!(config.http_addr, "0.0.0.0:8080".parse().unwrap());
assert!(matches!(config.persistence, PersistenceConfig::Sqlite { .. }));
assert!(matches!(config.queue, QueueConfig::InMemory));
assert!(config.search.is_none());
assert!(config.auth.tokens.is_empty());
assert!(config.webhook.triggers.is_empty());
}
#[test]
fn parse_toml_config() {
let toml = r#"
grpc_addr = "127.0.0.1:9090"
http_addr = "127.0.0.1:8081"
[persistence]
backend = "postgres"
url = "postgres://localhost/wfe"
[queue]
backend = "valkey"
url = "redis://localhost:6379"
[search]
url = "http://localhost:9200"
[auth]
tokens = ["token1", "token2"]
[auth.webhook_secrets]
github = "mysecret"
[[webhook.triggers]]
source = "github"
event = "push"
match_ref = "refs/heads/main"
workflow_id = "ci"
version = 1
"#;
let config: ServerConfig = toml::from_str(toml).unwrap();
assert_eq!(config.grpc_addr, "127.0.0.1:9090".parse().unwrap());
assert!(matches!(config.persistence, PersistenceConfig::Postgres { .. }));
assert!(matches!(config.queue, QueueConfig::Valkey { .. }));
assert!(config.search.is_some());
assert_eq!(config.auth.tokens.len(), 2);
assert_eq!(config.auth.webhook_secrets.get("github").unwrap(), "mysecret");
assert_eq!(config.webhook.triggers.len(), 1);
assert_eq!(config.webhook.triggers[0].workflow_id, "ci");
}
#[test]
fn cli_overrides_file() {
let cli = Cli {
config: PathBuf::from("/nonexistent"),
grpc_addr: Some("127.0.0.1:9999".parse().unwrap()),
http_addr: None,
persistence: Some("postgres".to_string()),
db_url: Some("postgres://db/wfe".to_string()),
queue: Some("valkey".to_string()),
queue_url: Some("redis://valkey:6379".to_string()),
search_url: Some("http://os:9200".to_string()),
workflows_dir: Some(PathBuf::from("/workflows")),
auth_tokens: Some("tok1, tok2".to_string()),
};
let config = load(&cli);
assert_eq!(config.grpc_addr, "127.0.0.1:9999".parse().unwrap());
assert!(matches!(config.persistence, PersistenceConfig::Postgres { ref url } if url == "postgres://db/wfe"));
assert!(matches!(config.queue, QueueConfig::Valkey { ref url } if url == "redis://valkey:6379"));
assert_eq!(config.search.unwrap().url, "http://os:9200");
assert_eq!(config.workflows_dir.unwrap(), PathBuf::from("/workflows"));
assert_eq!(config.auth.tokens, vec!["tok1", "tok2"]);
}
#[test]
fn infer_postgres_from_url() {
let cli = Cli {
config: PathBuf::from("/nonexistent"),
grpc_addr: None,
http_addr: None,
persistence: None,
db_url: Some("postgres://localhost/wfe".to_string()),
queue: None,
queue_url: None,
search_url: None,
workflows_dir: None,
auth_tokens: None,
};
let config = load(&cli);
assert!(matches!(config.persistence, PersistenceConfig::Postgres { .. }));
}
// ── Security regression tests ──
#[test]
#[should_panic(expected = "failed to parse config file")]
fn security_malformed_config_panics() {
// HIGH-19: Malformed config must NOT silently fall back to defaults.
let tmp = tempfile::NamedTempFile::new().unwrap();
std::fs::write(tmp.path(), "this is not { valid toml @@@@").unwrap();
let cli = Cli {
config: tmp.path().to_path_buf(),
grpc_addr: None,
http_addr: None,
persistence: None,
db_url: None,
queue: None,
queue_url: None,
search_url: None,
workflows_dir: None,
auth_tokens: None,
};
load(&cli);
}
#[test]
fn trigger_data_mapping() {
let toml = r#"
[[triggers]]
source = "github"
event = "push"
workflow_id = "ci"
version = 1
[triggers.data_mapping]
repo = "$.repository.full_name"
commit = "$.head_commit.id"
"#;
let config: WebhookConfig = toml::from_str(toml).unwrap();
assert_eq!(config.triggers[0].data_mapping.len(), 2);
assert_eq!(config.triggers[0].data_mapping["repo"], "$.repository.full_name");
}
}