feat(wfe-server): full feature set, debian base, name resolution in gRPC

Proto changes:

* Add `name` to `WorkflowInstance`, `WorkflowSearchResult`,
  `RegisteredDefinition`, and `DefinitionSummary` messages.
* Add optional `name` override to `StartWorkflowRequest` and echo the
  assigned name back in `StartWorkflowResponse`.
* Document that `GetWorkflowRequest.workflow_id` accepts either a UUID
  or a human-readable instance name.

gRPC handler changes:

* `start_workflow` honors the optional name override and reads the
  instance back to return the assigned name to clients.
* `get_workflow` flows through `WorkflowHost::get_workflow`, which
  already falls back from UUID to name lookup.
* `stream_logs`, `watch_lifecycle`, and `search_logs` resolve
  name-or-UUID up front so the LogStore and the lifecycle bus (both
  keyed by UUID) subscribe to the right instance.
* `register_workflow` propagates the definition's display name into
  `RegisteredDefinition.name`.

Crate build changes:

* Enable the full executor feature set on wfe-yaml —
  `rustlang,buildkit,containerd,kubernetes,deno` — so the shipped
  binary recognizes every step type users can write.
* Dockerfile switched from `rust:alpine` to `rust:1-bookworm` +
  `debian:bookworm-slim` runtime. `deno_core` bundles a prebuilt v8
  binary that is distributed only for glibc targets; alpine/musl
  cannot link against it without building v8 from source.
This commit is contained in:
2026-04-07 19:07:52 +01:00
parent d88af54db9
commit 34209470c3
4 changed files with 197 additions and 41 deletions

View File

