feat(wfe-core): add typed workflow schema system

SchemaType enum with inline syntax parsing: "string", "string?",
"list<number>", "map<string>", nested generics. WorkflowSchema
validates inputs/outputs against type declarations at both compile
time and runtime. 39 tests for parse and validate paths.
This commit is contained in:
2026-03-26 14:12:51 +00:00
parent 0317c6adea
commit a3211552a5
8 changed files with 998 additions and 30 deletions

View File

@@ -7,7 +7,7 @@ use super::error_handler;
use super::result_processor;
use super::step_registry::StepRegistry;
use crate::models::{
ExecutionError, PointerStatus, QueueType, WorkflowDefinition, WorkflowStatus,
Event, ExecutionError, PointerStatus, QueueType, WorkflowDefinition, WorkflowStatus,
};
use crate::traits::{
DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex,
@@ -61,7 +61,7 @@ impl WorkflowExecutor {
/// 8. Release lock
#[tracing::instrument(
name = "workflow.execute",
skip(self, definition, step_registry),
skip(self, definition, step_registry, host_context),
fields(
workflow.id = %workflow_id,
workflow.definition_id,
@@ -73,6 +73,7 @@ impl WorkflowExecutor {
workflow_id: &str,
definition: &WorkflowDefinition,
step_registry: &StepRegistry,
host_context: Option<&dyn crate::traits::HostContext>,
) -> Result<()> {
// 1. Acquire distributed lock.
let acquired = self.lock_provider.acquire_lock(workflow_id).await?;
@@ -82,7 +83,7 @@ impl WorkflowExecutor {
}
let result = self
.execute_inner(workflow_id, definition, step_registry)
.execute_inner(workflow_id, definition, step_registry, host_context)
.await;
// 7. Release lock (always).
@@ -98,6 +99,7 @@ impl WorkflowExecutor {
workflow_id: &str,
definition: &WorkflowDefinition,
step_registry: &StepRegistry,
host_context: Option<&dyn crate::traits::HostContext>,
) -> Result<()> {
// 2. Load workflow instance.
let mut workflow = self
@@ -173,6 +175,7 @@ impl WorkflowExecutor {
step,
workflow: &workflow,
cancellation_token,
host_context,
};
// d. Call step.run(context).
@@ -280,6 +283,18 @@ impl WorkflowExecutor {
info!(workflow_id, "All pointers complete, workflow finished");
workflow.status = WorkflowStatus::Complete;
workflow.complete_time = Some(Utc::now());
// Publish completion event for SubWorkflow parents.
let completion_event = Event::new(
"wfe.workflow.completed",
workflow_id,
serde_json::json!({ "status": "Complete", "data": workflow.data }),
);
let _ = self.persistence.create_event(&completion_event).await;
let _ = self
.queue_provider
.queue_work(&completion_event.id, QueueType::Event)
.await;
}
tracing::Span::current().record("workflow.status", tracing::field::debug(&workflow.status));
@@ -573,7 +588,7 @@ mod tests {
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Complete);
@@ -604,7 +619,7 @@ mod tests {
persistence.create_new_workflow(&instance).await.unwrap();
// First execution: step 0 completes, step 1 pointer created.
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers.len(), 2);
@@ -613,7 +628,7 @@ mod tests {
assert_eq!(updated.execution_pointers[1].step_id, 1);
// Second execution: step 1 completes.
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Complete);
@@ -644,7 +659,7 @@ mod tests {
// Execute three times for three steps.
for _ in 0..3 {
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
}
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
@@ -684,7 +699,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers.len(), 2);
@@ -707,7 +722,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Runnable);
@@ -733,7 +748,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
@@ -756,7 +771,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(
@@ -796,7 +811,7 @@ mod tests {
instance.execution_pointers.push(pointer);
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
@@ -822,7 +837,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
// 1 original + 3 children.
@@ -858,7 +873,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].retry_count, 1);
@@ -884,7 +899,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Suspended);
@@ -908,7 +923,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Terminated);
@@ -936,7 +951,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed);
@@ -964,7 +979,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(1));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Complete);
@@ -999,7 +1014,7 @@ mod tests {
persistence.create_new_workflow(&instance).await.unwrap();
// Should not error on a completed workflow.
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Complete);
@@ -1024,7 +1039,7 @@ mod tests {
instance.execution_pointers.push(pointer);
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
// Should still be sleeping since sleep_until is in the future.
@@ -1048,7 +1063,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let errors = persistence.get_errors().await;
assert_eq!(errors.len(), 1);
@@ -1072,7 +1087,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
// Executor itself doesn't publish lifecycle events in the current implementation,
// but the with_lifecycle builder works correctly.
@@ -1097,7 +1112,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Terminated);
@@ -1118,7 +1133,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert!(updated.execution_pointers[0].start_time.is_some());
@@ -1148,7 +1163,7 @@ mod tests {
instance.execution_pointers.push(ExecutionPointer::new(0));
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(
@@ -1203,13 +1218,13 @@ mod tests {
persistence.create_new_workflow(&instance).await.unwrap();
// First execution: fails, retry scheduled.
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].retry_count, 1);
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
// Second execution: succeeds (sleep_until is in the past with 0ms interval).
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
assert_eq!(updated.status, WorkflowStatus::Complete);
@@ -1227,7 +1242,7 @@ mod tests {
// No execution pointers at all.
persistence.create_new_workflow(&instance).await.unwrap();
executor.execute(&instance.id, &def, &registry).await.unwrap();
executor.execute(&instance.id, &def, &registry, None).await.unwrap();
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
assert_eq!(updated.status, WorkflowStatus::Runnable);