diff --git a/Dockerfile.ci b/Dockerfile.ci new file mode 100644 index 0000000..c359c27 --- /dev/null +++ b/Dockerfile.ci @@ -0,0 +1,55 @@ +# wfe-ci: Prebuilt image for running wfe CI workflows in Kubernetes. +# +# Contains: +# - Rust stable toolchain +# - cargo-nextest, cargo-llvm-cov +# - sccache (configured via env vars from Vault) +# - buildkit client (buildctl) for in-cluster buildkitd +# - tea CLI for Gitea release management +# - git, curl, kubectl +# +# Usage in workflows: type: kubernetes, image: src.sunbeam.pt/studio/wfe-ci:latest + +FROM rust:bookworm + +# System packages +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + git \ + jq \ + libssl-dev \ + pkg-config \ + protobuf-compiler \ + unzip \ + xz-utils \ + && rm -rf /var/lib/apt/lists/* + +# Cargo tools +RUN cargo install --locked cargo-nextest cargo-llvm-cov sccache && \ + rm -rf /usr/local/cargo/registry + +# Buildkit client (buildctl) +ARG BUILDKIT_VERSION=v0.28.0 +RUN curl -fsSL "https://github.com/moby/buildkit/releases/download/${BUILDKIT_VERSION}/buildkit-${BUILDKIT_VERSION}.linux-amd64.tar.gz" \ + | tar -xz -C /usr/local --strip-components=1 bin/buildctl + +# kubectl +RUN curl -fsSL "https://dl.k8s.io/release/$(curl -fsSL https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" \ + -o /usr/local/bin/kubectl && chmod +x /usr/local/bin/kubectl + +# tea CLI for Gitea +ARG TEA_VERSION=0.11.0 +RUN curl -fsSL "https://gitea.com/gitea/tea/releases/download/v${TEA_VERSION}/tea-${TEA_VERSION}-linux-amd64" \ + -o /usr/local/bin/tea && chmod +x /usr/local/bin/tea + +# llvm tools (needed by cargo-llvm-cov) +RUN rustup component add llvm-tools-preview + +# Sccache wrapper config — expects SCCACHE_S3_ENDPOINT, SCCACHE_BUCKET, etc. via env. +ENV RUSTC_WRAPPER=/usr/local/cargo/bin/sccache \ + CARGO_INCREMENTAL=0 + +WORKDIR /workspace + +CMD ["bash"] diff --git a/wfectl/Cargo.toml b/wfectl/Cargo.toml new file mode 100644 index 0000000..e59d566 --- /dev/null +++ b/wfectl/Cargo.toml @@ -0,0 +1,54 @@ +[package] +name = "wfectl" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "Command-line client for wfe-server" + +[[bin]] +name = "wfectl" +path = "src/main.rs" + +[dependencies] +# `wfectl validate` compiles YAML locally so users get instant feedback +# without a server round-trip. Enable the full executor feature set so every +# step type the server supports is also recognized by the CLI. +wfe-yaml = { workspace = true, features = ["rustlang", "buildkit", "containerd", "kubernetes", "deno"] } +wfe-server-protos = { workspace = true } +tonic = { version = "0.14", features = ["tls-native-roots"] } +tokio = { workspace = true } +tokio-stream = { workspace = true } +clap = { version = "4", features = ["derive", "env"] } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +toml = "0.8" +reqwest = { workspace = true, features = ["rustls-tls"] } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +chrono = { workspace = true } +dirs = "5" +sha2 = "0.10" +base64 = "0.22" +rand = "0.8" +url = { workspace = true } +prost-types = "0.14" +prost = "0.14" +comfy-table = "7" +indicatif = "0.17" +hyper = { version = "1", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } +http-body-util = "0.1" +anyhow = "1" +thiserror = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +tempfile = { workspace = true } +wiremock = { workspace = true } +wfe-core = { workspace = true, features = ["test-support"] } +wfe = { path = "../wfe" } diff --git a/wfectl/src/auth.rs b/wfectl/src/auth.rs new file mode 100644 index 0000000..c15f604 --- /dev/null +++ b/wfectl/src/auth.rs @@ -0,0 +1,524 @@ +//! OAuth2 Authorization Code flow with PKCE for the wfectl CLI. +//! +//! Reuses the `sunbeam-cli` Ory Hydra client and stores tokens at the same +//! path as the sunbeam CLI (`~/.sunbeam/auth/{domain}.json`) so a single +//! login works for both tools. + +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow}; +use base64::Engine; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use chrono::{DateTime, Utc}; +use rand::RngCore; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +/// Shared OIDC client used by both sunbeam and wfectl CLIs. +pub const CLIENT_ID: &str = "sunbeam-cli"; +/// Standard OIDC scopes. +pub const SCOPES: &str = "openid email profile offline_access"; +/// Loopback callback ports tried in order. +pub const CALLBACK_PORTS: [u16; 5] = [9876, 9877, 9878, 9879, 9880]; + +/// Persisted OAuth token state, written to `~/.sunbeam/auth/{domain}.json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredToken { + pub access_token: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub refresh_token: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub id_token: Option, + pub expires_at: DateTime, + pub issuer: String, + pub domain: String, +} + +impl StoredToken { + /// True if the access token has at least `min_remaining` left. + pub fn is_valid_for(&self, min_remaining: Duration) -> bool { + let now = Utc::now(); + let cutoff = now + chrono::Duration::from_std(min_remaining).unwrap_or_default(); + self.expires_at > cutoff + } + + /// Decode and return claims from the embedded id_token (no signature + /// verification -- relies on TLS). + pub fn id_claims(&self) -> Option { + let token = self.id_token.as_deref()?; + let mut parts = token.split('.'); + let _header = parts.next()?; + let payload = parts.next()?; + let bytes = URL_SAFE_NO_PAD.decode(payload).ok()?; + serde_json::from_slice(&bytes).ok() + } +} + +/// PKCE verifier + challenge pair. +#[derive(Debug, Clone)] +pub struct Pkce { + pub verifier: String, + pub challenge: String, +} + +impl Pkce { + /// Generate a fresh PKCE pair (32-byte verifier, S256 challenge). + pub fn generate() -> Self { + let mut bytes = [0u8; 32]; + rand::thread_rng().fill_bytes(&mut bytes); + let verifier = URL_SAFE_NO_PAD.encode(bytes); + let challenge = challenge_for(&verifier); + Self { + verifier, + challenge, + } + } +} + +/// Compute the S256 PKCE challenge for a given verifier. +pub fn challenge_for(verifier: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(verifier.as_bytes()); + URL_SAFE_NO_PAD.encode(hasher.finalize()) +} + +/// Generate a CSRF state token. +pub fn random_state() -> String { + let mut bytes = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut bytes); + URL_SAFE_NO_PAD.encode(bytes) +} + +/// Where the token cache file lives for a given OIDC domain. +pub fn token_path(domain: &str) -> PathBuf { + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("/tmp")); + home.join(".sunbeam/auth").join(format!("{domain}.json")) +} + +/// Read the cached token for a domain. Returns Ok(None) if not present. +pub fn load_token(domain: &str) -> Result> { + let path = token_path(domain); + if !path.exists() { + return Ok(None); + } + let bytes = std::fs::read(&path) + .with_context(|| format!("failed to read token cache at {}", path.display()))?; + let token: StoredToken = serde_json::from_slice(&bytes) + .with_context(|| format!("failed to parse token cache at {}", path.display()))?; + Ok(Some(token)) +} + +/// Persist a token to the cache, creating parent directories as needed. +/// File is written with mode 0600 on Unix. +pub fn save_token(token: &StoredToken) -> Result<()> { + let path = token_path(&token.domain); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("failed to create token dir {}", parent.display()))?; + } + let bytes = serde_json::to_vec_pretty(token).context("failed to serialize token")?; + std::fs::write(&path, &bytes) + .with_context(|| format!("failed to write token cache to {}", path.display()))?; + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o600); + std::fs::set_permissions(&path, perms).ok(); + } + Ok(()) +} + +/// Delete the token cache for a domain. Returns true if a file was deleted. +pub fn delete_token(domain: &str) -> Result { + let path = token_path(domain); + if !path.exists() { + return Ok(false); + } + std::fs::remove_file(&path) + .with_context(|| format!("failed to delete token cache at {}", path.display()))?; + Ok(true) +} + +/// Extract the domain from an OIDC issuer URL. +/// +/// `https://auth.sunbeam.pt/` -> `sunbeam.pt` +/// `https://auth.example.com:8443/realms/foo` -> `example.com` +pub fn domain_from_issuer(issuer: &str) -> Result { + let url = url::Url::parse(issuer).with_context(|| format!("invalid issuer URL: {issuer}"))?; + let host = url + .host_str() + .ok_or_else(|| anyhow!("issuer has no host"))?; + // Strip a leading "auth." subdomain so all sunbeam.pt CLIs share a token cache. + let domain = host.strip_prefix("auth.").unwrap_or(host).to_string(); + Ok(domain) +} + +/// OIDC discovery document subset. +#[derive(Debug, Clone, Deserialize)] +pub struct DiscoveryDoc { + pub authorization_endpoint: String, + pub token_endpoint: String, +} + +/// Fetch the OIDC discovery document for an issuer. +pub async fn discover(issuer: &str) -> Result { + let trimmed = issuer.trim_end_matches('/'); + let url = format!("{trimmed}/.well-known/openid-configuration"); + let resp = reqwest::get(&url) + .await + .with_context(|| format!("failed to fetch OIDC discovery from {url}"))?; + if !resp.status().is_success() { + return Err(anyhow!( + "OIDC discovery returned HTTP {} from {}", + resp.status(), + url + )); + } + let doc: DiscoveryDoc = resp + .json() + .await + .context("failed to parse discovery JSON")?; + Ok(doc) +} + +/// Token endpoint response. +#[derive(Debug, Clone, Deserialize)] +struct TokenResponse { + access_token: String, + #[serde(default)] + refresh_token: Option, + #[serde(default)] + id_token: Option, + #[serde(default)] + expires_in: Option, +} + +impl TokenResponse { + fn into_stored(self, issuer: &str, domain: &str) -> StoredToken { + let expires_in = self.expires_in.unwrap_or(3600); + StoredToken { + access_token: self.access_token, + refresh_token: self.refresh_token, + id_token: self.id_token, + expires_at: Utc::now() + chrono::Duration::seconds(expires_in), + issuer: issuer.to_string(), + domain: domain.to_string(), + } + } +} + +/// Exchange an authorization code for tokens. +pub async fn exchange_code( + discovery: &DiscoveryDoc, + code: &str, + verifier: &str, + redirect_uri: &str, + issuer: &str, + domain: &str, +) -> Result { + let params = [ + ("grant_type", "authorization_code"), + ("code", code), + ("redirect_uri", redirect_uri), + ("client_id", CLIENT_ID), + ("code_verifier", verifier), + ]; + let resp = reqwest::Client::new() + .post(&discovery.token_endpoint) + .form(¶ms) + .send() + .await + .context("token endpoint request failed")?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(anyhow!("token endpoint returned HTTP {status}: {body}")); + } + let token: TokenResponse = resp + .json() + .await + .context("failed to parse token response")?; + Ok(token.into_stored(issuer, domain)) +} + +/// Use a refresh token to obtain a fresh access token. +pub async fn refresh(token: &StoredToken) -> Result { + let refresh_token = token + .refresh_token + .as_deref() + .ok_or_else(|| anyhow!("no refresh token available"))?; + + let discovery = discover(&token.issuer).await?; + + let params = [ + ("grant_type", "refresh_token"), + ("refresh_token", refresh_token), + ("client_id", CLIENT_ID), + ]; + let resp = reqwest::Client::new() + .post(&discovery.token_endpoint) + .form(¶ms) + .send() + .await + .context("refresh request failed")?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(anyhow!("refresh endpoint returned HTTP {status}: {body}")); + } + let resp: TokenResponse = resp + .json() + .await + .context("failed to parse refresh response")?; + let mut new_token = resp.into_stored(&token.issuer, &token.domain); + // Some IdPs don't return a new refresh token; preserve the original. + if new_token.refresh_token.is_none() { + new_token.refresh_token = token.refresh_token.clone(); + } + Ok(new_token) +} + +/// Load a token, refreshing it if it has less than 60s remaining. +pub async fn ensure_valid(domain: &str) -> Result { + let token = + load_token(domain)?.ok_or_else(|| anyhow!("not logged in -- run `wfectl login` first"))?; + if token.is_valid_for(Duration::from_secs(60)) { + return Ok(token); + } + let refreshed = refresh(&token).await.context("token refresh failed")?; + save_token(&refreshed)?; + Ok(refreshed) +} + +/// Build the authorization URL for the browser. +pub fn build_auth_url( + discovery: &DiscoveryDoc, + redirect_uri: &str, + state: &str, + challenge: &str, +) -> String { + let mut url = url::Url::parse(&discovery.authorization_endpoint) + .expect("authorization_endpoint must be a valid URL"); + url.query_pairs_mut() + .append_pair("client_id", CLIENT_ID) + .append_pair("redirect_uri", redirect_uri) + .append_pair("response_type", "code") + .append_pair("scope", SCOPES) + .append_pair("code_challenge", challenge) + .append_pair("code_challenge_method", "S256") + .append_pair("state", state); + url.to_string() +} + +/// Open the user's browser at a given URL (best effort). +pub fn open_browser(url: &str) { + #[cfg(target_os = "macos")] + let prog = "open"; + #[cfg(target_os = "linux")] + let prog = "xdg-open"; + #[cfg(not(any(target_os = "macos", target_os = "linux")))] + let prog: &str = ""; + + if !prog.is_empty() { + let _ = std::process::Command::new(prog).arg(url).spawn(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn pkce_challenge_is_deterministic() { + let c1 = challenge_for("verifier"); + let c2 = challenge_for("verifier"); + assert_eq!(c1, c2); + } + + #[test] + fn pkce_challenge_known_value() { + // RFC 7636 example. + let verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"; + let challenge = challenge_for(verifier); + assert_eq!(challenge, "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"); + } + + #[test] + fn pkce_generate_produces_unique_pairs() { + let a = Pkce::generate(); + let b = Pkce::generate(); + assert_ne!(a.verifier, b.verifier); + assert_ne!(a.challenge, b.challenge); + // Verifier round-trips through challenge. + assert_eq!(a.challenge, challenge_for(&a.verifier)); + } + + #[test] + fn random_state_is_unique_and_long() { + let s1 = random_state(); + let s2 = random_state(); + assert_ne!(s1, s2); + assert!(s1.len() >= 22); + } + + #[test] + fn domain_from_issuer_strips_auth_subdomain() { + assert_eq!( + domain_from_issuer("https://auth.sunbeam.pt/").unwrap(), + "sunbeam.pt" + ); + assert_eq!( + domain_from_issuer("https://auth.example.com").unwrap(), + "example.com" + ); + assert_eq!( + domain_from_issuer("https://example.com/").unwrap(), + "example.com" + ); + } + + #[test] + fn domain_from_issuer_invalid_url() { + assert!(domain_from_issuer("not a url").is_err()); + } + + #[test] + fn token_path_is_under_sunbeam_auth() { + let p = token_path("sunbeam.pt"); + assert!( + p.to_string_lossy() + .ends_with(".sunbeam/auth/sunbeam.pt.json") + ); + } + + #[test] + fn stored_token_serde_round_trip() { + let token = StoredToken { + access_token: "ory_at_abc".into(), + refresh_token: Some("ory_rt_xyz".into()), + id_token: Some("eyJhbGc".into()), + expires_at: "2030-01-01T00:00:00Z".parse().unwrap(), + issuer: "https://auth.sunbeam.pt/".into(), + domain: "sunbeam.pt".into(), + }; + let json = serde_json::to_string(&token).unwrap(); + let parsed: StoredToken = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.access_token, "ory_at_abc"); + assert_eq!(parsed.refresh_token, Some("ory_rt_xyz".into())); + assert_eq!(parsed.domain, "sunbeam.pt"); + } + + #[test] + fn token_validity_check() { + let valid = StoredToken { + access_token: "x".into(), + refresh_token: None, + id_token: None, + expires_at: Utc::now() + chrono::Duration::seconds(3600), + issuer: "x".into(), + domain: "x".into(), + }; + assert!(valid.is_valid_for(Duration::from_secs(60))); + + let expiring = StoredToken { + access_token: "x".into(), + refresh_token: None, + id_token: None, + expires_at: Utc::now() + chrono::Duration::seconds(30), + issuer: "x".into(), + domain: "x".into(), + }; + assert!(!expiring.is_valid_for(Duration::from_secs(60))); + + let expired = StoredToken { + access_token: "x".into(), + refresh_token: None, + id_token: None, + expires_at: Utc::now() - chrono::Duration::seconds(10), + issuer: "x".into(), + domain: "x".into(), + }; + assert!(!expired.is_valid_for(Duration::from_secs(0))); + } + + #[test] + fn id_claims_decodes_jwt_payload() { + // {"email":"user@example.com","name":"Test"} + let payload = "eyJlbWFpbCI6InVzZXJAZXhhbXBsZS5jb20iLCJuYW1lIjoiVGVzdCJ9"; + let token = StoredToken { + access_token: "x".into(), + refresh_token: None, + id_token: Some(format!("h.{payload}.s")), + expires_at: Utc::now(), + issuer: "x".into(), + domain: "x".into(), + }; + let claims = token.id_claims().unwrap(); + assert_eq!(claims["email"], "user@example.com"); + assert_eq!(claims["name"], "Test"); + } + + #[test] + fn id_claims_returns_none_without_token() { + let token = StoredToken { + access_token: "x".into(), + refresh_token: None, + id_token: None, + expires_at: Utc::now(), + issuer: "x".into(), + domain: "x".into(), + }; + assert!(token.id_claims().is_none()); + } + + #[test] + fn build_auth_url_contains_pkce_and_state() { + let discovery = DiscoveryDoc { + authorization_endpoint: "https://auth.example.com/oauth2/auth".into(), + token_endpoint: "https://auth.example.com/oauth2/token".into(), + }; + let url = build_auth_url( + &discovery, + "http://127.0.0.1:9876/callback", + "state-123", + "challenge-abc", + ); + assert!(url.contains("client_id=sunbeam-cli")); + assert!(url.contains("code_challenge=challenge-abc")); + assert!(url.contains("code_challenge_method=S256")); + assert!(url.contains("state=state-123")); + assert!(url.contains("response_type=code")); + assert!(url.contains("scope=openid")); + } + + #[test] + fn save_load_delete_token_round_trip() { + let tmp = tempfile::tempdir().unwrap(); + // Override HOME so token_path resolves into the temp dir. + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = StoredToken { + access_token: "ory_at_test".into(), + refresh_token: Some("ory_rt_test".into()), + id_token: None, + expires_at: Utc::now() + chrono::Duration::seconds(3600), + issuer: "https://auth.test.com/".into(), + domain: "test.com".into(), + }; + + save_token(&token).unwrap(); + let loaded = load_token("test.com").unwrap().unwrap(); + assert_eq!(loaded.access_token, "ory_at_test"); + + let deleted = delete_token("test.com").unwrap(); + assert!(deleted); + let after = load_token("test.com").unwrap(); + assert!(after.is_none()); + + // Deleting a non-existent token returns false, not error. + let again = delete_token("test.com").unwrap(); + assert!(!again); + } +} diff --git a/wfectl/src/client.rs b/wfectl/src/client.rs new file mode 100644 index 0000000..87b63ab --- /dev/null +++ b/wfectl/src/client.rs @@ -0,0 +1,110 @@ +//! Tonic gRPC client wrapper with bearer-token authentication. + +use anyhow::{Context, Result, anyhow}; +use tonic::metadata::MetadataValue; +use tonic::service::Interceptor; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; + +use wfe_server_protos::wfe::v1::wfe_client::WfeClient as GeneratedWfeClient; + +/// Type alias for the fully-instantiated wfe client with auth interceptor. +pub type AuthClient = GeneratedWfeClient>; + +/// Tonic interceptor that injects an `Authorization: Bearer ` header +/// on every gRPC request. +#[derive(Clone)] +pub struct BearerAuth { + header: Option>, +} + +impl BearerAuth { + /// Construct a new bearer-auth interceptor. An empty token results in + /// no header being injected (useful for unauthenticated calls). + pub fn new(token: &str) -> Result { + if token.is_empty() { + return Ok(Self { header: None }); + } + let value = format!("Bearer {token}"); + let header = MetadataValue::try_from(value) + .map_err(|e| anyhow!("invalid auth token (cannot encode as header): {e}"))?; + Ok(Self { + header: Some(header), + }) + } +} + +impl Interceptor for BearerAuth { + fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { + if let Some(header) = &self.header { + req.metadata_mut().insert("authorization", header.clone()); + } + Ok(req) + } +} + +/// Build a tonic channel for the given server URL, configuring TLS automatically +/// when the URL scheme is `https`. +pub async fn connect(server: &str) -> Result { + let mut endpoint = Endpoint::from_shared(server.to_string()) + .with_context(|| format!("invalid server URL: {server}"))?; + + if server.starts_with("https://") { + endpoint = endpoint + .tls_config(ClientTlsConfig::new().with_native_roots()) + .context("failed to configure TLS")?; + } + + endpoint + .connect() + .await + .with_context(|| format!("failed to connect to {server}")) +} + +/// Build an authenticated wfe client. +pub async fn build(server: &str, token: &str) -> Result { + let channel = connect(server).await?; + let auth = BearerAuth::new(token)?; + Ok(GeneratedWfeClient::with_interceptor(channel, auth)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn bearer_auth_with_empty_token_injects_nothing() { + let mut auth = BearerAuth::new("").unwrap(); + let req = tonic::Request::new(()); + let out = auth.call(req).unwrap(); + assert!(out.metadata().get("authorization").is_none()); + } + + #[test] + fn bearer_auth_injects_header() { + let mut auth = BearerAuth::new("ory_at_xyz").unwrap(); + let req = tonic::Request::new(()); + let out = auth.call(req).unwrap(); + let header = out.metadata().get("authorization").unwrap(); + assert_eq!(header.to_str().unwrap(), "Bearer ory_at_xyz"); + } + + #[test] + fn bearer_auth_rejects_invalid_chars() { + // Tokens containing newlines can't be encoded as HTTP headers. + let result = BearerAuth::new("bad\ntoken"); + assert!(result.is_err()); + } + + #[tokio::test] + async fn connect_invalid_url_returns_error() { + let result = connect("not a valid url").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn connect_to_unreachable_address_fails() { + let result = connect("http://127.0.0.1:1").await; + assert!(result.is_err()); + } +} diff --git a/wfectl/src/commands/auth.rs b/wfectl/src/commands/auth.rs new file mode 100644 index 0000000..0416daf --- /dev/null +++ b/wfectl/src/commands/auth.rs @@ -0,0 +1,73 @@ +//! Shared authentication helpers for commands. +//! +//! Resolves a bearer token from (in order): +//! 1. `--token` CLI flag +//! 2. `WFECTL_TOKEN` env var +//! 3. cached OIDC token at `~/.sunbeam/auth/{domain}.json` (refreshed if needed) + +use anyhow::Result; + +use crate::auth; + +/// Resolve a bearer token to use for gRPC requests. +pub async fn resolve_token(cli_token: Option<&str>, issuer: &str) -> Result { + if let Some(token) = cli_token { + if !token.is_empty() { + return Ok(token.to_string()); + } + } + if let Ok(token) = std::env::var("WFECTL_TOKEN") { + if !token.is_empty() { + return Ok(token); + } + } + let domain = auth::domain_from_issuer(issuer)?; + let stored = auth::ensure_valid(&domain).await?; + Ok(stored.access_token) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + /// Serialize env-var tests to avoid races with parallel execution. + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + #[tokio::test] + async fn resolve_uses_cli_token_first() { + let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + // Save and clear env var so this test is independent. + let saved = std::env::var("WFECTL_TOKEN").ok(); + unsafe { std::env::remove_var("WFECTL_TOKEN") }; + let token = resolve_token(Some("explicit-token"), "https://auth.example.com/") + .await + .unwrap(); + assert_eq!(token, "explicit-token"); + if let Some(v) = saved { + unsafe { std::env::set_var("WFECTL_TOKEN", v) }; + } + } + + #[tokio::test] + async fn resolve_uses_env_when_no_cli() { + let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + unsafe { std::env::set_var("WFECTL_TOKEN", "env-token") }; + let token = resolve_token(None, "https://auth.example.com/") + .await + .unwrap(); + assert_eq!(token, "env-token"); + unsafe { std::env::remove_var("WFECTL_TOKEN") }; + } + + #[tokio::test] + async fn resolve_skips_empty_cli_token() { + let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + unsafe { std::env::set_var("WFECTL_TOKEN", "env-token") }; + let token = resolve_token(Some(""), "https://auth.example.com/") + .await + .unwrap(); + assert_eq!(token, "env-token"); + unsafe { std::env::remove_var("WFECTL_TOKEN") }; + } +} diff --git a/wfectl/src/commands/cancel.rs b/wfectl/src/commands/cancel.rs new file mode 100644 index 0000000..4b4be32 --- /dev/null +++ b/wfectl/src/commands/cancel.rs @@ -0,0 +1,23 @@ +//! `wfectl cancel ` -- cancel a running workflow. + +use anyhow::Result; +use clap::Args; +use wfe_server_protos::wfe::v1::CancelWorkflowRequest; + +use crate::client::AuthClient; + +#[derive(Debug, Args)] +pub struct CancelArgs { + /// Workflow instance identifier — UUID or human-friendly name (e.g. "ci-42"). + pub workflow_id: String, +} + +pub async fn run(args: CancelArgs, mut client: AuthClient) -> Result<()> { + client + .cancel_workflow(CancelWorkflowRequest { + workflow_id: args.workflow_id.clone(), + }) + .await?; + println!("✓ Cancelled workflow {}", args.workflow_id); + Ok(()) +} diff --git a/wfectl/src/commands/definitions.rs b/wfectl/src/commands/definitions.rs new file mode 100644 index 0000000..6339171 --- /dev/null +++ b/wfectl/src/commands/definitions.rs @@ -0,0 +1,78 @@ +//! `wfectl definitions list` -- list registered workflow definitions. + +use anyhow::Result; +use clap::{Args, Subcommand}; +use wfe_server_protos::wfe::v1::ListDefinitionsRequest; + +use crate::client::AuthClient; +use crate::output::{OutputFormat, render_table}; + +#[derive(Debug, Args)] +pub struct DefinitionsArgs { + #[command(subcommand)] + pub cmd: DefinitionsCmd, +} + +#[derive(Debug, Subcommand)] +pub enum DefinitionsCmd { + /// List all registered workflow definitions. + List, +} + +pub async fn run( + args: DefinitionsArgs, + mut client: AuthClient, + format: OutputFormat, +) -> Result<()> { + match args.cmd { + DefinitionsCmd::List => { + let resp = client + .list_definitions(ListDefinitionsRequest {}) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "definitions": resp.definitions.iter().map(|d| serde_json::json!({ + "id": d.id, + "name": d.name, + "version": d.version, + "description": d.description, + "step_count": d.step_count, + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + let rows: Vec> = resp + .definitions + .iter() + .map(|d| { + // Fall back to the slug id when no display name is set + // so the Name column is always populated. + let display = if d.name.is_empty() { + d.id.clone() + } else { + d.name.clone() + }; + vec![ + display, + d.id.clone(), + d.version.to_string(), + d.step_count.to_string(), + d.description.clone(), + ] + }) + .collect(); + println!( + "{}", + render_table( + &["Name", "ID", "Version", "Steps", "Description"], + &rows + ) + ); + println!("{} definition(s)", resp.definitions.len()); + } + } + } + Ok(()) +} diff --git a/wfectl/src/commands/get.rs b/wfectl/src/commands/get.rs new file mode 100644 index 0000000..bce3ea6 --- /dev/null +++ b/wfectl/src/commands/get.rs @@ -0,0 +1,107 @@ +//! `wfectl get ` -- fetch a workflow instance. + +use anyhow::Result; +use clap::Args; +use wfe_server_protos::wfe::v1::GetWorkflowRequest; + +use crate::client::AuthClient; +use crate::output::{OutputFormat, fmt_proto_time, render_kv, render_table}; +use crate::struct_util::prost_struct_to_json; + +#[derive(Debug, Args)] +pub struct GetArgs { + /// Workflow instance identifier — either the UUID (`id`) or the + /// human-friendly name (e.g. "ci-42"). The server resolves either form. + pub workflow_id: String, +} + +pub async fn run(args: GetArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let resp = client + .get_workflow(GetWorkflowRequest { + workflow_id: args.workflow_id.clone(), + }) + .await? + .into_inner(); + + let instance = resp + .instance + .ok_or_else(|| anyhow::anyhow!("server returned empty instance"))?; + + if matches!(format, OutputFormat::Json) { + let data = instance + .data + .as_ref() + .map(prost_struct_to_json) + .unwrap_or(serde_json::Value::Null); + let json = serde_json::json!({ + "id": instance.id, + "name": instance.name, + "definition_id": instance.definition_id, + "version": instance.version, + "status": instance.status, + "description": instance.description, + "reference": instance.reference, + "data": data, + "create_time": instance.create_time.as_ref().map(fmt_proto_time), + "complete_time": instance.complete_time.as_ref().map(fmt_proto_time), + "execution_pointers": instance.execution_pointers.iter().map(|p| serde_json::json!({ + "id": p.id, + "step_id": p.step_id, + "step_name": p.step_name, + "status": p.status, + "active": p.active, + "retry_count": p.retry_count, + "start_time": p.start_time.as_ref().map(fmt_proto_time), + "end_time": p.end_time.as_ref().map(fmt_proto_time), + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + return Ok(()); + } + + let mut fields = vec![ + ("Name", instance.name.clone()), + ("ID", instance.id.clone()), + ( + "Definition", + format!("{} v{}", instance.definition_id, instance.version), + ), + ("Status", format!("{:?}", instance.status)), + ]; + if !instance.description.is_empty() { + fields.push(("Description", instance.description.clone())); + } + if !instance.reference.is_empty() { + fields.push(("Reference", instance.reference.clone())); + } + if let Some(ts) = &instance.create_time { + fields.push(("Created", fmt_proto_time(ts))); + } + if let Some(ts) = &instance.complete_time { + fields.push(("Completed", fmt_proto_time(ts))); + } + let display: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect(); + println!("{}", render_kv(&display)); + + if !instance.execution_pointers.is_empty() { + println!("\nExecution pointers:"); + let rows: Vec> = instance + .execution_pointers + .iter() + .map(|p| { + vec![ + p.step_name.clone(), + p.step_id.to_string(), + format!("{:?}", p.status), + if p.active { "yes".into() } else { "no".into() }, + p.retry_count.to_string(), + ] + }) + .collect(); + println!( + "{}", + render_table(&["Step", "ID", "Status", "Active", "Retries"], &rows) + ); + } + Ok(()) +} diff --git a/wfectl/src/commands/list.rs b/wfectl/src/commands/list.rs new file mode 100644 index 0000000..8724e04 --- /dev/null +++ b/wfectl/src/commands/list.rs @@ -0,0 +1,100 @@ +//! `wfectl list` -- search workflow instances. + +use anyhow::Result; +use clap::{Args, ValueEnum}; +use wfe_server_protos::wfe::v1::{SearchWorkflowsRequest, WorkflowStatus}; + +use crate::client::AuthClient; +use crate::output::{OutputFormat, fmt_proto_time, render_table}; + +#[derive(Debug, Args)] +pub struct ListArgs { + /// Free-text query. + #[arg(long)] + pub query: Option, + /// Filter by status. + #[arg(long)] + pub status: Option, + /// Maximum number of results. + #[arg(long, default_value_t = 50)] + pub limit: u64, + /// Skip the first N results. + #[arg(long, default_value_t = 0)] + pub skip: u64, +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +#[value(rename_all = "lowercase")] +pub enum StatusFilter { + Runnable, + Suspended, + Complete, + Terminated, +} + +impl From for WorkflowStatus { + fn from(s: StatusFilter) -> Self { + match s { + StatusFilter::Runnable => WorkflowStatus::Runnable, + StatusFilter::Suspended => WorkflowStatus::Suspended, + StatusFilter::Complete => WorkflowStatus::Complete, + StatusFilter::Terminated => WorkflowStatus::Terminated, + } + } +} + +pub async fn run(args: ListArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let status: WorkflowStatus = args + .status + .map(Into::into) + .unwrap_or(WorkflowStatus::Unspecified); + let resp = client + .search_workflows(SearchWorkflowsRequest { + query: args.query.unwrap_or_default(), + status_filter: status as i32, + skip: args.skip, + take: args.limit, + }) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "total": resp.total, + "results": resp.results.iter().map(|r| serde_json::json!({ + "id": r.id, + "name": r.name, + "definition_id": r.definition_id, + "version": r.version, + "status": r.status, + "reference": r.reference, + "description": r.description, + "create_time": r.create_time.as_ref().map(fmt_proto_time), + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + let rows: Vec> = resp + .results + .iter() + .map(|r| { + vec![ + r.name.clone(), + r.id.clone(), + format!("{} v{}", r.definition_id, r.version), + format!("{:?}", r.status), + r.create_time + .as_ref() + .map(fmt_proto_time) + .unwrap_or_default(), + ] + }) + .collect(); + println!( + "{}", + render_table(&["Name", "ID", "Definition", "Status", "Created"], &rows) + ); + println!("{} of {} result(s)", resp.results.len(), resp.total); + } + Ok(()) +} diff --git a/wfectl/src/commands/login.rs b/wfectl/src/commands/login.rs new file mode 100644 index 0000000..fd98a90 --- /dev/null +++ b/wfectl/src/commands/login.rs @@ -0,0 +1,291 @@ +//! `wfectl login` -- run OAuth2 PKCE flow against the configured OIDC issuer. + +use std::collections::HashMap; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{Context, Result, anyhow}; +use clap::Args; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use indicatif::{ProgressBar, ProgressStyle}; +use tokio::net::TcpListener; +use tokio::sync::oneshot; + +use crate::auth::{ + self, CALLBACK_PORTS, Pkce, build_auth_url, discover, exchange_code, open_browser, + random_state, save_token, +}; +use crate::config; + +#[derive(Debug, Args)] +pub struct LoginArgs { + /// OIDC issuer URL (e.g., https://auth.sunbeam.pt/). + #[arg(long)] + pub issuer: Option, +} + +const CALLBACK_TIMEOUT: Duration = Duration::from_secs(300); + +const SUCCESS_HTML: &str = r#" +wfectl login + +