@@ -14,9 +14,9 @@ path = "src/main.rs"
[dependencies]
# Internal
wfe-core = { workspace = true, features = ["test-support"] }
wfe = { version = "1.8.1", path = "../wfe", registry = "sunbeam" }
wfe-yaml = { version = "1.8.1", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] }
wfe-server-protos = { version = "1.8.1", path = "../wfe-server-protos", registry = "sunbeam" }
wfe = { version = "1.9.0", path = "../wfe", registry = "sunbeam" }
wfe-yaml = { version = "1.9.0", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd", "kubernetes", "deno"] }
wfe-server-protos = { version = "1.9.0", path = "../wfe-server-protos", registry = "sunbeam" }
wfe-sqlite = { workspace = true }
wfe-postgres = { workspace = true }
wfe-valkey = { workspace = true }

View File

@@ -2,8 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tonic::{Request, Response, Status};
use wfe_server_protos::wfe::v1::*;
use wfe_server_protos::wfe::v1::wfe_server::Wfe;
use wfe_server_protos::wfe::v1::*;
pub struct WfeService {
host: Arc<wfe::WorkflowHost>,
@@ -18,7 +18,12 @@ impl WfeService {
lifecycle_bus: Arc<crate::lifecycle_bus::BroadcastLifecyclePublisher>,
log_store: Arc<crate::log_store::LogStore>,
) -> Self {
Self { host, lifecycle_bus, log_store, log_search: None }
Self {
host,
lifecycle_bus,
log_store,
log_search: None,
}
}
pub fn with_log_search(mut self, index: Arc<crate::log_search::LogSearchIndex>) -> Self {
@@ -56,6 +61,7 @@ impl Wfe for WfeService {
let id = compiled.definition.id.clone();
let version = compiled.definition.version;
let step_count = compiled.definition.steps.len() as u32;
let name = compiled.definition.name.clone().unwrap_or_default();
self.host
.register_workflow_definition(compiled.definition)
@@ -65,6 +71,7 @@ impl Wfe for WfeService {
definition_id: id,
version,
step_count,
name,
});
}
@@ -94,13 +101,33 @@ impl Wfe for WfeService {
.map(struct_to_json)
.unwrap_or_else(|| serde_json::json!({}));
// Empty `name` means "auto-assign"; pass None through so the host
// generates `{definition_id}-{N}` via the persistence sequence.
let name_override = if req.name.trim().is_empty() {
None
} else {
Some(req.name)
};
let workflow_id = self
.host
.start_workflow(&req.definition_id, req.version, data)
.start_workflow_with_name(&req.definition_id, req.version, data, name_override)
.await
.map_err(|e| Status::internal(format!("failed to start workflow: {e}")))?;
Ok(Response::new(StartWorkflowResponse { workflow_id }))
// Load the instance back so we can return the assigned name to the
// client. Cheap read, single row, avoids plumbing the name through
// the host's return signature.
let instance = self
.host
.get_workflow(&workflow_id)
.await
.map_err(|e| Status::internal(format!("failed to load new workflow: {e}")))?;
Ok(Response::new(StartWorkflowResponse {
workflow_id,
name: instance.name,
}))
}
async fn get_workflow(
@@ -206,10 +233,18 @@ impl Wfe for WfeService {
request: Request<WatchLifecycleRequest>,
) -> Result<Response<Self::WatchLifecycleStream>, Status> {
let req = request.into_inner();
// Resolve name-or-UUID to the canonical UUID upfront. Lifecycle events
// carry UUIDs, so filtering by a human name would silently drop
// everything. Empty filter means "all workflows".
let filter_workflow_id = if req.workflow_id.is_empty() {
None
} else {
Some(req.workflow_id)
let resolved = self
.host
.resolve_workflow_id(&req.workflow_id)
.await
.map_err(|e| Status::not_found(format!("workflow not found: {e}")))?;
Some(resolved)
};
let mut broadcast_rx = self.lifecycle_bus.subscribe();
@@ -239,7 +274,9 @@ impl Wfe for WfeService {
}
});
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
rx,
)))
}
type StreamLogsStream = tokio_stream::wrappers::ReceiverStream<Result<LogEntry, Status>>;
@@ -249,7 +286,13 @@ impl Wfe for WfeService {
request: Request<StreamLogsRequest>,
) -> Result<Response<Self::StreamLogsStream>, Status> {
let req = request.into_inner();
let workflow_id = req.workflow_id.clone();
// Resolve name-or-UUID so the log_store (which is keyed by UUID)
// returns history for the right instance.
let workflow_id = self
.host
.resolve_workflow_id(&req.workflow_id)
.await
.map_err(|e| Status::not_found(format!("workflow not found: {e}")))?;
let step_name_filter = if req.step_name.is_empty() {
None
} else {
@@ -301,7 +344,9 @@ impl Wfe for WfeService {
// If not follow mode, the stream ends after history replay.
});
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
rx,
)))
}
// ── Search ───────────────────────────────────────────────────────
@@ -311,12 +356,31 @@ impl Wfe for WfeService {
request: Request<SearchLogsRequest>,
) -> Result<Response<SearchLogsResponse>, Status> {
let Some(ref search) = self.log_search else {
return Err(Status::unavailable("log search not configured — set --search-url"));
return Err(Status::unavailable(
"log search not configured — set --search-url",
));
};
let req = request.into_inner();
let workflow_id = if req.workflow_id.is_empty() { None } else { Some(req.workflow_id.as_str()) };
let step_name = if req.step_name.is_empty() { None } else { Some(req.step_name.as_str()) };
// Resolve name-or-UUID upfront so the search index (keyed by UUID)
// matches the requested instance. We materialize into a String so
// the borrowed reference below has a stable lifetime.
let resolved_workflow_id = if req.workflow_id.is_empty() {
None
} else {
Some(
self.host
.resolve_workflow_id(&req.workflow_id)
.await
.map_err(|e| Status::not_found(format!("workflow not found: {e}")))?,
)
};
let workflow_id = resolved_workflow_id.as_deref();
let step_name = if req.step_name.is_empty() {
None
} else {
Some(req.step_name.as_str())
};
let stream_filter = match req.stream_filter {
x if x == LogStream::Stdout as i32 => Some("stdout"),
x if x == LogStream::Stderr as i32 => Some("stderr"),
@@ -325,7 +389,14 @@ impl Wfe for WfeService {
let take = if req.take == 0 { 50 } else { req.take };
let (hits, total) = search
.search(&req.query, workflow_id, step_name, stream_filter, req.skip, take)
.search(
&req.query,
workflow_id,
step_name,
stream_filter,
req.skip,
take,
)
.await
.map_err(|e| Status::internal(format!("search failed: {e}")))?;
@@ -431,8 +502,18 @@ fn lifecycle_event_to_proto(e: &wfe_core::models::LifecycleEvent) -> LifecycleEv
LET::Suspended => (PLET::Suspended as i32, 0, String::new(), String::new()),
LET::Resumed => (PLET::Resumed as i32, 0, String::new(), String::new()),
LET::Error { message } => (PLET::Error as i32, 0, String::new(), message.clone()),
LET::StepStarted { step_id, step_name } => (PLET::StepStarted as i32, *step_id as u32, step_name.clone().unwrap_or_default(), String::new()),
LET::StepCompleted { step_id, step_name } => (PLET::StepCompleted as i32, *step_id as u32, step_name.clone().unwrap_or_default(), String::new()),
LET::StepStarted { step_id, step_name } => (
PLET::StepStarted as i32,
*step_id as u32,
step_name.clone().unwrap_or_default(),
String::new(),
),
LET::StepCompleted { step_id, step_name } => (
PLET::StepCompleted as i32,
*step_id as u32,
step_name.clone().unwrap_or_default(),
String::new(),
),
};
LifecycleEvent {
event_time: Some(datetime_to_timestamp(&e.event_time_utc)),
@@ -456,6 +537,7 @@ fn datetime_to_timestamp(dt: &chrono::DateTime<chrono::Utc>) -> prost_types::Tim
fn workflow_to_proto(w: &wfe_core::models::WorkflowInstance) -> WorkflowInstance {
WorkflowInstance {
id: w.id.clone(),
name: w.name.clone(),
definition_id: w.workflow_definition_id.clone(),
version: w.version,
description: w.description.clone().unwrap_or_default(),
@@ -469,11 +551,7 @@ fn workflow_to_proto(w: &wfe_core::models::WorkflowInstance) -> WorkflowInstance
data: Some(json_to_struct(&w.data)),
create_time: Some(datetime_to_timestamp(&w.create_time)),
complete_time: w.complete_time.as_ref().map(datetime_to_timestamp),
execution_pointers: w
.execution_pointers
.iter()
.map(pointer_to_proto)
.collect(),
execution_pointers: w.execution_pointers.iter().map(pointer_to_proto).collect(),
}
}
@@ -630,7 +708,10 @@ mod tests {
assert_eq!(pointer_to_proto(&p).status, PointerStatus::Sleeping as i32);
p.status = PS::WaitingForEvent;
assert_eq!(pointer_to_proto(&p).status, PointerStatus::WaitingForEvent as i32);
assert_eq!(
pointer_to_proto(&p).status,
PointerStatus::WaitingForEvent as i32
);
p.status = PS::Failed;
assert_eq!(pointer_to_proto(&p).status, PointerStatus::Failed as i32);
@@ -644,7 +725,8 @@ mod tests {
#[test]
fn workflow_to_proto_basic() {
let w = wfe_core::models::WorkflowInstance::new("my-wf", 1, serde_json::json!({"key": "val"}));
let w =
wfe_core::models::WorkflowInstance::new("my-wf", 1, serde_json::json!({"key": "val"}));
let p = workflow_to_proto(&w);
assert_eq!(p.definition_id, "my-wf");
assert_eq!(p.version, 1);
@@ -674,7 +756,8 @@ mod tests {
host.start().await.unwrap();
let lifecycle_bus = std::sync::Arc::new(crate::lifecycle_bus::BroadcastLifecyclePublisher::new(64));
let lifecycle_bus =
std::sync::Arc::new(crate::lifecycle_bus::BroadcastLifecyclePublisher::new(64));
let log_store = std::sync::Arc::new(crate::log_store::LogStore::new());
WfeService::new(std::sync::Arc::new(host), lifecycle_bus, log_store)
@@ -695,7 +778,8 @@ workflow:
type: shell
config:
run: echo hi
"#.to_string(),
"#
.to_string(),
config: Default::default(),
});
let resp = svc.register_workflow(req).await.unwrap().into_inner();
@@ -709,6 +793,7 @@ workflow:
definition_id: "test-wf".to_string(),
version: 1,
data: None,
name: String::new(),
});
let resp = svc.start_workflow(req).await.unwrap().into_inner();
assert!(!resp.workflow_id.is_empty());
@@ -741,6 +826,7 @@ workflow:
definition_id: "nonexistent".to_string(),
version: 1,
data: None,
name: String::new(),
});
let err = svc.start_workflow(req).await.unwrap_err();
assert_eq!(err.code(), tonic::Code::Internal);
@@ -771,16 +857,30 @@ workflow:
definition_id: "cancel-test".to_string(),
version: 1,
data: None,
name: String::new(),
});
let wf_id = svc.start_workflow(req).await.unwrap().into_inner().workflow_id;
let wf_id = svc
.start_workflow(req)
.await
.unwrap()
.into_inner()
.workflow_id;
// Cancel it.
let req = Request::new(CancelWorkflowRequest { workflow_id: wf_id.clone() });
let req = Request::new(CancelWorkflowRequest {
workflow_id: wf_id.clone(),
});
svc.cancel_workflow(req).await.unwrap();
// Verify it's terminated.
let req = Request::new(GetWorkflowRequest { workflow_id: wf_id });
let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap();
let instance = svc
.get_workflow(req)
.await
.unwrap()
.into_inner()
.instance
.unwrap();
assert_eq!(instance.status, WorkflowStatus::Terminated as i32);
}
@@ -798,23 +898,47 @@ workflow:
definition_id: "sr-test".to_string(),
version: 1,
data: None,
name: String::new(),
});
let wf_id = svc.start_workflow(req).await.unwrap().into_inner().workflow_id;
let wf_id = svc
.start_workflow(req)
.await
.unwrap()
.into_inner()
.workflow_id;
// Suspend.
let req = Request::new(SuspendWorkflowRequest { workflow_id: wf_id.clone() });
let req = Request::new(SuspendWorkflowRequest {
workflow_id: wf_id.clone(),
});
svc.suspend_workflow(req).await.unwrap();
let req = Request::new(GetWorkflowRequest { workflow_id: wf_id.clone() });
let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap();
let req = Request::new(GetWorkflowRequest {
workflow_id: wf_id.clone(),
});
let instance = svc
.get_workflow(req)
.await
.unwrap()
.into_inner()
.instance
.unwrap();
assert_eq!(instance.status, WorkflowStatus::Suspended as i32);
// Resume.
let req = Request::new(ResumeWorkflowRequest { workflow_id: wf_id.clone() });
let req = Request::new(ResumeWorkflowRequest {
workflow_id: wf_id.clone(),
});
svc.resume_workflow(req).await.unwrap();
let req = Request::new(GetWorkflowRequest { workflow_id: wf_id });
let instance = svc.get_workflow(req).await.unwrap().into_inner().instance.unwrap();
let instance = svc
.get_workflow(req)
.await
.unwrap()
.into_inner()
.instance
.unwrap();
assert_eq!(instance.status, WorkflowStatus::Runnable as i32);
}