diff --git a/Dockerfile b/Dockerfile index 83e5e06..8b300c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,14 @@ # Stage 1: Build -FROM rust:alpine AS builder +# +# Using debian-slim (glibc) rather than alpine because deno_core's bundled v8 +# only ships glibc binaries — building v8 under musl from source is impractical +# and we need the full feature set (rustlang, buildkit, containerd, kubernetes, +# deno) compiled into wfe-server. +FROM rust:1-bookworm AS builder -RUN apk add --no-cache musl-dev protobuf-dev openssl-dev openssl-libs-static pkgconfig +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler libprotobuf-dev libssl-dev pkg-config ca-certificates \ + && rm -rf /var/lib/apt/lists/* WORKDIR /build COPY . . @@ -11,17 +18,19 @@ RUN mkdir -p .cargo && printf '[registries.sunbeam]\nindex = "sparse+https://src RUN cargo build --release --bin wfe-server \ -p wfe-server \ - --features "wfe-yaml/rustlang,wfe-yaml/buildkit,wfe-yaml/containerd,wfe-yaml/kubernetes" \ + --features "wfe-yaml/rustlang,wfe-yaml/buildkit,wfe-yaml/containerd,wfe-yaml/kubernetes,wfe-yaml/deno" \ && strip target/release/wfe-server # Stage 2: Runtime -FROM alpine:3.21 +FROM debian:bookworm-slim -RUN apk add --no-cache ca-certificates tini +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates tini libssl3 \ + && rm -rf /var/lib/apt/lists/* COPY --from=builder /build/target/release/wfe-server /usr/local/bin/wfe-server -RUN adduser -D -u 1000 wfe +RUN useradd -u 1000 -m wfe USER wfe EXPOSE 50051 8080 diff --git a/wfe-server-protos/proto/wfe/v1/wfe.proto b/wfe-server-protos/proto/wfe/v1/wfe.proto index cc2f71b..46272a4 100644 --- a/wfe-server-protos/proto/wfe/v1/wfe.proto +++ b/wfe-server-protos/proto/wfe/v1/wfe.proto @@ -45,6 +45,9 @@ message RegisteredDefinition { string definition_id = 1; uint32 version = 2; uint32 step_count = 3; + // Human-friendly display name declared in the YAML (e.g. "Continuous + // Integration"). Empty when the definition did not set one. + string name = 4; } message ListDefinitionsRequest {} @@ -58,6 +61,10 @@ message DefinitionSummary { uint32 version = 2; string description = 3; uint32 step_count = 4; + // Human-friendly display name declared in the YAML (e.g. "Continuous + // Integration"). Empty when the definition did not set one; clients should + // fall back to `id` for presentation. + string name = 5; } // ─── Instances ─────────────────────────────────────────────────────── @@ -66,13 +73,23 @@ message StartWorkflowRequest { string definition_id = 1; uint32 version = 2; google.protobuf.Struct data = 3; + // Optional caller-supplied name for this instance. Must be unique across + // all workflow instances. When unset the server auto-assigns + // `{definition_id}-{N}` using a per-definition monotonic counter. + string name = 4; } message StartWorkflowResponse { string workflow_id = 1; + // Human-friendly name that was assigned to the new instance (either the + // caller override or the auto-generated `{definition_id}-{N}`). + string name = 2; } message GetWorkflowRequest { + // Accepts either the UUID `workflow_id` or the human-friendly instance + // name (e.g. "ci-42"). The server tries UUID first, then falls back to + // name-based lookup. string workflow_id = 1; } @@ -201,6 +218,10 @@ message WorkflowInstance { google.protobuf.Timestamp create_time = 8; google.protobuf.Timestamp complete_time = 9; repeated ExecutionPointer execution_pointers = 10; + // Human-friendly unique name, auto-assigned as `{definition_id}-{N}` at + // start time, or the caller-supplied override from StartWorkflowRequest. + // Interchangeable with `id` in Get/Cancel/Suspend/Resume/Watch/Logs RPCs. + string name = 11; } message ExecutionPointer { @@ -222,6 +243,8 @@ message WorkflowSearchResult { string reference = 5; string description = 6; google.protobuf.Timestamp create_time = 7; + // Human-friendly instance name (e.g. "ci-42"). + string name = 8; } enum WorkflowStatus { diff --git a/wfe-server/Cargo.toml b/wfe-server/Cargo.toml index 12d2e14..346a96e 100644 --- a/wfe-server/Cargo.toml +++ b/wfe-server/Cargo.toml @@ -14,9 +14,9 @@ path = "src/main.rs" [dependencies] # Internal wfe-core = { workspace = true, features = ["test-support"] } -wfe = { version = "1.8.1", path = "../wfe", registry = "sunbeam" } -wfe-yaml = { version = "1.8.1", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] } -wfe-server-protos = { version = "1.8.1", path = "../wfe-server-protos", registry = "sunbeam" } +wfe = { version = "1.9.0", path = "../wfe", registry = "sunbeam" } +wfe-yaml = { version = "1.9.0", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd", "kubernetes", "deno"] } +wfe-server-protos = { version = "1.9.0", path = "../wfe-server-protos", registry = "sunbeam" } wfe-sqlite = { workspace = true } wfe-postgres = { workspace = true } wfe-valkey = { workspace = true } diff --git a/wfe-server/src/grpc.rs b/wfe-server/src/grpc.rs index 024d0ef..7a4fae2 100644 --- a/wfe-server/src/grpc.rs +++ b/wfe-server/src/grpc.rs @@ -2,8 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use tonic::{Request, Response, Status}; -use wfe_server_protos::wfe::v1::*; use wfe_server_protos::wfe::v1::wfe_server::Wfe; +use wfe_server_protos::wfe::v1::*; pub struct WfeService { host: Arc, @@ -18,7 +18,12 @@ impl WfeService { lifecycle_bus: Arc, log_store: Arc, ) -> Self { - Self { host, lifecycle_bus, log_store, log_search: None } + Self { + host, + lifecycle_bus, + log_store, + log_search: None, + } } pub fn with_log_search(mut self, index: Arc) -> Self { @@ -56,6 +61,7 @@ impl Wfe for WfeService { let id = compiled.definition.id.clone(); let version = compiled.definition.version; let step_count = compiled.definition.steps.len() as u32; + let name = compiled.definition.name.clone().unwrap_or_default(); self.host .register_workflow_definition(compiled.definition) @@ -65,6 +71,7 @@ impl Wfe for WfeService { definition_id: id, version, step_count, + name, }); } @@ -94,13 +101,33 @@ impl Wfe for WfeService { .map(struct_to_json) .unwrap_or_else(|| serde_json::json!({})); + // Empty `name` means "auto-assign"; pass None through so the host + // generates `{definition_id}-{N}` via the persistence sequence. + let name_override = if req.name.trim().is_empty() { + None + } else { + Some(req.name) + }; + let workflow_id = self .host - .start_workflow(&req.definition_id, req.version, data) + .start_workflow_with_name(&req.definition_id, req.version, data, name_override) .await .map_err(|e| Status::internal(format!("failed to start workflow: {e}")))?; - Ok(Response::new(StartWorkflowResponse { workflow_id })) + // Load the instance back so we can return the assigned name to the + // client. Cheap read, single row, avoids plumbing the name through + // the host's return signature. + let instance = self + .host + .get_workflow(&workflow_id) + .await + .map_err(|e| Status::internal(format!("failed to load new workflow: {e}")))?; + + Ok(Response::new(StartWorkflowResponse { + workflow_id, + name: instance.name, + })) } async fn get_workflow( @@ -206,10 +233,18 @@ impl Wfe for WfeService { request: Request, ) -> Result, Status> { let req = request.into_inner(); + // Resolve name-or-UUID to the canonical UUID upfront. Lifecycle events + // carry UUIDs, so filtering by a human name would silently drop + // everything. Empty filter means "all workflows". let filter_workflow_id = if req.workflow_id.is_empty() { None } else { - Some(req.workflow_id) + let resolved = self + .host + .resolve_workflow_id(&req.workflow_id) + .await + .map_err(|e| Status::not_found(format!("workflow not found: {e}")))?; + Some(resolved) }; let mut broadcast_rx = self.lifecycle_bus.subscribe(); @@ -239,7 +274,9 @@ impl Wfe for WfeService { } }); - Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx))) + Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new( + rx, + ))) } type StreamLogsStream = tokio_stream::wrappers::ReceiverStream>; @@ -249,7 +286,13 @@ impl Wfe for WfeService { request: Request, ) -> Result, Status> { let req = request.into_inner(); - let workflow_id = req.workflow_id.clone(); + // Resolve name-or-UUID so the log_store (which is keyed by UUID) + // returns history for the right instance. + let workflow_id = self + .host + .resolve_workflow_id(&req.workflow_id) + .await + .map_err(|e| Status::not_found(format!("workflow not found: {e}")))?; let step_name_filter = if req.step_name.is_empty() { None } else { @@ -301,7 +344,9 @@ impl Wfe for WfeService { // If not follow mode, the stream ends after history replay. }); - Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx))) + Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new( + rx, + ))) } // ── Search ─────────────────────────────────────────────────────── @@ -311,12 +356,31 @@ impl Wfe for WfeService { request: Request, ) -> Result, Status> { let Some(ref search) = self.log_search else { - return Err(Status::unavailable("log search not configured — set --search-url")); + return Err(Status::unavailable( + "log search not configured — set --search-url", + )); }; let req = request.into_inner(); - let workflow_id = if req.workflow_id.is_empty() { None } else { Some(req.workflow_id.as_str()) }; - let step_name = if req.step_name.is_empty() { None } else { Some(req.step_name.as_str()) }; + // Resolve name-or-UUID upfront so the search index (keyed by UUID) + // matches the requested instance. We materialize into a String so + // the borrowed reference below has a stable lifetime. + let resolved_workflow_id = if req.workflow_id.is_empty() { + None + } else { + Some( + self.host + .resolve_workflow_id(&req.workflow_id) + .await + .map_err(|e| Status::not_found(format!("workflow not found: {e}")))?, + ) + }; + let workflow_id = resolved_workflow_id.as_deref(); + let step_name = if req.step_name.is_empty() { + None + } else { + Some(req.step_name.as_str()) + }; let stream_filter = match req.stream_filter { x if x == LogStream::Stdout as i32 => Some("stdout"), x if x == LogStream::Stderr as i32 => Some("stderr"), @@ -325,7 +389,14 @@ impl Wfe for WfeService { let take = if req.take == 0 { 50 } else { req.take }; let (hits, total) = search - .search(&req.query, workflow_id, step_name, stream_filter, req.skip, take) + .search( + &req.query, + workflow_id, + step_name, + stream_filter, + req.skip, + take, + ) .await .map_err(|e| Status::internal(format!("search failed: {e}")))?; @@ -431,8 +502,18 @@ fn lifecycle_event_to_proto(e: &wfe_core::models::LifecycleEvent) -> LifecycleEv LET::Suspended => (PLET::Suspended as i32, 0, String::new(), String::new()), LET::Resumed => (PLET::Resumed as i32, 0, String::new(), String::new()), LET::Error { message } => (PLET::Error as i32, 0, String::new(), message.clone()), - LET::StepStarted { step_id, step_name } => (PLET::StepStarted as i32, *step_id as u32, step_name.clone().unwrap_or_default(), String::new()), - LET::StepCompleted { step_id, step_name } => (PLET::StepCompleted as i32, *step_id as u32, step_name.clone().unwrap_or_default(), String::new()), + LET::StepStarted { step_id, step_name } => ( + PLET::StepStarted as i32, + *step_id as u32, + step_name.clone().unwrap_or_default(), + String::new(), + ), + LET::StepCompleted { step_id, step_name } => ( + PLET::StepCompleted as i32, + *step_id as u32, + step_name.clone().unwrap_or_default(), + String::new(), + ), }; LifecycleEvent { event_time: Some(datetime_to_timestamp(&e.event_time_utc)), @@ -456,6 +537,7 @@ fn datetime_to_timestamp(dt: &chrono::DateTime) -> prost_types::Tim fn workflow_to_proto(w: &wfe_core::models::WorkflowInstance) -> WorkflowInstance { WorkflowInstance { id: w.id.clone(), + name: w.name.clone(), definition_id: w.workflow_definition_id.clone(), version: w.version, description: w.description.clone().unwrap_or_default(), @@ -469,11 +551,7 @@ fn workflow_to_proto(w: &wfe_core::models::WorkflowInstance) -> WorkflowInstance data: Some(json_to_struct(&w.data)), create_time: Some(datetime_to_timestamp(&w.create_time)), complete_time: w.complete_time.as_ref().map(datetime_to_timestamp), - execution_pointers: w - .execution_pointers - .iter() - .map(pointer_to_proto) - .collect(), + execution_pointers: w.execution_pointers.iter().map(pointer_to_proto).collect(), } } @@ -630,7 +708,10 @@ mod tests { assert_eq!(pointer_to_proto(&p).status, PointerStatus::Sleeping as i32); p.status = PS::WaitingForEvent; - assert_eq!(pointer_to_proto(&p).status, PointerStatus::WaitingForEvent as i32); + assert_eq!( + pointer_to_proto(&p).status, + PointerStatus::WaitingForEvent as i32 + ); p.status = PS::Failed; assert_eq!(pointer_to_proto(&p).status, PointerStatus::Failed as i32); @@ -644,7 +725,8 @@ mod tests { #[test] fn workflow_to_proto_basic() { - let w = wfe_core::models::WorkflowInstance::new("my-wf", 1, serde_json::json!({"key": "val"})); + let w = + wfe_core::models::WorkflowInstance::new("my-wf", 1, serde_json::json!({"key": "val"})); let p = workflow_to_proto(&w); assert_eq!(p.definition_id, "my-wf"); assert_eq!(p.version, 1); @@ -674,7 +756,8 @@ mod tests { host.start().await.unwrap(); - let lifecycle_bus = std::sync::Arc::new(crate::lifecycle_bus::BroadcastLifecyclePublisher::new(64)); + let lifecycle_bus = + std::sync::Arc::new(crate::lifecycle_bus::BroadcastLifecyclePublisher::new(64)); let log_store = std::sync::Arc::new(crate::log_store::LogStore::new()); WfeService::new(std::sync::Arc::new(host), lifecycle_bus, log_store) @@ -695,7 +778,8 @@ workflow: type: shell config: run: echo hi -"#.to_string(), +"# + .to_string(), config: Default::default(), }); let resp = svc.register_workflow(req).await.unwrap().into_inner(); @@ -709,6 +793,7 @@ workflow: definition_id: "test-wf".to_string(), version: 1, data: None, + name: String::new(), }); let resp = svc.start_workflow(req).await.unwrap().into_inner(); assert!(!resp.workflow_id.is_empty()); @@ -741,6 +826,7 @@ workflow: definition_id: "nonexistent".to_string(), version: 1, data: None, + name: String::new(), }); let err = svc.start_workflow(req).await.unwrap_err(); assert_eq!(err.code(), tonic::Code::Internal); @@ -771,16 +857,30 @@ workflow: definition_id: "cancel-test".to_string(), version: 1, data: None, + name: String::new(), }); - let wf_id = svc.start_workflow(req).await.unwrap().into_inner().workflow_id; + let wf_id = svc + .start_workflow(req) + .await + .unwrap() + .into_inner() + .workflow_id; // Cancel it. - let req = Request::new(CancelWorkflowRequest { workflow_id: wf_id.clone() }); + let req = Request::new(CancelWorkflowRequest { + workflow_id: wf_id.clone(), + }); svc.cancel_workflow(req).await.unwrap(); // Verify it's terminated. let req = Request::new(GetWorkflowRequest { workflow_id: wf_id }); - let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap(); + let instance = svc + .get_workflow(req) + .await + .unwrap() + .into_inner() + .instance + .unwrap(); assert_eq!(instance.status, WorkflowStatus::Terminated as i32); } @@ -798,23 +898,47 @@ workflow: definition_id: "sr-test".to_string(), version: 1, data: None, + name: String::new(), }); - let wf_id = svc.start_workflow(req).await.unwrap().into_inner().workflow_id; + let wf_id = svc + .start_workflow(req) + .await + .unwrap() + .into_inner() + .workflow_id; // Suspend. - let req = Request::new(SuspendWorkflowRequest { workflow_id: wf_id.clone() }); + let req = Request::new(SuspendWorkflowRequest { + workflow_id: wf_id.clone(), + }); svc.suspend_workflow(req).await.unwrap(); - let req = Request::new(GetWorkflowRequest { workflow_id: wf_id.clone() }); - let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap(); + let req = Request::new(GetWorkflowRequest { + workflow_id: wf_id.clone(), + }); + let instance = svc + .get_workflow(req) + .await + .unwrap() + .into_inner() + .instance + .unwrap(); assert_eq!(instance.status, WorkflowStatus::Suspended as i32); // Resume. - let req = Request::new(ResumeWorkflowRequest { workflow_id: wf_id.clone() }); + let req = Request::new(ResumeWorkflowRequest { + workflow_id: wf_id.clone(), + }); svc.resume_workflow(req).await.unwrap(); let req = Request::new(GetWorkflowRequest { workflow_id: wf_id }); - let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap(); + let instance = svc + .get_workflow(req) + .await + .unwrap() + .into_inner() + .instance + .unwrap(); assert_eq!(instance.status, WorkflowStatus::Runnable as i32); }