diff --git a/wfe/src/host.rs b/wfe/src/host.rs index b891e72..43847bc 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -13,7 +13,7 @@ use wfe_core::models::{ }; use wfe_core::traits::{ DistributedLockProvider, HostContext, LifecyclePublisher, PersistenceProvider, QueueProvider, - SearchIndex, StepBody, WorkflowData, + SearchIndex, ServiceProvider, StepBody, WorkflowData, }; use wfe_core::traits::registry::WorkflowRegistry; use wfe_core::{Result, WfeError}; @@ -75,6 +75,7 @@ pub struct WorkflowHost { pub(crate) search: Option>, pub(crate) registry: Arc>, pub(crate) step_registry: Arc>, + pub(crate) service_provider: Option>, pub(crate) executor: Arc, pub(crate) shutdown: CancellationToken, } @@ -119,6 +120,7 @@ impl WorkflowHost { registry: Arc::clone(&self.registry), queue_provider: Arc::clone(&self.queue_provider), }); + let svc_provider = self.service_provider.clone(); tokio::spawn(async move { loop { @@ -146,10 +148,71 @@ impl WorkflowHost { match definition { Some(def) => { let def_clone = def.clone(); + + // Capability check: can this host execute this workflow? + { + let sr = step_registry.read().await; + if !can_execute_workflow(&def_clone, &sr, &svc_provider) { + drop(reg); + debug!( + workflow_id = %workflow_id, + "Host cannot execute workflow, re-queuing" + ); + if let Err(e) = queue.queue_work(&workflow_id, QueueType::Workflow).await { + error!(error = %e, "Failed to re-queue workflow"); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + continue; + } + } + + // Provision services if needed. + if !def_clone.services.is_empty() { + if let Some(ref provider) = svc_provider { + match provider.provision(&workflow_id, &def_clone.services).await { + Ok(endpoints) => { + debug!( + workflow_id = %workflow_id, + services = endpoints.len(), + "Services provisioned" + ); + // Inject service endpoints into workflow data. + if let Err(e) = inject_service_endpoints( + &executor.persistence, + &workflow_id, + &endpoints, + ).await { + error!(error = %e, "Failed to inject service endpoints"); + provider.teardown(&workflow_id).await.ok(); + continue; + } + } + Err(e) => { + error!( + workflow_id = %workflow_id, + error = %e, + "Failed to provision services" + ); + continue; + } + } + } + } + + // Execute the workflow. let sr = step_registry.read().await; if let Err(e) = executor.execute(&workflow_id, &def_clone, &sr, Some(host_ctx.as_ref())).await { error!(workflow_id = %workflow_id, error = %e, "Workflow execution failed"); } + + // Teardown services after execution (always, even on error). + if !def_clone.services.is_empty() { + if let Some(ref provider) = svc_provider { + if let Err(e) = provider.teardown(&workflow_id).await { + warn!(workflow_id = %workflow_id, error = %e, "Service teardown failed"); + } + } + } } None => { warn!( @@ -506,3 +569,92 @@ async fn process_event( Ok(()) } + +/// Check if a workflow definition can be executed by this host. +/// +/// Returns false if: +/// - Any step type is not registered in the step registry +/// - Services are declared but no ServiceProvider is configured +/// - The ServiceProvider cannot provision the required services +fn can_execute_workflow( + definition: &WorkflowDefinition, + step_registry: &StepRegistry, + service_provider: &Option>, +) -> bool { + // Check all step types are registered. + for step in &definition.steps { + if step_registry.resolve(&step.step_type).is_none() { + debug!( + step_type = %step.step_type, + "step type not registered on this host" + ); + return false; + } + } + + // If services are declared, check service provider. + if !definition.services.is_empty() { + match service_provider { + None => { + debug!("workflow requires services but no ServiceProvider configured"); + return false; + } + Some(provider) => { + if !provider.can_provision(&definition.services) { + debug!("ServiceProvider cannot provision required services"); + return false; + } + } + } + } + + true +} + +/// Inject service endpoint information into workflow instance data. +async fn inject_service_endpoints( + persistence: &Arc, + workflow_id: &str, + endpoints: &[wfe_core::models::ServiceEndpoint], +) -> Result<()> { + let mut instance = persistence.get_workflow_instance(workflow_id).await?; + + // Build service info map. + let mut services_map = serde_json::Map::new(); + for ep in endpoints { + let mut ep_map = serde_json::Map::new(); + ep_map.insert("host".into(), serde_json::Value::String(ep.host.clone())); + let ports: Vec = ep + .ports + .iter() + .map(|p| serde_json::json!(p.container_port)) + .collect(); + ep_map.insert("ports".into(), serde_json::Value::Array(ports)); + services_map.insert(ep.name.clone(), serde_json::Value::Object(ep_map)); + + // Also set env-style keys: SVC_{NAME}_HOST, SVC_{NAME}_PORT + let prefix = format!("SVC_{}", ep.name.to_uppercase()); + if let Some(data_obj) = instance.data.as_object_mut() { + data_obj.insert( + format!("{prefix}_HOST"), + serde_json::Value::String(ep.host.clone()), + ); + if let Some(port) = ep.ports.first() { + data_obj.insert( + format!("{prefix}_PORT"), + serde_json::json!(port.container_port), + ); + } + } + } + + if let Some(data_obj) = instance.data.as_object_mut() { + data_obj.insert( + "services".into(), + serde_json::Value::Object(services_map), + ); + } + + persistence.persist_workflow(&instance).await?; + Ok(()) +} diff --git a/wfe/src/host_builder.rs b/wfe/src/host_builder.rs index da1a566..9d14ad8 100644 --- a/wfe/src/host_builder.rs +++ b/wfe/src/host_builder.rs @@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken; use wfe_core::executor::{StepRegistry, WorkflowExecutor}; use wfe_core::traits::{ DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex, + ServiceProvider, }; use wfe_core::WfeError; @@ -22,6 +23,7 @@ pub struct WorkflowHostBuilder { lifecycle: Option>, search: Option>, log_sink: Option>, + service_provider: Option>, } impl WorkflowHostBuilder { @@ -33,6 +35,7 @@ impl WorkflowHostBuilder { lifecycle: None, search: None, log_sink: None, + service_provider: None, } } @@ -72,6 +75,12 @@ impl WorkflowHostBuilder { self } + /// Set an optional service provider for provisioning infrastructure services. + pub fn use_service_provider(mut self, provider: Arc) -> Self { + self.service_provider = Some(provider); + self + } + /// Build the `WorkflowHost`. /// /// Returns an error if persistence, lock_provider, or queue_provider have not been set. @@ -108,6 +117,7 @@ impl WorkflowHostBuilder { queue_provider, lifecycle: self.lifecycle, search: self.search, + service_provider: self.service_provider, registry: Arc::new(RwLock::new(InMemoryWorkflowRegistry::new())), step_registry: Arc::new(RwLock::new(StepRegistry::new())), executor: Arc::new(executor),