feat(wfe): capability-based workflow routing and service lifecycle in dequeue loop

This commit is contained in:
2026-04-06 17:59:44 +01:00
parent affcf1bca8
commit 4dfcc61143
2 changed files with 163 additions and 1 deletions

View File

@@ -13,7 +13,7 @@ use wfe_core::models::{
};
use wfe_core::traits::{
DistributedLockProvider, HostContext, LifecyclePublisher, PersistenceProvider, QueueProvider,
SearchIndex, StepBody, WorkflowData,
SearchIndex, ServiceProvider, StepBody, WorkflowData,
};
use wfe_core::traits::registry::WorkflowRegistry;
use wfe_core::{Result, WfeError};
@@ -75,6 +75,7 @@ pub struct WorkflowHost {
pub(crate) search: Option<Arc<dyn SearchIndex>>,
pub(crate) registry: Arc<RwLock<InMemoryWorkflowRegistry>>,
pub(crate) step_registry: Arc<RwLock<StepRegistry>>,
pub(crate) service_provider: Option<Arc<dyn ServiceProvider>>,
pub(crate) executor: Arc<WorkflowExecutor>,
pub(crate) shutdown: CancellationToken,
}
@@ -119,6 +120,7 @@ impl WorkflowHost {
registry: Arc::clone(&self.registry),
queue_provider: Arc::clone(&self.queue_provider),
});
let svc_provider = self.service_provider.clone();
tokio::spawn(async move {
loop {
@@ -146,10 +148,71 @@ impl WorkflowHost {
match definition {
Some(def) => {
let def_clone = def.clone();
// Capability check: can this host execute this workflow?
{
let sr = step_registry.read().await;
if !can_execute_workflow(&def_clone, &sr, &svc_provider) {
drop(reg);
debug!(
workflow_id = %workflow_id,
"Host cannot execute workflow, re-queuing"
);
if let Err(e) = queue.queue_work(&workflow_id, QueueType::Workflow).await {
error!(error = %e, "Failed to re-queue workflow");
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
}
// Provision services if needed.
if !def_clone.services.is_empty() {
if let Some(ref provider) = svc_provider {
match provider.provision(&workflow_id, &def_clone.services).await {
Ok(endpoints) => {
debug!(
workflow_id = %workflow_id,
services = endpoints.len(),
"Services provisioned"
);
// Inject service endpoints into workflow data.
if let Err(e) = inject_service_endpoints(
&executor.persistence,
&workflow_id,
&endpoints,
).await {
error!(error = %e, "Failed to inject service endpoints");
provider.teardown(&workflow_id).await.ok();
continue;
}
}
Err(e) => {
error!(
workflow_id = %workflow_id,
error = %e,
"Failed to provision services"
);
continue;
}
}
}
}
// Execute the workflow.
let sr = step_registry.read().await;
if let Err(e) = executor.execute(&workflow_id, &def_clone, &sr, Some(host_ctx.as_ref())).await {
error!(workflow_id = %workflow_id, error = %e, "Workflow execution failed");
}
// Teardown services after execution (always, even on error).
if !def_clone.services.is_empty() {
if let Some(ref provider) = svc_provider {
if let Err(e) = provider.teardown(&workflow_id).await {
warn!(workflow_id = %workflow_id, error = %e, "Service teardown failed");
}
}
}
}
None => {
warn!(
@@ -506,3 +569,92 @@ async fn process_event(
Ok(())
}
/// Check if a workflow definition can be executed by this host.
///
/// Returns false if:
/// - Any step type is not registered in the step registry
/// - Services are declared but no ServiceProvider is configured
/// - The ServiceProvider cannot provision the required services
fn can_execute_workflow(
definition: &WorkflowDefinition,
step_registry: &StepRegistry,
service_provider: &Option<Arc<dyn ServiceProvider>>,
) -> bool {
// Check all step types are registered.
for step in &definition.steps {
if step_registry.resolve(&step.step_type).is_none() {
debug!(
step_type = %step.step_type,
"step type not registered on this host"
);
return false;
}
}
// If services are declared, check service provider.
if !definition.services.is_empty() {
match service_provider {
None => {
debug!("workflow requires services but no ServiceProvider configured");
return false;
}
Some(provider) => {
if !provider.can_provision(&definition.services) {
debug!("ServiceProvider cannot provision required services");
return false;
}
}
}
}
true
}
/// Inject service endpoint information into workflow instance data.
async fn inject_service_endpoints(
persistence: &Arc<dyn PersistenceProvider>,
workflow_id: &str,
endpoints: &[wfe_core::models::ServiceEndpoint],
) -> Result<()> {
let mut instance = persistence.get_workflow_instance(workflow_id).await?;
// Build service info map.
let mut services_map = serde_json::Map::new();
for ep in endpoints {
let mut ep_map = serde_json::Map::new();
ep_map.insert("host".into(), serde_json::Value::String(ep.host.clone()));
let ports: Vec<serde_json::Value> = ep
.ports
.iter()
.map(|p| serde_json::json!(p.container_port))
.collect();
ep_map.insert("ports".into(), serde_json::Value::Array(ports));
services_map.insert(ep.name.clone(), serde_json::Value::Object(ep_map));
// Also set env-style keys: SVC_{NAME}_HOST, SVC_{NAME}_PORT
let prefix = format!("SVC_{}", ep.name.to_uppercase());
if let Some(data_obj) = instance.data.as_object_mut() {
data_obj.insert(
format!("{prefix}_HOST"),
serde_json::Value::String(ep.host.clone()),
);
if let Some(port) = ep.ports.first() {
data_obj.insert(
format!("{prefix}_PORT"),
serde_json::json!(port.container_port),
);
}
}
}
if let Some(data_obj) = instance.data.as_object_mut() {
data_obj.insert(
"services".into(),
serde_json::Value::Object(services_map),
);
}
persistence.persist_workflow(&instance).await?;
Ok(())
}

View File

@@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken;
use wfe_core::executor::{StepRegistry, WorkflowExecutor};
use wfe_core::traits::{
DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex,
ServiceProvider,
};
use wfe_core::WfeError;
@@ -22,6 +23,7 @@ pub struct WorkflowHostBuilder {
lifecycle: Option<Arc<dyn LifecyclePublisher>>,
search: Option<Arc<dyn SearchIndex>>,
log_sink: Option<Arc<dyn wfe_core::traits::LogSink>>,
service_provider: Option<Arc<dyn ServiceProvider>>,
}
impl WorkflowHostBuilder {
@@ -33,6 +35,7 @@ impl WorkflowHostBuilder {
lifecycle: None,
search: None,
log_sink: None,
service_provider: None,
}
}
@@ -72,6 +75,12 @@ impl WorkflowHostBuilder {
self
}
/// Set an optional service provider for provisioning infrastructure services.
pub fn use_service_provider(mut self, provider: Arc<dyn ServiceProvider>) -> Self {
self.service_provider = Some(provider);
self
}
/// Build the `WorkflowHost`.
///
/// Returns an error if persistence, lock_provider, or queue_provider have not been set.
@@ -108,6 +117,7 @@ impl WorkflowHostBuilder {
queue_provider,
lifecycle: self.lifecycle,
search: self.search,
service_provider: self.service_provider,
registry: Arc::new(RwLock::new(InMemoryWorkflowRegistry::new())),
step_registry: Arc::new(RwLock::new(StepRegistry::new())),
executor: Arc::new(executor),