You're logged in.

+

You can close this window and return to the terminal.

+"#; + +const ERROR_HTML: &str = r#" +wfectl login error + +

Login failed

+

See the terminal for details.

+"#; + +pub async fn run(args: LoginArgs, server_cfg: &config::Config) -> Result<()> { + let issuer = args.issuer.unwrap_or_else(|| server_cfg.issuer.clone()); + let domain = auth::domain_from_issuer(&issuer)?; + + println!("Discovering OIDC endpoints at {issuer}..."); + let discovery = discover(&issuer).await?; + + let pkce = Pkce::generate(); + let state = random_state(); + + // Bind a callback listener on the first available port. + let (listener, port) = bind_callback_listener().await?; + let redirect_uri = format!("http://127.0.0.1:{port}/callback"); + let auth_url = build_auth_url(&discovery, &redirect_uri, &state, &pkce.challenge); + + println!(); + println!("Opening browser for authentication..."); + println!("If your browser doesn't open, visit:"); + println!(" {auth_url}"); + println!(); + + open_browser(&auth_url); + + let progress = ProgressBar::new_spinner(); + progress.set_style( + ProgressStyle::default_spinner() + .template("{spinner} {msg}") + .unwrap(), + ); + progress.set_message("Waiting for authorization callback..."); + progress.enable_steady_tick(Duration::from_millis(100)); + + let result = + tokio::time::timeout(CALLBACK_TIMEOUT, await_callback(listener, state.clone())).await; + progress.finish_and_clear(); + + let callback = match result { + Ok(Ok(cb)) => cb, + Ok(Err(e)) => return Err(e), + Err(_) => { + return Err(anyhow!( + "login timed out after {} seconds", + CALLBACK_TIMEOUT.as_secs() + )); + } + }; + + println!("Got authorization code, exchanging for tokens..."); + let token = exchange_code( + &discovery, + &callback.code, + &pkce.verifier, + &redirect_uri, + &issuer, + &domain, + ) + .await?; + + save_token(&token)?; + println!(); + println!("✓ Logged in to {domain} (token cached at ~/.sunbeam/auth/{domain}.json)"); + if let Some(claims) = token.id_claims() { + if let Some(email) = claims.get("email").and_then(|v| v.as_str()) { + println!(" Identity: {email}"); + } + } + Ok(()) +} + +#[derive(Debug)] +struct Callback { + code: String, +} + +async fn bind_callback_listener() -> Result<(TcpListener, u16)> { + for port in CALLBACK_PORTS { + let addr = format!("127.0.0.1:{port}"); + if let Ok(listener) = TcpListener::bind(&addr).await { + return Ok((listener, port)); + } + } + // Fall back to ephemeral. + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("failed to bind callback listener on any port")?; + let port = listener.local_addr()?.port(); + Ok((listener, port)) +} + +async fn await_callback(listener: TcpListener, expected_state: String) -> Result { + let (tx, rx) = oneshot::channel::>(); + let tx = Arc::new(tokio::sync::Mutex::new(Some(tx))); + + tokio::spawn(async move { + loop { + let (stream, _) = match listener.accept().await { + Ok(s) => s, + Err(_) => continue, + }; + + let tx = tx.clone(); + let expected = expected_state.clone(); + tokio::spawn(async move { + let io = TokioIo::new(stream); + let svc = service_fn(move |req: Request| { + let tx = tx.clone(); + let expected = expected.clone(); + async move { handle_callback_request(req, tx, expected).await } + }); + let _ = http1::Builder::new().serve_connection(io, svc).await; + }); + } + }); + + rx.await.context("callback channel closed")? +} + +async fn handle_callback_request( + req: Request, + tx: Arc>>>>, + expected_state: String, +) -> Result>, Infallible> { + if !req.uri().path().starts_with("/callback") { + return Ok(Response::builder() + .status(404) + .body(Full::new(Bytes::new())) + .unwrap()); + } + + let query = req.uri().query().unwrap_or(""); + let params: HashMap = url::form_urlencoded::parse(query.as_bytes()) + .into_owned() + .collect(); + + let result = match (params.get("code"), params.get("state"), params.get("error")) { + (_, _, Some(err)) => Err(anyhow!("OAuth error: {err}")), + (Some(code), Some(state), None) if state == &expected_state => { + Ok(Callback { code: code.clone() }) + } + (_, Some(_), None) => Err(anyhow!("OAuth state mismatch (possible CSRF)")), + _ => Err(anyhow!("missing code or state in callback")), + }; + + let (status, body) = if result.is_ok() { + (200, SUCCESS_HTML) + } else { + (400, ERROR_HTML) + }; + + // Send the result through the oneshot, taking it out of the Mutex. + { + let mut guard = tx.lock().await; + if let Some(sender) = guard.take() { + let _ = sender.send(result); + } + } + + Ok(Response::builder() + .status(status) + .header("content-type", "text/html; charset=utf-8") + .body(Full::new(Bytes::from(body))) + .unwrap()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn bind_callback_listener_succeeds_on_some_port() { + let (listener, port) = bind_callback_listener().await.unwrap(); + assert!(port > 0); + drop(listener); + } + + /// Drive the full callback path: bind listener, send a request, observe result. + async fn drive_callback(query: &str) -> Result { + let (listener, port) = bind_callback_listener().await.unwrap(); + let state = "test-state"; + + // Spawn the await_callback future. + let handle = + tokio::spawn(async move { await_callback(listener, "test-state".to_string()).await }); + + // Give it a moment to start the accept loop. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Send a request. + let url = format!("http://127.0.0.1:{port}/callback?{query}"); + let _ = reqwest::get(&url).await; + + // Drop unused state to avoid warnings. + let _ = state; + + tokio::time::timeout(Duration::from_secs(2), handle) + .await + .unwrap() + .unwrap() + } + + #[tokio::test] + async fn callback_with_valid_code_and_state_succeeds() { + let result = drive_callback("code=abc123&state=test-state").await; + let cb = result.unwrap(); + assert_eq!(cb.code, "abc123"); + } + + #[tokio::test] + async fn callback_with_state_mismatch_fails() { + let result = drive_callback("code=abc&state=wrong-state").await; + assert!(result.is_err()); + let err = format!("{}", result.unwrap_err()); + assert!(err.contains("state mismatch")); + } + + #[tokio::test] + async fn callback_with_oauth_error_fails() { + let result = drive_callback("error=access_denied&state=test-state").await; + assert!(result.is_err()); + let err = format!("{}", result.unwrap_err()); + assert!(err.contains("access_denied")); + } + + #[tokio::test] + async fn callback_with_missing_params_fails() { + let result = drive_callback("nothing=here").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn callback_404_for_non_callback_path() { + // This tests the 404 branch in handle_callback_request. + let (listener, port) = bind_callback_listener().await.unwrap(); + let _handle = tokio::spawn(async move { + let _ = await_callback(listener, "s".to_string()).await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + let resp = reqwest::get(format!("http://127.0.0.1:{port}/not-callback")) + .await + .unwrap(); + assert_eq!(resp.status(), 404); + } +} diff --git a/wfectl/src/commands/logout.rs b/wfectl/src/commands/logout.rs new file mode 100644 index 0000000..ca319b4 --- /dev/null +++ b/wfectl/src/commands/logout.rs @@ -0,0 +1,27 @@ +//! `wfectl logout` -- delete cached OIDC token. + +use anyhow::Result; +use clap::Args; + +use crate::auth; +use crate::config; + +#[derive(Debug, Args)] +pub struct LogoutArgs { + /// OIDC issuer to log out from (defaults to configured issuer). + #[arg(long)] + pub issuer: Option, +} + +pub async fn run(args: LogoutArgs, server_cfg: &config::Config) -> Result<()> { + let issuer = args.issuer.unwrap_or_else(|| server_cfg.issuer.clone()); + let domain = auth::domain_from_issuer(&issuer)?; + + let deleted = auth::delete_token(&domain)?; + if deleted { + println!("✓ Logged out of {domain}"); + } else { + println!("Not logged in to {domain} (no token cache found)"); + } + Ok(()) +} diff --git a/wfectl/src/commands/logs.rs b/wfectl/src/commands/logs.rs new file mode 100644 index 0000000..ee57609 --- /dev/null +++ b/wfectl/src/commands/logs.rs @@ -0,0 +1,49 @@ +//! `wfectl logs ` -- stream logs. + +use anyhow::Result; +use clap::Args; +use futures::StreamExt; +use wfe_server_protos::wfe::v1::{LogStream, StreamLogsRequest}; + +use crate::client::AuthClient; + +#[derive(Debug, Args)] +pub struct LogsArgs { + /// Workflow instance identifier — UUID or human-friendly name (e.g. "ci-42"). + pub workflow_id: String, + /// Filter to a single step name. + #[arg(long)] + pub step: Option, + /// Follow mode (`tail -f`). + #[arg(short, long)] + pub follow: bool, +} + +pub async fn run(args: LogsArgs, mut client: AuthClient) -> Result<()> { + let mut stream = client + .stream_logs(StreamLogsRequest { + workflow_id: args.workflow_id.clone(), + step_name: args.step.unwrap_or_default(), + follow: args.follow, + }) + .await? + .into_inner(); + + while let Some(entry) = stream.next().await { + let entry = entry?; + let prefix = if entry.step_name.is_empty() { + String::new() + } else { + format!("[{}] ", entry.step_name) + }; + let line = String::from_utf8_lossy(&entry.data); + let line = line.trim_end_matches('\n'); + let stream_kind = LogStream::try_from(entry.stream).unwrap_or(LogStream::Unspecified); + if matches!(stream_kind, LogStream::Stderr) { + eprintln!("{prefix}{line}"); + } else { + println!("{prefix}{line}"); + } + } + Ok(()) +} diff --git a/wfectl/src/commands/mod.rs b/wfectl/src/commands/mod.rs new file mode 100644 index 0000000..40c30a2 --- /dev/null +++ b/wfectl/src/commands/mod.rs @@ -0,0 +1,19 @@ +//! Subcommand modules for `wfectl`. + +pub mod auth; +pub mod cancel; +pub mod definitions; +pub mod get; +pub mod list; +pub mod login; +pub mod logout; +pub mod logs; +pub mod publish; +pub mod register; +pub mod resume; +pub mod run; +pub mod search_logs; +pub mod suspend; +pub mod validate; +pub mod watch; +pub mod whoami; diff --git a/wfectl/src/commands/publish.rs b/wfectl/src/commands/publish.rs new file mode 100644 index 0000000..1475a7c --- /dev/null +++ b/wfectl/src/commands/publish.rs @@ -0,0 +1,63 @@ +//! `wfectl publish ` -- publish an event. + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::Args; +use wfe_server_protos::wfe::v1::PublishEventRequest; + +use crate::client::AuthClient; +use crate::output::OutputFormat; +use crate::struct_util::json_object_to_struct; + +#[derive(Debug, Args)] +pub struct PublishArgs { + /// Event name (e.g., "order.paid"). + pub event_name: String, + /// Event key (e.g., the order ID). + pub event_key: String, + /// Path to a JSON file with event data. + #[arg(long)] + pub data: Option, + /// Inline JSON data (overrides --data). + #[arg(long)] + pub data_json: Option, +} + +pub async fn run(args: PublishArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let data_json = match (args.data_json.as_ref(), args.data.as_ref()) { + (Some(json), _) => json.clone(), + (None, Some(path)) => std::fs::read_to_string(path) + .with_context(|| format!("failed to read data file {}", path.display()))?, + (None, None) => "{}".to_string(), + }; + let json_value: serde_json::Value = + serde_json::from_str(&data_json).context("data must be valid JSON")?; + let data = json_object_to_struct(&json_value); + + let resp = client + .publish_event(PublishEventRequest { + event_name: args.event_name.clone(), + event_key: args.event_key.clone(), + data: Some(data), + }) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + println!( + "{}", + serde_json::to_string_pretty(&serde_json::json!({ + "event_id": resp.event_id, + "event_name": args.event_name, + "event_key": args.event_key, + }))? + ); + } else { + println!( + "✓ Published event {} key={} (event_id={})", + args.event_name, args.event_key, resp.event_id + ); + } + Ok(()) +} diff --git a/wfectl/src/commands/register.rs b/wfectl/src/commands/register.rs new file mode 100644 index 0000000..2c1c3c5 --- /dev/null +++ b/wfectl/src/commands/register.rs @@ -0,0 +1,98 @@ +//! `wfectl register ` -- register one or more workflow definitions. + +use std::collections::HashMap; +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::Args; +use wfe_server_protos::wfe::v1::RegisterWorkflowRequest; + +use crate::client::AuthClient; +use crate::output::{OutputFormat, render_table}; + +#[derive(Debug, Args)] +pub struct RegisterArgs { + /// Path to a workflow YAML file. + pub file: PathBuf, + /// Config interpolation values: `key=value`. Repeatable. + #[arg(long = "config", short = 'c', value_parser = parse_kv)] + pub config: Vec<(String, String)>, +} + +fn parse_kv(raw: &str) -> Result<(String, String), String> { + raw.split_once('=') + .map(|(k, v)| (k.to_string(), v.to_string())) + .ok_or_else(|| format!("expected key=value, got: {raw}")) +} + +pub async fn run(args: RegisterArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let yaml = std::fs::read_to_string(&args.file) + .with_context(|| format!("failed to read {}", args.file.display()))?; + + let config: HashMap = args.config.into_iter().collect(); + + let resp = client + .register_workflow(RegisterWorkflowRequest { yaml, config }) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "definitions": resp.definitions.iter().map(|d| serde_json::json!({ + "id": d.definition_id, + "name": d.name, + "version": d.version, + "step_count": d.step_count, + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + let rows: Vec> = resp + .definitions + .iter() + .map(|d| { + let display = if d.name.is_empty() { + d.definition_id.clone() + } else { + d.name.clone() + }; + vec![ + display, + d.definition_id.clone(), + d.version.to_string(), + d.step_count.to_string(), + ] + }) + .collect(); + println!( + "{}", + render_table(&["Name", "ID", "Version", "Steps"], &rows) + ); + println!("Registered {} workflow(s)", resp.definitions.len()); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_kv_valid() { + let (k, v) = parse_kv("foo=bar").unwrap(); + assert_eq!(k, "foo"); + assert_eq!(v, "bar"); + } + + #[test] + fn parse_kv_with_equals_in_value() { + let (k, v) = parse_kv("k=a=b=c").unwrap(); + assert_eq!(k, "k"); + assert_eq!(v, "a=b=c"); + } + + #[test] + fn parse_kv_missing_equals() { + assert!(parse_kv("invalid").is_err()); + } +} diff --git a/wfectl/src/commands/resume.rs b/wfectl/src/commands/resume.rs new file mode 100644 index 0000000..2dd51c6 --- /dev/null +++ b/wfectl/src/commands/resume.rs @@ -0,0 +1,23 @@ +//! `wfectl resume ` -- resume a suspended workflow. + +use anyhow::Result; +use clap::Args; +use wfe_server_protos::wfe::v1::ResumeWorkflowRequest; + +use crate::client::AuthClient; + +#[derive(Debug, Args)] +pub struct ResumeArgs { + /// Workflow instance identifier — UUID or human-friendly name (e.g. "ci-42"). + pub workflow_id: String, +} + +pub async fn run(args: ResumeArgs, mut client: AuthClient) -> Result<()> { + client + .resume_workflow(ResumeWorkflowRequest { + workflow_id: args.workflow_id.clone(), + }) + .await?; + println!("✓ Resumed workflow {}", args.workflow_id); + Ok(()) +} diff --git a/wfectl/src/commands/run.rs b/wfectl/src/commands/run.rs new file mode 100644 index 0000000..bde33a3 --- /dev/null +++ b/wfectl/src/commands/run.rs @@ -0,0 +1,69 @@ +//! `wfectl run ` -- start a new workflow instance. + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::Args; +use wfe_server_protos::wfe::v1::StartWorkflowRequest; + +use crate::client::AuthClient; +use crate::output::OutputFormat; +use crate::struct_util::json_object_to_struct; + +#[derive(Debug, Args)] +pub struct RunArgs { + /// Workflow definition ID. + pub definition_id: String, + /// Workflow version (default: 1). + #[arg(long, default_value_t = 1)] + pub version: u32, + /// Path to a JSON file with input data. + #[arg(long)] + pub data: Option, + /// Inline JSON data (overrides --data). + #[arg(long)] + pub data_json: Option, + /// Human-friendly name for this instance. Must be unique across all + /// workflow instances. Leave unset to let the server auto-assign + /// `{definition_id}-{N}` using a per-definition monotonic counter. + #[arg(long)] + pub name: Option, +} + +pub async fn run(args: RunArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let data_json = match (args.data_json.as_ref(), args.data.as_ref()) { + (Some(json), _) => json.clone(), + (None, Some(path)) => std::fs::read_to_string(path) + .with_context(|| format!("failed to read data file {}", path.display()))?, + (None, None) => "{}".to_string(), + }; + + let json_value: serde_json::Value = + serde_json::from_str(&data_json).context("data must be valid JSON")?; + let data = json_object_to_struct(&json_value); + + let resp = client + .start_workflow(StartWorkflowRequest { + definition_id: args.definition_id.clone(), + version: args.version, + data: Some(data), + name: args.name.unwrap_or_default(), + }) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "workflow_id": resp.workflow_id, + "name": resp.name, + "definition_id": args.definition_id, + "version": args.version, + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + println!("Started workflow: {}", resp.name); + println!(" ID: {}", resp.workflow_id); + println!(" Definition: {} v{}", args.definition_id, args.version); + } + Ok(()) +} diff --git a/wfectl/src/commands/search_logs.rs b/wfectl/src/commands/search_logs.rs new file mode 100644 index 0000000..7635e66 --- /dev/null +++ b/wfectl/src/commands/search_logs.rs @@ -0,0 +1,97 @@ +//! `wfectl search-logs ` -- full-text search log lines. + +use anyhow::Result; +use clap::{Args, ValueEnum}; +use wfe_server_protos::wfe::v1::{LogStream, SearchLogsRequest}; + +use crate::client::AuthClient; +use crate::output::{OutputFormat, fmt_proto_time, render_table}; + +#[derive(Debug, Args)] +pub struct SearchLogsArgs { + /// Full-text search query. + pub query: String, + /// Filter to a specific workflow. + #[arg(long)] + pub workflow: Option, + /// Filter to a specific step. + #[arg(long)] + pub step: Option, + /// Filter to stdout or stderr. + #[arg(long)] + pub stream: Option, + /// Maximum number of results. + #[arg(long, default_value_t = 50)] + pub limit: u64, + /// Skip the first N results. + #[arg(long, default_value_t = 0)] + pub skip: u64, +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +#[value(rename_all = "lowercase")] +pub enum StreamFilter { + Stdout, + Stderr, +} + +impl From for LogStream { + fn from(s: StreamFilter) -> Self { + match s { + StreamFilter::Stdout => LogStream::Stdout, + StreamFilter::Stderr => LogStream::Stderr, + } + } +} + +pub async fn run(args: SearchLogsArgs, mut client: AuthClient, format: OutputFormat) -> Result<()> { + let stream_filter: LogStream = args + .stream + .map(Into::into) + .unwrap_or(LogStream::Unspecified); + let resp = client + .search_logs(SearchLogsRequest { + query: args.query, + workflow_id: args.workflow.unwrap_or_default(), + step_name: args.step.unwrap_or_default(), + stream_filter: stream_filter as i32, + skip: args.skip, + take: args.limit, + }) + .await? + .into_inner(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "total": resp.total, + "results": resp.results.iter().map(|r| serde_json::json!({ + "workflow_id": r.workflow_id, + "definition_id": r.definition_id, + "step_name": r.step_name, + "stream": r.stream, + "line": r.line, + "timestamp": r.timestamp.as_ref().map(fmt_proto_time), + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + let rows: Vec> = resp + .results + .iter() + .map(|r| { + vec![ + r.timestamp.as_ref().map(fmt_proto_time).unwrap_or_default(), + r.workflow_id.clone(), + r.step_name.clone(), + r.line.clone(), + ] + }) + .collect(); + println!( + "{}", + render_table(&["Time", "Workflow", "Step", "Line"], &rows) + ); + println!("{} of {} result(s)", resp.results.len(), resp.total); + } + Ok(()) +} diff --git a/wfectl/src/commands/suspend.rs b/wfectl/src/commands/suspend.rs new file mode 100644 index 0000000..8e31b82 --- /dev/null +++ b/wfectl/src/commands/suspend.rs @@ -0,0 +1,23 @@ +//! `wfectl suspend ` -- pause a running workflow. + +use anyhow::Result; +use clap::Args; +use wfe_server_protos::wfe::v1::SuspendWorkflowRequest; + +use crate::client::AuthClient; + +#[derive(Debug, Args)] +pub struct SuspendArgs { + /// Workflow instance identifier — UUID or human-friendly name (e.g. "ci-42"). + pub workflow_id: String, +} + +pub async fn run(args: SuspendArgs, mut client: AuthClient) -> Result<()> { + client + .suspend_workflow(SuspendWorkflowRequest { + workflow_id: args.workflow_id.clone(), + }) + .await?; + println!("✓ Suspended workflow {}", args.workflow_id); + Ok(()) +} diff --git a/wfectl/src/commands/validate.rs b/wfectl/src/commands/validate.rs new file mode 100644 index 0000000..7a9c046 --- /dev/null +++ b/wfectl/src/commands/validate.rs @@ -0,0 +1,173 @@ +//! `wfectl validate ` -- locally compile a workflow YAML file. +//! +//! Validation runs in-process via `wfe_yaml::load_workflow_from_str`, which +//! is the exact same compile path the server uses at registration time. The +//! wfectl crate enables the full executor feature set (kubernetes, deno, +//! buildkit, containerd, rustlang) so every step type is recognized. No +//! server round-trip, no auth required — giving users instant feedback +//! before they push to a shared host. + +use std::collections::HashMap; +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use clap::Args; + +use crate::output::{OutputFormat, render_table}; + +#[derive(Debug, Args)] +pub struct ValidateArgs { + /// Path to a workflow YAML file. + pub file: PathBuf, + /// Config interpolation values: `key=value`. Repeatable. Mirrors + /// `wfectl register` so validation sees the same interpolated text. + #[arg(long = "config", short = 'c', value_parser = parse_kv)] + pub config: Vec<(String, String)>, +} + +fn parse_kv(raw: &str) -> Result<(String, String), String> { + raw.split_once('=') + .map(|(k, v)| (k.to_string(), v.to_string())) + .ok_or_else(|| format!("expected key=value, got: {raw}")) +} + +pub async fn run(args: ValidateArgs, format: OutputFormat) -> Result<()> { + let yaml = std::fs::read_to_string(&args.file) + .with_context(|| format!("failed to read {}", args.file.display()))?; + + // wfe_yaml takes a JSON-valued config map because YAML interpolation + // supports non-string types; wrap every CLI-supplied value as a string. + let config: HashMap = args + .config + .into_iter() + .map(|(k, v)| (k, serde_json::Value::String(v))) + .collect(); + + let compiled = wfe_yaml::load_workflow_from_str(&yaml, &config) + .with_context(|| format!("YAML compilation failed for {}", args.file.display()))?; + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "file": args.file.display().to_string(), + "definitions": compiled.iter().map(|c| serde_json::json!({ + "id": c.definition.id, + "name": c.definition.name, + "version": c.definition.version, + "description": c.definition.description, + "step_count": c.definition.steps.len(), + })).collect::>(), + }); + println!("{}", serde_json::to_string_pretty(&json)?); + } else { + let rows: Vec> = compiled + .iter() + .map(|c| { + let display = c + .definition + .name + .clone() + .unwrap_or_else(|| c.definition.id.clone()); + vec![ + display, + c.definition.id.clone(), + c.definition.version.to_string(), + c.definition.steps.len().to_string(), + ] + }) + .collect(); + println!( + "{}", + render_table(&["Name", "ID", "Version", "Steps"], &rows) + ); + println!("✓ {} valid workflow(s)", compiled.len()); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_kv_valid() { + let (k, v) = parse_kv("foo=bar").unwrap(); + assert_eq!(k, "foo"); + assert_eq!(v, "bar"); + } + + #[test] + fn parse_kv_missing_equals() { + assert!(parse_kv("invalid").is_err()); + } + + #[tokio::test] + async fn validate_accepts_simple_workflow() { + // Use a `kubernetes` step because the validator enables the full + // executor feature set. We don't actually run the step, only + // compile the definition. + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + tmp.path(), + r#" +workflow: + id: simple + name: "Simple Test Workflow" + version: 1 + steps: + - name: hello + type: kubernetes + config: + image: alpine:3.19 + run: echo hello +"#, + ) + .unwrap(); + + let args = ValidateArgs { + file: tmp.path().to_path_buf(), + config: vec![], + }; + run(args, OutputFormat::Json).await.unwrap(); + } + + #[tokio::test] + async fn validate_rejects_unknown_step_type() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + tmp.path(), + r#" +workflow: + id: broken + version: 1 + steps: + - name: nope + type: not-a-real-step-type + config: {} +"#, + ) + .unwrap(); + + let args = ValidateArgs { + file: tmp.path().to_path_buf(), + config: vec![], + }; + let err = run(args, OutputFormat::Table).await.unwrap_err(); + let msg = format!("{err:#}"); + assert!( + msg.contains("Unknown step type") || msg.contains("compilation failed"), + "unexpected error: {msg}" + ); + } + + #[tokio::test] + async fn validate_rejects_missing_file() { + let args = ValidateArgs { + file: PathBuf::from("/definitely/does/not/exist.yaml"), + config: vec![], + }; + let err = run(args, OutputFormat::Table).await.unwrap_err(); + let msg = format!("{err:#}"); + assert!(msg.contains("failed to read"), "unexpected error: {msg}"); + } +} diff --git a/wfectl/src/commands/watch.rs b/wfectl/src/commands/watch.rs new file mode 100644 index 0000000..d79ccd8 --- /dev/null +++ b/wfectl/src/commands/watch.rs @@ -0,0 +1,82 @@ +//! `wfectl watch []` -- stream lifecycle events. + +use anyhow::Result; +use clap::Args; +use futures::StreamExt; +use wfe_server_protos::wfe::v1::{LifecycleEventType, WatchLifecycleRequest}; + +use crate::client::AuthClient; +use crate::output::fmt_proto_time; + +#[derive(Debug, Args)] +pub struct WatchArgs { + /// Optional workflow ID to filter to. Empty = all workflows. + pub workflow_id: Option, +} + +pub async fn run(args: WatchArgs, mut client: AuthClient) -> Result<()> { + let mut stream = client + .watch_lifecycle(WatchLifecycleRequest { + workflow_id: args.workflow_id.unwrap_or_default(), + }) + .await? + .into_inner(); + + while let Some(event) = stream.next().await { + let event = event?; + let ts = event + .event_time + .as_ref() + .map(fmt_proto_time) + .unwrap_or_default(); + let event_type = format_event_type(event.event_type); + let mut line = format!( + "[{ts}] [{event_type}] workflow={} def={} v={}", + event.workflow_id, event.definition_id, event.version + ); + if !event.step_name.is_empty() { + line.push_str(&format!(" step={}", event.step_name)); + } + if !event.error_message.is_empty() { + line.push_str(&format!(" error={}", event.error_message)); + } + println!("{line}"); + } + Ok(()) +} + +fn format_event_type(t: i32) -> &'static str { + match LifecycleEventType::try_from(t).unwrap_or(LifecycleEventType::Unspecified) { + LifecycleEventType::Started => "STARTED", + LifecycleEventType::Completed => "COMPLETED", + LifecycleEventType::Terminated => "TERMINATED", + LifecycleEventType::Suspended => "SUSPENDED", + LifecycleEventType::Resumed => "RESUMED", + LifecycleEventType::Error => "ERROR", + LifecycleEventType::StepStarted => "STEP_STARTED", + LifecycleEventType::StepCompleted => "STEP_COMPLETED", + LifecycleEventType::Unspecified => "UNKNOWN", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn format_all_event_types() { + assert_eq!( + format_event_type(LifecycleEventType::Started as i32), + "STARTED" + ); + assert_eq!( + format_event_type(LifecycleEventType::Completed as i32), + "COMPLETED" + ); + assert_eq!( + format_event_type(LifecycleEventType::StepCompleted as i32), + "STEP_COMPLETED" + ); + assert_eq!(format_event_type(999), "UNKNOWN"); + } +} diff --git a/wfectl/src/commands/whoami.rs b/wfectl/src/commands/whoami.rs new file mode 100644 index 0000000..91569f8 --- /dev/null +++ b/wfectl/src/commands/whoami.rs @@ -0,0 +1,70 @@ +//! `wfectl whoami` -- print current user identity from cached token. + +use anyhow::Result; +use clap::Args; + +use crate::auth; +use crate::config; +use crate::output::{OutputFormat, render_kv}; + +#[derive(Debug, Args)] +pub struct WhoamiArgs { + /// OIDC issuer to inspect (defaults to configured issuer). + #[arg(long)] + pub issuer: Option, +} + +pub async fn run( + args: WhoamiArgs, + server_cfg: &config::Config, + format: OutputFormat, +) -> Result<()> { + let issuer = args.issuer.unwrap_or_else(|| server_cfg.issuer.clone()); + let domain = auth::domain_from_issuer(&issuer)?; + + let token = match auth::load_token(&domain)? { + Some(t) => t, + None => { + println!("Not logged in to {domain}. Run `wfectl login` first."); + return Ok(()); + } + }; + + let claims = token.id_claims(); + + if matches!(format, OutputFormat::Json) { + let json = serde_json::json!({ + "domain": token.domain, + "issuer": token.issuer, + "expires_at": token.expires_at, + "claims": claims, + }); + println!("{}", serde_json::to_string_pretty(&json)?); + return Ok(()); + } + + let mut fields = vec![ + ("Domain", token.domain.clone()), + ("Issuer", token.issuer.clone()), + ("Expires", crate::output::fmt_time(&token.expires_at)), + ]; + if let Some(claims) = &claims { + if let Some(email) = claims.get("email").and_then(|v| v.as_str()) { + fields.push(("Email", email.to_string())); + } + if let Some(name) = claims.get("name").and_then(|v| v.as_str()) { + fields.push(("Name", name.to_string())); + } + if let Some(groups) = claims.get("groups").and_then(|v| v.as_array()) { + let joined = groups + .iter() + .filter_map(|g| g.as_str()) + .collect::>() + .join(", "); + fields.push(("Groups", joined)); + } + } + let display: Vec<(&str, String)> = fields.iter().map(|(k, v)| (*k, v.clone())).collect(); + println!("{}", render_kv(&display)); + Ok(()) +} diff --git a/wfectl/src/config.rs b/wfectl/src/config.rs new file mode 100644 index 0000000..2e4924b --- /dev/null +++ b/wfectl/src/config.rs @@ -0,0 +1,187 @@ +//! Persistent configuration loaded from `~/.config/wfectl/config.toml`. +//! +//! Resolution precedence: CLI flag > env var > config file > built-in default. + +use std::path::PathBuf; + +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; + +/// Default wfe-server endpoint (Pingora terminates TLS, h2c upstream). +pub const DEFAULT_SERVER: &str = "https://builds.sunbeam.pt:443"; +/// Default OIDC issuer. +pub const DEFAULT_ISSUER: &str = "https://auth.sunbeam.pt/"; + +/// Persisted user configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + #[serde(default = "default_server")] + pub server: String, + #[serde(default = "default_issuer")] + pub issuer: String, + #[serde(default)] + pub default_format: OutputFormatPref, +} + +impl Default for Config { + fn default() -> Self { + Self { + server: default_server(), + issuer: default_issuer(), + default_format: OutputFormatPref::default(), + } + } +} + +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum OutputFormatPref { + #[default] + Table, + Json, +} + +fn default_server() -> String { + DEFAULT_SERVER.into() +} +fn default_issuer() -> String { + DEFAULT_ISSUER.into() +} + +/// Path to the config file. +pub fn config_path() -> PathBuf { + let base = dirs::config_dir().unwrap_or_else(|| PathBuf::from(".config")); + base.join("wfectl/config.toml") +} + +/// Load the config file. Returns the default config if the file doesn't exist. +pub fn load() -> Result { + let path = config_path(); + if !path.exists() { + return Ok(Config::default()); + } + let bytes = std::fs::read_to_string(&path) + .with_context(|| format!("failed to read config at {}", path.display()))?; + let config: Config = toml::from_str(&bytes) + .with_context(|| format!("failed to parse config at {}", path.display()))?; + Ok(config) +} + +/// Save the config to disk, creating parent directories as needed. +pub fn save(config: &Config) -> Result<()> { + let path = config_path(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("failed to create config dir {}", parent.display()))?; + } + let toml = toml::to_string_pretty(config).context("failed to serialize config")?; + std::fs::write(&path, toml) + .with_context(|| format!("failed to write config to {}", path.display()))?; + Ok(()) +} + +/// Resolve a setting with CLI > env > file > default precedence. +pub fn resolve>(cli: Option, env_key: &str, file_value: &str) -> String { + if let Some(v) = cli { + return v.as_ref().to_string(); + } + if let Ok(v) = std::env::var(env_key) { + if !v.is_empty() { + return v; + } + } + file_value.to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn config_default_values() { + let config = Config::default(); + assert_eq!(config.server, DEFAULT_SERVER); + assert_eq!(config.issuer, DEFAULT_ISSUER); + assert_eq!(config.default_format, OutputFormatPref::Table); + } + + #[test] + fn config_serde_round_trip() { + let original = Config { + server: "http://localhost:50051".into(), + issuer: "https://auth.dev.com/".into(), + default_format: OutputFormatPref::Json, + }; + let toml_str = toml::to_string(&original).unwrap(); + let parsed: Config = toml::from_str(&toml_str).unwrap(); + assert_eq!(parsed.server, original.server); + assert_eq!(parsed.issuer, original.issuer); + assert_eq!(parsed.default_format, original.default_format); + } + + #[test] + fn config_partial_uses_defaults() { + let toml_str = r#"server = "http://other:50051""#; + let config: Config = toml::from_str(toml_str).unwrap(); + assert_eq!(config.server, "http://other:50051"); + assert_eq!(config.issuer, DEFAULT_ISSUER); + } + + #[test] + fn resolve_prefers_cli() { + unsafe { std::env::set_var("WFECTL_TEST_X", "from_env") }; + let v = resolve(Some("from_cli"), "WFECTL_TEST_X", "from_file"); + assert_eq!(v, "from_cli"); + unsafe { std::env::remove_var("WFECTL_TEST_X") }; + } + + #[test] + fn resolve_prefers_env_when_no_cli() { + unsafe { std::env::set_var("WFECTL_TEST_Y", "from_env") }; + let v = resolve(None::<&str>, "WFECTL_TEST_Y", "from_file"); + assert_eq!(v, "from_env"); + unsafe { std::env::remove_var("WFECTL_TEST_Y") }; + } + + #[test] + fn resolve_falls_back_to_file() { + unsafe { std::env::remove_var("WFECTL_TEST_Z") }; + let v = resolve(None::<&str>, "WFECTL_TEST_Z", "from_file"); + assert_eq!(v, "from_file"); + } + + #[test] + fn resolve_treats_empty_env_as_unset() { + unsafe { std::env::set_var("WFECTL_TEST_EMPTY", "") }; + let v = resolve(None::<&str>, "WFECTL_TEST_EMPTY", "from_file"); + assert_eq!(v, "from_file"); + unsafe { std::env::remove_var("WFECTL_TEST_EMPTY") }; + } + + #[test] + fn save_load_round_trip_in_temp_home() { + let tmp = tempfile::tempdir().unwrap(); + unsafe { std::env::set_var("XDG_CONFIG_HOME", tmp.path()) }; + + let config = Config { + server: "http://localhost:50051".into(), + issuer: "https://auth.local/".into(), + default_format: OutputFormatPref::Json, + }; + + save(&config).unwrap(); + let loaded = load().unwrap(); + assert_eq!(loaded.server, "http://localhost:50051"); + assert_eq!(loaded.default_format, OutputFormatPref::Json); + } + + #[test] + fn load_returns_default_when_missing() { + let tmp = tempfile::tempdir().unwrap(); + unsafe { std::env::set_var("XDG_CONFIG_HOME", tmp.path()) }; + let config = load().unwrap(); + // Should match defaults. + assert_eq!(config.server, DEFAULT_SERVER); + } +} diff --git a/wfectl/src/lib.rs b/wfectl/src/lib.rs new file mode 100644 index 0000000..12cf973 --- /dev/null +++ b/wfectl/src/lib.rs @@ -0,0 +1,8 @@ +//! wfectl: command-line client for wfe-server. + +pub mod auth; +pub mod client; +pub mod commands; +pub mod config; +pub mod output; +pub mod struct_util; diff --git a/wfectl/src/main.rs b/wfectl/src/main.rs new file mode 100644 index 0000000..1256eaf --- /dev/null +++ b/wfectl/src/main.rs @@ -0,0 +1,139 @@ +//! wfectl: command-line client for wfe-server. + +use anyhow::Result; +use clap::{Parser, Subcommand}; +use tracing_subscriber::EnvFilter; + +use wfectl::client::build as build_client; +use wfectl::commands::{ + auth::resolve_token, cancel, definitions, get, list, login, logout, logs, publish, register, + resume, run, search_logs, suspend, validate, watch, whoami, +}; +use wfectl::config; +use wfectl::output::OutputFormat; + +#[derive(Debug, Parser)] +#[command( + name = "wfectl", + version, + about = "Command-line client for wfe-server", + long_about = "Authenticate, register, run, monitor, and manage WFE workflows from the terminal." +)] +struct Cli { + /// Override the wfe-server URL. + #[arg(long, env = "WFECTL_SERVER", global = true)] + server: Option, + + /// Override the OIDC issuer URL (used by login/whoami). + #[arg(long, env = "WFECTL_ISSUER", global = true)] + issuer: Option, + + /// Bearer token for direct auth (skips OIDC). Falls back to WFECTL_TOKEN env then cached login. + #[arg(long, env = "WFECTL_TOKEN", global = true)] + token: Option, + + /// Output format. + #[arg(short, long, value_enum, default_value_t = OutputFormat::Table, global = true)] + output: OutputFormat, + + #[command(subcommand)] + cmd: Command, +} + +#[derive(Debug, Subcommand)] +enum Command { + /// Run the OAuth2 PKCE login flow. + Login(login::LoginArgs), + /// Delete cached OIDC token. + Logout(logout::LogoutArgs), + /// Show current user identity. + Whoami(whoami::WhoamiArgs), + + /// Register a workflow definition from a YAML file. + Register(register::RegisterArgs), + /// Locally validate a workflow YAML file (no server round-trip). + Validate(validate::ValidateArgs), + /// Manage registered workflow definitions. + Definitions(definitions::DefinitionsArgs), + + /// Start a new workflow instance. + Run(run::RunArgs), + /// Get a workflow instance by ID. + Get(get::GetArgs), + /// List/search workflow instances. + List(list::ListArgs), + /// Cancel a running workflow. + Cancel(cancel::CancelArgs), + /// Suspend a running workflow. + Suspend(suspend::SuspendArgs), + /// Resume a suspended workflow. + Resume(resume::ResumeArgs), + + /// Publish an event to waiting workflows. + Publish(publish::PublishArgs), + /// Stream lifecycle events. + Watch(watch::WatchArgs), + /// Stream step logs. + Logs(logs::LogsArgs), + /// Full-text search log lines. + SearchLogs(search_logs::SearchLogsArgs), +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn")), + ) + .with_writer(std::io::stderr) + .init(); + + let cli = Cli::parse(); + let mut cfg = config::load().unwrap_or_default(); + if let Some(s) = cli.server.clone() { + cfg.server = s; + } + if let Some(i) = cli.issuer.clone() { + cfg.issuer = i; + } + + match cli.cmd { + // --- Commands that don't need a gRPC client --- + Command::Login(args) => login::run(args, &cfg).await, + Command::Logout(args) => logout::run(args, &cfg).await, + Command::Whoami(args) => whoami::run(args, &cfg, cli.output).await, + Command::Validate(args) => validate::run(args, cli.output).await, + + // --- Commands that need an authenticated gRPC client --- + cmd => { + let token = resolve_token(cli.token.as_deref(), &cfg.issuer).await?; + let client = build_client(&cfg.server, &token).await?; + dispatch(cmd, client, cli.output).await + } + } +} + +async fn dispatch( + cmd: Command, + client: wfectl::client::AuthClient, + format: OutputFormat, +) -> Result<()> { + match cmd { + Command::Register(args) => register::run(args, client, format).await, + Command::Definitions(args) => definitions::run(args, client, format).await, + Command::Run(args) => run::run(args, client, format).await, + Command::Get(args) => get::run(args, client, format).await, + Command::List(args) => list::run(args, client, format).await, + Command::Cancel(args) => cancel::run(args, client).await, + Command::Suspend(args) => suspend::run(args, client).await, + Command::Resume(args) => resume::run(args, client).await, + Command::Publish(args) => publish::run(args, client, format).await, + Command::Watch(args) => watch::run(args, client).await, + Command::Logs(args) => logs::run(args, client).await, + Command::SearchLogs(args) => search_logs::run(args, client, format).await, + Command::Login(_) + | Command::Logout(_) + | Command::Whoami(_) + | Command::Validate(_) => unreachable!(), + } +} diff --git a/wfectl/src/output.rs b/wfectl/src/output.rs new file mode 100644 index 0000000..a2e3fe0 --- /dev/null +++ b/wfectl/src/output.rs @@ -0,0 +1,177 @@ +//! Output formatting helpers (table + JSON). + +use clap::ValueEnum; +use comfy_table::{Cell, ContentArrangement, Table, presets::UTF8_FULL}; +use serde::Serialize; + +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +#[value(rename_all = "lowercase")] +pub enum OutputFormat { + Table, + Json, +} + +impl Default for OutputFormat { + fn default() -> Self { + OutputFormat::Table + } +} + +/// Render a list of rows as a table or JSON depending on `format`. +pub fn render_list( + format: OutputFormat, + headers: &[&str], + rows: &[Vec], + items: &[T], +) -> String { + match format { + OutputFormat::Json => { + serde_json::to_string_pretty(items).unwrap_or_else(|e| format!("error: {e}")) + } + OutputFormat::Table => render_table(headers, rows), + } +} + +/// Render a single object as a key-value table or JSON. +pub fn render_object( + format: OutputFormat, + fields: &[(&str, String)], + item: &T, +) -> String { + match format { + OutputFormat::Json => { + serde_json::to_string_pretty(item).unwrap_or_else(|e| format!("error: {e}")) + } + OutputFormat::Table => render_kv(fields), + } +} + +/// Build a comfy-table from headers + rows. +pub fn render_table(headers: &[&str], rows: &[Vec]) -> String { + let mut table = Table::new(); + table + .load_preset(UTF8_FULL) + .set_content_arrangement(ContentArrangement::Dynamic) + .set_header( + headers + .iter() + .map(|h| Cell::new(h).fg(comfy_table::Color::Cyan)), + ); + for row in rows { + table.add_row(row.iter().map(Cell::new).collect::>()); + } + table.to_string() +} + +/// Build a key-value layout (one row per field). +pub fn render_kv(fields: &[(&str, String)]) -> String { + let mut table = Table::new(); + table + .load_preset(UTF8_FULL) + .set_content_arrangement(ContentArrangement::Dynamic); + for (key, value) in fields { + table.add_row(vec![ + Cell::new(key).fg(comfy_table::Color::Cyan), + Cell::new(value), + ]); + } + table.to_string() +} + +/// Format a chrono UTC timestamp as a short ISO 8601 string. +pub fn fmt_time(ts: &chrono::DateTime) -> String { + ts.format("%Y-%m-%d %H:%M:%S").to_string() +} + +/// Format a prost Timestamp. +pub fn fmt_proto_time(ts: &prost_types::Timestamp) -> String { + let dt = chrono::DateTime::::from_timestamp(ts.seconds, ts.nanos as u32); + match dt { + Some(dt) => fmt_time(&dt), + None => "".into(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn output_format_default_is_table() { + assert_eq!(OutputFormat::default(), OutputFormat::Table); + } + + #[test] + fn render_table_includes_headers() { + let out = render_table(&["a", "b"], &[vec!["1".into(), "2".into()]]); + assert!(out.contains("a")); + assert!(out.contains("b")); + assert!(out.contains("1")); + assert!(out.contains("2")); + } + + #[test] + fn render_table_empty_rows() { + let out = render_table(&["a", "b"], &[]); + assert!(out.contains("a")); + } + + #[test] + fn render_kv_layout() { + let out = render_kv(&[("Name", "Foo".into()), ("Status", "OK".into())]); + assert!(out.contains("Name")); + assert!(out.contains("Foo")); + assert!(out.contains("Status")); + assert!(out.contains("OK")); + } + + #[test] + fn render_list_json_serializes() { + let items = vec![json!({"a": 1}), json!({"a": 2})]; + let out = render_list(OutputFormat::Json, &[], &[], &items); + let parsed: serde_json::Value = serde_json::from_str(&out).unwrap(); + assert_eq!(parsed[0]["a"], 1); + } + + #[test] + fn render_list_table_uses_rows() { + let items: Vec = vec![]; + let out = render_list(OutputFormat::Table, &["x"], &[vec!["row1".into()]], &items); + assert!(out.contains("row1")); + } + + #[test] + fn render_object_json() { + let item = json!({"k": "v"}); + let out = render_object(OutputFormat::Json, &[], &item); + assert!(out.contains("\"k\"")); + assert!(out.contains("\"v\"")); + } + + #[test] + fn fmt_time_iso_format() { + let ts: chrono::DateTime = "2026-04-07T12:34:56Z".parse().unwrap(); + assert_eq!(fmt_time(&ts), "2026-04-07 12:34:56"); + } + + #[test] + fn fmt_proto_time_valid() { + let ts = prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }; + let out = fmt_proto_time(&ts); + assert!(out.starts_with("2023")); + } + + #[test] + fn fmt_proto_time_invalid() { + let ts = prost_types::Timestamp { + seconds: i64::MAX, + nanos: 0, + }; + let out = fmt_proto_time(&ts); + assert_eq!(out, ""); + } +} diff --git a/wfectl/src/struct_util.rs b/wfectl/src/struct_util.rs new file mode 100644 index 0000000..8c0f0ba --- /dev/null +++ b/wfectl/src/struct_util.rs @@ -0,0 +1,99 @@ +//! Conversions between `serde_json::Value` and `prost_types::Struct`. + +use prost_types::value::Kind; +use prost_types::{ListValue, Struct, Value}; + +/// Convert a `serde_json::Value` to a `prost_types::Value`. +pub fn json_to_prost(json: &serde_json::Value) -> Value { + let kind = match json { + serde_json::Value::Null => Kind::NullValue(0), + serde_json::Value::Bool(b) => Kind::BoolValue(*b), + serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)), + serde_json::Value::String(s) => Kind::StringValue(s.clone()), + serde_json::Value::Array(arr) => Kind::ListValue(ListValue { + values: arr.iter().map(json_to_prost).collect(), + }), + serde_json::Value::Object(obj) => Kind::StructValue(Struct { + fields: obj + .iter() + .map(|(k, v)| (k.clone(), json_to_prost(v))) + .collect(), + }), + }; + Value { kind: Some(kind) } +} + +/// Convert a top-level JSON object into a `prost_types::Struct`. +pub fn json_object_to_struct(json: &serde_json::Value) -> Struct { + match json { + serde_json::Value::Object(map) => Struct { + fields: map + .iter() + .map(|(k, v)| (k.clone(), json_to_prost(v))) + .collect(), + }, + _ => Struct::default(), + } +} + +/// Convert a `prost_types::Value` back to `serde_json::Value`. +pub fn prost_to_json(value: &Value) -> serde_json::Value { + match &value.kind { + Some(Kind::NullValue(_)) | None => serde_json::Value::Null, + Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b), + Some(Kind::NumberValue(n)) => serde_json::Number::from_f64(*n) + .map(serde_json::Value::Number) + .unwrap_or(serde_json::Value::Null), + Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()), + Some(Kind::ListValue(list)) => { + serde_json::Value::Array(list.values.iter().map(prost_to_json).collect()) + } + Some(Kind::StructValue(s)) => prost_struct_to_json(s), + } +} + +/// Convert a `prost_types::Struct` to a `serde_json::Value::Object`. +pub fn prost_struct_to_json(s: &Struct) -> serde_json::Value { + serde_json::Value::Object( + s.fields + .iter() + .map(|(k, v)| (k.clone(), prost_to_json(v))) + .collect(), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn round_trip_object() { + let original = json!({ + "name": "test", + "count": 42, + "active": true, + "tags": ["a", "b"], + "nested": {"key": "value"} + }); + let s = json_object_to_struct(&original); + let back = prost_struct_to_json(&s); + assert_eq!(back["name"], "test"); + assert_eq!(back["count"], 42.0); // numbers become f64 + assert_eq!(back["active"], true); + assert_eq!(back["tags"][0], "a"); + assert_eq!(back["nested"]["key"], "value"); + } + + #[test] + fn json_to_prost_null() { + let v = json_to_prost(&serde_json::Value::Null); + assert!(matches!(v.kind, Some(Kind::NullValue(_)))); + } + + #[test] + fn json_object_to_struct_non_object_returns_empty() { + let s = json_object_to_struct(&json!("not an object")); + assert!(s.fields.is_empty()); + } +} diff --git a/wfectl/tests/auth_commands.rs b/wfectl/tests/auth_commands.rs new file mode 100644 index 0000000..cbf05f8 --- /dev/null +++ b/wfectl/tests/auth_commands.rs @@ -0,0 +1,194 @@ +//! Tests for `whoami` and `logout` commands using a temp HOME with a fake token. + +use chrono::Utc; +use std::path::PathBuf; +use std::sync::Mutex; + +use wfectl::auth::{StoredToken, save_token}; +use wfectl::commands::{logout, whoami}; +use wfectl::config::Config; +use wfectl::output::OutputFormat; + +/// Serialize HOME-mutating tests so they don't race. +static HOME_LOCK: Mutex<()> = Mutex::new(()); + +fn make_test_token(domain: &str) -> StoredToken { + StoredToken { + access_token: "ory_at_test".into(), + refresh_token: Some("ory_rt_test".into()), + // {"email":"alice@test.com","name":"Alice","groups":["admin","employee"]} + id_token: Some("h.eyJlbWFpbCI6ImFsaWNlQHRlc3QuY29tIiwibmFtZSI6IkFsaWNlIiwiZ3JvdXBzIjpbImFkbWluIiwiZW1wbG95ZWUiXX0.s".into()), + expires_at: Utc::now() + chrono::Duration::seconds(3600), + issuer: "https://auth.test.com/".into(), + domain: domain.into(), + } +} + +fn with_temp_home(f: F) { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + f(tmp.path().to_path_buf()); + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn whoami_table_format_with_full_claims() { + with_temp_home(|_| {}); + // Re-acquire the lock for the async section. + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = make_test_token("test.com"); + save_token(&token).unwrap(); + + let cfg = Config { + server: "http://localhost".into(), + issuer: "https://auth.test.com/".into(), + default_format: Default::default(), + }; + let args = whoami::WhoamiArgs { issuer: None }; + whoami::run(args, &cfg, OutputFormat::Table).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn whoami_json_format() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = make_test_token("test.com"); + save_token(&token).unwrap(); + + let cfg = Config { + server: "http://localhost".into(), + issuer: "https://auth.test.com/".into(), + default_format: Default::default(), + }; + let args = whoami::WhoamiArgs { issuer: None }; + whoami::run(args, &cfg, OutputFormat::Json).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn whoami_when_not_logged_in() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let cfg = Config { + server: "http://localhost".into(), + issuer: "https://auth.notlogged.in/".into(), + default_format: Default::default(), + }; + let args = whoami::WhoamiArgs { issuer: None }; + // Should not panic, just print a message. + whoami::run(args, &cfg, OutputFormat::Table).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn whoami_with_explicit_issuer_arg() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = make_test_token("override.com"); + save_token(&token).unwrap(); + + let cfg = Config::default(); + let args = whoami::WhoamiArgs { + issuer: Some("https://auth.override.com/".into()), + }; + whoami::run(args, &cfg, OutputFormat::Table).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn logout_removes_token() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = make_test_token("test.com"); + save_token(&token).unwrap(); + + let cfg = Config { + server: "http://localhost".into(), + issuer: "https://auth.test.com/".into(), + default_format: Default::default(), + }; + let args = logout::LogoutArgs { issuer: None }; + logout::run(args, &cfg).await.unwrap(); + + // Verify the token file is gone. + let path = wfectl::auth::token_path("test.com"); + assert!(!path.exists()); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn logout_when_not_logged_in_is_noop() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let cfg = Config { + server: "http://localhost".into(), + issuer: "https://auth.nothing.com/".into(), + default_format: Default::default(), + }; + let args = logout::LogoutArgs { issuer: None }; + logout::run(args, &cfg).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn logout_with_explicit_issuer() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved_home = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = make_test_token("explicit.com"); + save_token(&token).unwrap(); + + let cfg = Config::default(); + let args = logout::LogoutArgs { + issuer: Some("https://auth.explicit.com/".into()), + }; + logout::run(args, &cfg).await.unwrap(); + + if let Some(h) = saved_home { + unsafe { std::env::set_var("HOME", h) }; + } +} diff --git a/wfectl/tests/auth_oidc.rs b/wfectl/tests/auth_oidc.rs new file mode 100644 index 0000000..2d96422 --- /dev/null +++ b/wfectl/tests/auth_oidc.rs @@ -0,0 +1,261 @@ +//! Tests for OAuth2/OIDC code paths in auth.rs that talk to a real (mock) HTTP server. + +use chrono::Utc; +use std::sync::Mutex; +use wiremock::matchers::{body_string_contains, method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +use wfectl::auth::{StoredToken, discover, ensure_valid, exchange_code, refresh, save_token}; + +static HOME_LOCK: Mutex<()> = Mutex::new(()); + +fn discovery_body(server: &MockServer) -> serde_json::Value { + serde_json::json!({ + "issuer": server.uri(), + "authorization_endpoint": format!("{}/oauth2/auth", server.uri()), + "token_endpoint": format!("{}/oauth2/token", server.uri()), + }) +} + +#[tokio::test] +async fn discover_fetches_endpoints() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(200).set_body_json(discovery_body(&server))) + .mount(&server) + .await; + + let doc = discover(&server.uri()).await.unwrap(); + assert!(doc.authorization_endpoint.ends_with("/oauth2/auth")); + assert!(doc.token_endpoint.ends_with("/oauth2/token")); +} + +#[tokio::test] +async fn discover_handles_404() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + let result = discover(&server.uri()).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn discover_handles_invalid_json() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(200).set_body_string("not json")) + .mount(&server) + .await; + let result = discover(&server.uri()).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn exchange_code_success() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth2/token")) + .and(body_string_contains("grant_type=authorization_code")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "ory_at_new", + "refresh_token": "ory_rt_new", + "id_token": "header.eyJzdWIiOiJ1c2VyIn0.sig", + "expires_in": 3600, + }))) + .mount(&server) + .await; + + let discovery = wfectl::auth::DiscoveryDoc { + authorization_endpoint: format!("{}/oauth2/auth", server.uri()), + token_endpoint: format!("{}/oauth2/token", server.uri()), + }; + let token = exchange_code( + &discovery, + "auth-code", + "verifier", + "http://127.0.0.1:9876/callback", + &server.uri(), + "test.com", + ) + .await + .unwrap(); + assert_eq!(token.access_token, "ory_at_new"); + assert_eq!(token.refresh_token, Some("ory_rt_new".into())); +} + +#[tokio::test] +async fn exchange_code_handles_error_response() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth2/token")) + .respond_with(ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "error": "invalid_grant", + }))) + .mount(&server) + .await; + + let discovery = wfectl::auth::DiscoveryDoc { + authorization_endpoint: format!("{}/oauth2/auth", server.uri()), + token_endpoint: format!("{}/oauth2/token", server.uri()), + }; + let result = exchange_code(&discovery, "bad", "v", "uri", &server.uri(), "test.com").await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn refresh_token_success() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(200).set_body_json(discovery_body(&server))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/oauth2/token")) + .and(body_string_contains("grant_type=refresh_token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "ory_at_refreshed", + "expires_in": 3600, + }))) + .mount(&server) + .await; + + let token = StoredToken { + access_token: "ory_at_old".into(), + refresh_token: Some("ory_rt_old".into()), + id_token: None, + expires_at: Utc::now() - chrono::Duration::seconds(10), + issuer: server.uri(), + domain: "test.com".into(), + }; + let refreshed = refresh(&token).await.unwrap(); + assert_eq!(refreshed.access_token, "ory_at_refreshed"); + // Old refresh token preserved when new one isn't returned. + assert_eq!(refreshed.refresh_token, Some("ory_rt_old".into())); +} + +#[tokio::test] +async fn refresh_without_refresh_token_fails() { + let token = StoredToken { + access_token: "ory_at".into(), + refresh_token: None, + id_token: None, + expires_at: Utc::now(), + issuer: "https://auth.test.com/".into(), + domain: "test.com".into(), + }; + let result = refresh(&token).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn refresh_handles_token_endpoint_error() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(200).set_body_json(discovery_body(&server))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/oauth2/token")) + .respond_with(ResponseTemplate::new(401)) + .mount(&server) + .await; + + let token = StoredToken { + access_token: "ory_at_old".into(), + refresh_token: Some("ory_rt_old".into()), + id_token: None, + expires_at: Utc::now() - chrono::Duration::seconds(10), + issuer: server.uri(), + domain: "test.com".into(), + }; + let result = refresh(&token).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn ensure_valid_returns_existing_when_fresh() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let token = StoredToken { + access_token: "ory_at_fresh".into(), + refresh_token: Some("ory_rt_x".into()), + id_token: None, + expires_at: Utc::now() + chrono::Duration::seconds(3600), + issuer: "https://auth.fresh.com/".into(), + domain: "fresh.com".into(), + }; + save_token(&token).unwrap(); + + let result = ensure_valid("fresh.com").await.unwrap(); + assert_eq!(result.access_token, "ory_at_fresh"); + + if let Some(h) = saved { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn ensure_valid_refreshes_when_stale() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/.well-known/openid-configuration")) + .respond_with(ResponseTemplate::new(200).set_body_json(discovery_body(&server))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/oauth2/token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "access_token": "ory_at_refreshed_via_ensure", + "expires_in": 3600, + }))) + .mount(&server) + .await; + + let domain = "stale.com"; + let token = StoredToken { + access_token: "ory_at_old".into(), + refresh_token: Some("ory_rt_old".into()), + id_token: None, + expires_at: Utc::now() - chrono::Duration::seconds(10), + issuer: server.uri(), + domain: domain.into(), + }; + save_token(&token).unwrap(); + + let result = ensure_valid(domain).await.unwrap(); + assert_eq!(result.access_token, "ory_at_refreshed_via_ensure"); + + if let Some(h) = saved { + unsafe { std::env::set_var("HOME", h) }; + } +} + +#[tokio::test] +async fn ensure_valid_errors_when_no_token() { + let _g = HOME_LOCK.lock().unwrap_or_else(|p| p.into_inner()); + let tmp = tempfile::tempdir().unwrap(); + let saved = std::env::var("HOME").ok(); + unsafe { std::env::set_var("HOME", tmp.path()) }; + + let result = ensure_valid("nonexistent.com").await; + assert!(result.is_err()); + + if let Some(h) = saved { + unsafe { std::env::set_var("HOME", h) }; + } +} diff --git a/wfectl/tests/commands.rs b/wfectl/tests/commands.rs new file mode 100644 index 0000000..31291ae --- /dev/null +++ b/wfectl/tests/commands.rs @@ -0,0 +1,401 @@ +//! Drive the wfectl command handlers against a stub gRPC server. This tests +//! the full command -> client -> server -> response -> formatter pipeline. + +mod stub; + +use std::path::PathBuf; + +use wfectl::client::build as build_client; +use wfectl::commands; +use wfectl::output::OutputFormat; + +use stub::spawn_stub; + +#[tokio::test] +async fn register_command_table_output() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + + // Write a temp YAML file + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), "workflow:\n id: x\n version: 1\n steps: []").unwrap(); + + let args = commands::register::RegisterArgs { + file: tmp.path().to_path_buf(), + config: vec![("key".into(), "val".into())], + }; + commands::register::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn register_command_json_output() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), "workflow:\n id: x\n version: 1\n steps: []").unwrap(); + let args = commands::register::RegisterArgs { + file: tmp.path().to_path_buf(), + config: vec![], + }; + commands::register::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn register_command_missing_file_errors() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::register::RegisterArgs { + file: PathBuf::from("/nonexistent/file.yaml"), + config: vec![], + }; + let result = commands::register::run(args, client, OutputFormat::Table).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn definitions_list_table() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::definitions::DefinitionsArgs { + cmd: commands::definitions::DefinitionsCmd::List, + }; + commands::definitions::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn definitions_list_json() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::definitions::DefinitionsArgs { + cmd: commands::definitions::DefinitionsCmd::List, + }; + commands::definitions::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn run_command_with_inline_data() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::run::RunArgs { + definition_id: "ci".into(), + version: 1, + data: None, + data_json: Some(r#"{"key":"value"}"#.into()), + name: None, + }; + commands::run::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn run_command_with_data_file() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), r#"{"deploy":true}"#).unwrap(); + let args = commands::run::RunArgs { + definition_id: "ci".into(), + version: 2, + data: Some(tmp.path().to_path_buf()), + data_json: None, + name: None, + }; + commands::run::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn run_command_no_data() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::run::RunArgs { + definition_id: "ci".into(), + version: 1, + data: None, + data_json: None, + name: None, + }; + commands::run::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn run_command_invalid_json_errors() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::run::RunArgs { + definition_id: "ci".into(), + version: 1, + data: None, + data_json: Some("not json".into()), + name: None, + }; + let result = commands::run::run(args, client, OutputFormat::Table).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn get_command_table() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::get::GetArgs { + workflow_id: "wf-1".into(), + }; + commands::get::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn get_command_json() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::get::GetArgs { + workflow_id: "wf-1".into(), + }; + commands::get::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn list_command_no_filters() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::list::ListArgs { + query: None, + status: None, + limit: 10, + skip: 0, + }; + commands::list::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn list_command_all_filters() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::list::ListArgs { + query: Some("foo".into()), + status: Some(commands::list::StatusFilter::Complete), + limit: 50, + skip: 10, + }; + commands::list::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn list_command_each_status_variant() { + use commands::list::StatusFilter; + for status in [ + StatusFilter::Runnable, + StatusFilter::Suspended, + StatusFilter::Complete, + StatusFilter::Terminated, + ] { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::list::ListArgs { + query: None, + status: Some(status), + limit: 10, + skip: 0, + }; + commands::list::run(args, client, OutputFormat::Table) + .await + .unwrap(); + } +} + +#[tokio::test] +async fn cancel_suspend_resume_commands() { + let (server, _seen) = spawn_stub().await; + let client1 = build_client(&server, "test-token").await.unwrap(); + let client2 = build_client(&server, "test-token").await.unwrap(); + let client3 = build_client(&server, "test-token").await.unwrap(); + + commands::cancel::run( + commands::cancel::CancelArgs { + workflow_id: "wf-1".into(), + }, + client1, + ) + .await + .unwrap(); + + commands::suspend::run( + commands::suspend::SuspendArgs { + workflow_id: "wf-1".into(), + }, + client2, + ) + .await + .unwrap(); + + commands::resume::run( + commands::resume::ResumeArgs { + workflow_id: "wf-1".into(), + }, + client3, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn publish_command_with_inline_data() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::publish::PublishArgs { + event_name: "order.paid".into(), + event_key: "order-42".into(), + data: None, + data_json: Some(r#"{"amount":100}"#.into()), + }; + commands::publish::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn publish_command_with_file() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let tmp = tempfile::NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), r#"{"amount":100}"#).unwrap(); + let args = commands::publish::PublishArgs { + event_name: "order.paid".into(), + event_key: "order-42".into(), + data: Some(tmp.path().to_path_buf()), + data_json: None, + }; + commands::publish::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn publish_command_no_data() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::publish::PublishArgs { + event_name: "test".into(), + event_key: "x".into(), + data: None, + data_json: None, + }; + commands::publish::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn watch_command_streams_events() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::watch::WatchArgs { + workflow_id: Some("wf-1".into()), + }; + commands::watch::run(args, client).await.unwrap(); +} + +#[tokio::test] +async fn watch_command_no_filter() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::watch::WatchArgs { workflow_id: None }; + commands::watch::run(args, client).await.unwrap(); +} + +#[tokio::test] +async fn logs_command_basic() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::logs::LogsArgs { + workflow_id: "wf-1".into(), + step: None, + follow: false, + }; + commands::logs::run(args, client).await.unwrap(); +} + +#[tokio::test] +async fn logs_command_with_step_filter() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::logs::LogsArgs { + workflow_id: "wf-1".into(), + step: Some("build".into()), + follow: true, + }; + commands::logs::run(args, client).await.unwrap(); +} + +#[tokio::test] +async fn search_logs_command_table() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::search_logs::SearchLogsArgs { + query: "needle".into(), + workflow: None, + step: None, + stream: None, + limit: 10, + skip: 0, + }; + commands::search_logs::run(args, client, OutputFormat::Table) + .await + .unwrap(); +} + +#[tokio::test] +async fn search_logs_command_with_filters_json() { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::search_logs::SearchLogsArgs { + query: "needle".into(), + workflow: Some("wf-1".into()), + step: Some("build".into()), + stream: Some(commands::search_logs::StreamFilter::Stdout), + limit: 100, + skip: 5, + }; + commands::search_logs::run(args, client, OutputFormat::Json) + .await + .unwrap(); +} + +#[tokio::test] +async fn search_logs_each_stream_variant() { + use commands::search_logs::StreamFilter; + for stream in [StreamFilter::Stdout, StreamFilter::Stderr] { + let (server, _seen) = spawn_stub().await; + let client = build_client(&server, "test-token").await.unwrap(); + let args = commands::search_logs::SearchLogsArgs { + query: "x".into(), + workflow: None, + step: None, + stream: Some(stream), + limit: 10, + skip: 0, + }; + commands::search_logs::run(args, client, OutputFormat::Table) + .await + .unwrap(); + } +} diff --git a/wfectl/tests/integration.rs b/wfectl/tests/integration.rs new file mode 100644 index 0000000..fe84ab6 --- /dev/null +++ b/wfectl/tests/integration.rs @@ -0,0 +1,220 @@ +//! Raw gRPC client tests against the in-process stub server. +//! Verifies the wire protocol and bearer auth interceptor. + +mod stub; + +use wfe_server_protos::wfe::v1::{ + CancelWorkflowRequest, GetWorkflowRequest, ListDefinitionsRequest, PublishEventRequest, + RegisterWorkflowRequest, ResumeWorkflowRequest, SearchLogsRequest, SearchWorkflowsRequest, + StartWorkflowRequest, SuspendWorkflowRequest, WatchLifecycleRequest, WorkflowStatus, +}; + +use wfectl::client::build as build_client; + +use stub::spawn_stub; + +#[tokio::test] +async fn client_register_workflow() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .register_workflow(RegisterWorkflowRequest { + yaml: "workflow:\n id: x\n version: 1\n steps: []".into(), + config: Default::default(), + }) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.definitions.len(), 1); + assert_eq!(resp.definitions[0].step_count, 3); +} + +#[tokio::test] +async fn client_list_definitions() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .list_definitions(ListDefinitionsRequest {}) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.definitions.len(), 1); + assert_eq!(resp.definitions[0].id, "ci"); +} + +#[tokio::test] +async fn client_start_workflow_with_data() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .start_workflow(StartWorkflowRequest { + definition_id: "ci".into(), + version: 1, + data: Some(wfectl::struct_util::json_object_to_struct( + &serde_json::json!({"key": "value"}), + )), + name: String::new(), + }) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.workflow_id, "wf-ci-1"); +} + +#[tokio::test] +async fn client_get_workflow_returns_instance() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .get_workflow(GetWorkflowRequest { + workflow_id: "wf-ci-1".into(), + }) + .await + .unwrap() + .into_inner(); + let instance = resp.instance.unwrap(); + assert_eq!(instance.id, "wf-ci-1"); + assert_eq!(instance.definition_id, "ci"); +} + +#[tokio::test] +async fn client_cancel_suspend_resume() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + client + .cancel_workflow(CancelWorkflowRequest { + workflow_id: "wf-1".into(), + }) + .await + .unwrap(); + client + .suspend_workflow(SuspendWorkflowRequest { + workflow_id: "wf-1".into(), + }) + .await + .unwrap(); + client + .resume_workflow(ResumeWorkflowRequest { + workflow_id: "wf-1".into(), + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn client_search_workflows() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .search_workflows(SearchWorkflowsRequest { + query: "ci".into(), + status_filter: WorkflowStatus::Complete as i32, + skip: 0, + take: 10, + }) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.total, 1); + assert_eq!(resp.results[0].id, "wf-1"); +} + +#[tokio::test] +async fn client_publish_event() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .publish_event(PublishEventRequest { + event_name: "order.paid".into(), + event_key: "order-42".into(), + data: None, + }) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.event_id, "evt-order.paid-order-42"); +} + +#[tokio::test] +async fn client_watch_lifecycle_stream() { + use futures::StreamExt; + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let mut stream = client + .watch_lifecycle(WatchLifecycleRequest { + workflow_id: String::new(), + }) + .await + .unwrap() + .into_inner(); + let mut count = 0; + while let Some(event) = stream.next().await { + let event = event.unwrap(); + assert_eq!(event.workflow_id, "wf-1"); + count += 1; + } + assert_eq!(count, 2); +} + +#[tokio::test] +async fn client_stream_logs() { + use futures::StreamExt; + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let mut stream = client + .stream_logs(wfe_server_protos::wfe::v1::StreamLogsRequest { + workflow_id: "wf-1".into(), + step_name: String::new(), + follow: false, + }) + .await + .unwrap() + .into_inner(); + let entry = stream.next().await.unwrap().unwrap(); + assert_eq!(entry.step_name, "build"); + assert_eq!(entry.data, b"hello\n"); +} + +#[tokio::test] +async fn client_search_logs() { + let (server, _seen) = spawn_stub().await; + let mut client = build_client(&server, "test-token").await.unwrap(); + let resp = client + .search_logs(SearchLogsRequest { + query: "needle".into(), + workflow_id: String::new(), + step_name: String::new(), + stream_filter: 0, + skip: 0, + take: 10, + }) + .await + .unwrap() + .into_inner(); + assert_eq!(resp.total, 1); + assert!(resp.results[0].line.contains("needle")); +} + +#[tokio::test] +async fn auth_interceptor_sends_bearer_header() { + let (server, seen) = spawn_stub().await; + let mut client = build_client(&server, "ory_at_test_token").await.unwrap(); + client + .list_definitions(ListDefinitionsRequest {}) + .await + .unwrap(); + let captured = seen.lock().await.clone(); + assert_eq!(captured, Some("Bearer ory_at_test_token".to_string())); +} + +#[tokio::test] +async fn empty_token_sends_no_header() { + let (server, seen) = spawn_stub().await; + let mut client = build_client(&server, "").await.unwrap(); + client + .list_definitions(ListDefinitionsRequest {}) + .await + .unwrap(); + let captured = seen.lock().await.clone(); + assert!(captured.is_none()); +} diff --git a/wfectl/tests/stub/mod.rs b/wfectl/tests/stub/mod.rs new file mode 100644 index 0000000..8cce86e --- /dev/null +++ b/wfectl/tests/stub/mod.rs @@ -0,0 +1,312 @@ +//! Shared in-process gRPC stub server for command and integration tests. + +use std::net::SocketAddr; +use std::sync::Arc; + +use tokio::sync::Mutex; +use tonic::transport::Server; +use tonic::{Request, Response, Status}; + +use wfe_server_protos::wfe::v1::{ + CancelWorkflowRequest, CancelWorkflowResponse, DefinitionSummary, GetWorkflowRequest, + GetWorkflowResponse, ListDefinitionsRequest, ListDefinitionsResponse, PublishEventRequest, + PublishEventResponse, RegisterWorkflowRequest, RegisterWorkflowResponse, RegisteredDefinition, + ResumeWorkflowRequest, ResumeWorkflowResponse, SearchLogsRequest, SearchLogsResponse, + SearchWorkflowsRequest, SearchWorkflowsResponse, StartWorkflowRequest, StartWorkflowResponse, + SuspendWorkflowRequest, SuspendWorkflowResponse, WatchLifecycleRequest, WorkflowInstance, + WorkflowSearchResult, WorkflowStatus, + wfe_server::{Wfe, WfeServer}, +}; + +#[derive(Default)] +pub struct StubWfe { + pub seen_authorization: Arc>>, +} + +impl StubWfe { + async fn capture_auth(&self, req: &Request) { + if let Some(val) = req.metadata().get("authorization") { + if let Ok(s) = val.to_str() { + let mut guard = self.seen_authorization.lock().await; + *guard = Some(s.to_string()); + } + } + } +} + +#[tonic::async_trait] +impl Wfe for StubWfe { + async fn register_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let inner = req.into_inner(); + Ok(Response::new(RegisterWorkflowResponse { + definitions: vec![RegisteredDefinition { + definition_id: format!("test-{}", inner.yaml.len()), + version: 1, + step_count: 3, + name: "Test Workflow".into(), + }], + })) + } + + async fn list_definitions( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + Ok(Response::new(ListDefinitionsResponse { + definitions: vec![DefinitionSummary { + id: "ci".into(), + version: 1, + description: "CI pipeline".into(), + step_count: 5, + name: "Continuous Integration".into(), + }], + })) + } + + async fn start_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let inner = req.into_inner(); + let workflow_id = format!("wf-{}-{}", inner.definition_id, inner.version); + let name = if inner.name.is_empty() { + format!("{}-1", inner.definition_id) + } else { + inner.name + }; + Ok(Response::new(StartWorkflowResponse { workflow_id, name })) + } + + async fn get_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let id = req.into_inner().workflow_id; + Ok(Response::new(GetWorkflowResponse { + instance: Some(WorkflowInstance { + id: id.clone(), + name: "ci-1".into(), + definition_id: "ci".into(), + version: 1, + description: "test instance".into(), + reference: "ref-1".into(), + status: WorkflowStatus::Runnable as i32, + data: None, + create_time: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + complete_time: None, + execution_pointers: vec![wfe_server_protos::wfe::v1::ExecutionPointer { + id: "ptr-1".into(), + step_id: 0, + step_name: "build".into(), + status: wfe_server_protos::wfe::v1::PointerStatus::Complete as i32, + start_time: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + end_time: Some(prost_types::Timestamp { + seconds: 1_700_000_100, + nanos: 0, + }), + retry_count: 0, + active: false, + }], + }), + })) + } + + async fn cancel_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + Ok(Response::new(CancelWorkflowResponse {})) + } + + async fn suspend_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + Ok(Response::new(SuspendWorkflowResponse {})) + } + + async fn resume_workflow( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + Ok(Response::new(ResumeWorkflowResponse {})) + } + + async fn search_workflows( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + Ok(Response::new(SearchWorkflowsResponse { + results: vec![WorkflowSearchResult { + id: "wf-1".into(), + name: "ci-1".into(), + definition_id: "ci".into(), + version: 1, + status: WorkflowStatus::Complete as i32, + reference: "ref-1".into(), + description: "test".into(), + create_time: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + }], + total: 1, + })) + } + + async fn publish_event( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let event = req.into_inner(); + Ok(Response::new(PublishEventResponse { + event_id: format!("evt-{}-{}", event.event_name, event.event_key), + })) + } + + type WatchLifecycleStream = tokio_stream::wrappers::ReceiverStream< + Result, + >; + async fn watch_lifecycle( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let (tx, rx) = tokio::sync::mpsc::channel(4); + let _ = tx + .send(Ok(wfe_server_protos::wfe::v1::LifecycleEvent { + event_time: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + workflow_id: "wf-1".into(), + definition_id: "ci".into(), + version: 1, + event_type: wfe_server_protos::wfe::v1::LifecycleEventType::Started as i32, + step_id: 0, + step_name: String::new(), + error_message: String::new(), + })) + .await; + let _ = tx + .send(Ok(wfe_server_protos::wfe::v1::LifecycleEvent { + event_time: Some(prost_types::Timestamp { + seconds: 1_700_000_001, + nanos: 0, + }), + workflow_id: "wf-1".into(), + definition_id: "ci".into(), + version: 1, + event_type: wfe_server_protos::wfe::v1::LifecycleEventType::StepCompleted as i32, + step_id: 1, + step_name: "build".into(), + error_message: String::new(), + })) + .await; + drop(tx); + Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new( + rx, + ))) + } + + type StreamLogsStream = tokio_stream::wrappers::ReceiverStream< + Result, + >; + async fn stream_logs( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let (tx, rx) = tokio::sync::mpsc::channel(4); + let _ = tx + .send(Ok(wfe_server_protos::wfe::v1::LogEntry { + workflow_id: "wf-1".into(), + step_name: "build".into(), + step_id: 0, + stream: wfe_server_protos::wfe::v1::LogStream::Stdout as i32, + data: b"hello\n".to_vec(), + timestamp: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + })) + .await; + let _ = tx + .send(Ok(wfe_server_protos::wfe::v1::LogEntry { + workflow_id: "wf-1".into(), + step_name: "build".into(), + step_id: 0, + stream: wfe_server_protos::wfe::v1::LogStream::Stderr as i32, + data: b"warning\n".to_vec(), + timestamp: Some(prost_types::Timestamp { + seconds: 1_700_000_001, + nanos: 0, + }), + })) + .await; + drop(tx); + Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new( + rx, + ))) + } + + async fn search_logs( + &self, + req: Request, + ) -> Result, Status> { + self.capture_auth(&req).await; + let inner = req.into_inner(); + Ok(Response::new(SearchLogsResponse { + results: vec![wfe_server_protos::wfe::v1::LogSearchResult { + workflow_id: "wf-1".into(), + definition_id: "ci".into(), + step_name: "build".into(), + line: format!("matched {}", inner.query), + stream: wfe_server_protos::wfe::v1::LogStream::Stdout as i32, + timestamp: Some(prost_types::Timestamp { + seconds: 1_700_000_000, + nanos: 0, + }), + }], + total: 1, + })) + } +} + +/// Spawn the stub server on an ephemeral port and return its URL + the +/// shared `seen_authorization` slot so tests can assert on it. +pub async fn spawn_stub() -> (String, Arc>>) { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr: SocketAddr = listener.local_addr().unwrap(); + let stub = StubWfe::default(); + let seen = stub.seen_authorization.clone(); + + let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); + tokio::spawn(async move { + Server::builder() + .add_service(WfeServer::new(stub)) + .serve_with_incoming(incoming) + .await + .unwrap(); + }); + + (format!("http://{addr}"), seen) +}