Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
ead883f714
|
|||
|
ac45011794
|
|||
|
2b244348ca
|
|||
|
de66fef2d6
|
|||
|
6c16c89379
|
|||
|
e515ffbe0c
|
|||
|
978109d3fc
|
20
CHANGELOG.md
20
CHANGELOG.md
@@ -2,6 +2,26 @@
|
||||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [1.6.3] - 2026-04-05
|
||||
|
||||
### Fixed
|
||||
|
||||
- **wfe-core**: Propagate `step_name` into execution pointers when advancing to next steps, compensation steps, and parallel branch children
|
||||
- **wfe**: Set `step_name` on initial execution pointer when starting a workflow instance
|
||||
|
||||
## [1.6.2] - 2026-04-05
|
||||
|
||||
### Added
|
||||
|
||||
- **wfe-core**: `WorkflowBuilder::add_step_typed()` for adding named, configured steps in parallel branch closures
|
||||
- **wfe-core**: `WorkflowBuilder::wire_outcome()` now public for custom graph wiring
|
||||
|
||||
## [1.6.1] - 2026-04-05
|
||||
|
||||
### Added
|
||||
|
||||
- **wfe-core**: `StepBuilder::config()` for attaching arbitrary JSON configuration to individual steps, readable at runtime via `context.step.step_config`
|
||||
|
||||
## [1.6.0] - 2026-04-01
|
||||
|
||||
### Added
|
||||
|
||||
20
Cargo.toml
20
Cargo.toml
@@ -3,7 +3,7 @@ members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valk
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "1.6.0"
|
||||
version = "1.6.3"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
repository = "https://src.sunbeam.pt/studio/wfe"
|
||||
@@ -38,15 +38,15 @@ redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
|
||||
opensearch = "2"
|
||||
|
||||
# Internal crates
|
||||
wfe-core = { version = "1.6.0", path = "wfe-core", registry = "sunbeam" }
|
||||
wfe-sqlite = { version = "1.6.0", path = "wfe-sqlite", registry = "sunbeam" }
|
||||
wfe-postgres = { version = "1.6.0", path = "wfe-postgres", registry = "sunbeam" }
|
||||
wfe-opensearch = { version = "1.6.0", path = "wfe-opensearch", registry = "sunbeam" }
|
||||
wfe-valkey = { version = "1.6.0", path = "wfe-valkey", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.6.0", path = "wfe-yaml", registry = "sunbeam" }
|
||||
wfe-buildkit = { version = "1.6.0", path = "wfe-buildkit", registry = "sunbeam" }
|
||||
wfe-containerd = { version = "1.6.0", path = "wfe-containerd", registry = "sunbeam" }
|
||||
wfe-rustlang = { version = "1.6.0", path = "wfe-rustlang", registry = "sunbeam" }
|
||||
wfe-core = { version = "1.6.3", path = "wfe-core", registry = "sunbeam" }
|
||||
wfe-sqlite = { version = "1.6.3", path = "wfe-sqlite", registry = "sunbeam" }
|
||||
wfe-postgres = { version = "1.6.3", path = "wfe-postgres", registry = "sunbeam" }
|
||||
wfe-opensearch = { version = "1.6.3", path = "wfe-opensearch", registry = "sunbeam" }
|
||||
wfe-valkey = { version = "1.6.3", path = "wfe-valkey", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.6.3", path = "wfe-yaml", registry = "sunbeam" }
|
||||
wfe-buildkit = { version = "1.6.3", path = "wfe-buildkit", registry = "sunbeam" }
|
||||
wfe-containerd = { version = "1.6.3", path = "wfe-containerd", registry = "sunbeam" }
|
||||
wfe-rustlang = { version = "1.6.3", path = "wfe-rustlang", registry = "sunbeam" }
|
||||
|
||||
# YAML
|
||||
serde_yaml = "0.9"
|
||||
|
||||
@@ -16,7 +16,7 @@ async-trait = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
wfe-buildkit-protos = { version = "1.6.0", path = "../wfe-buildkit-protos", registry = "sunbeam" }
|
||||
wfe-buildkit-protos = { version = "1.6.3", path = "../wfe-buildkit-protos", registry = "sunbeam" }
|
||||
tonic = "0.14"
|
||||
tower = { version = "0.4", features = ["util"] }
|
||||
hyper-util = { version = "0.1", features = ["tokio"] }
|
||||
|
||||
@@ -9,7 +9,7 @@ description = "containerd container runner executor for WFE"
|
||||
|
||||
[dependencies]
|
||||
wfe-core = { workspace = true }
|
||||
wfe-containerd-protos = { version = "1.6.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
|
||||
wfe-containerd-protos = { version = "1.6.3", path = "../wfe-containerd-protos", registry = "sunbeam" }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -43,6 +43,14 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Attach arbitrary JSON configuration to this step.
|
||||
///
|
||||
/// The step can read it at runtime via `context.step.step_config`.
|
||||
pub fn config(mut self, config: serde_json::Value) -> Self {
|
||||
self.builder.steps[self.step_id].step_config = Some(config);
|
||||
self
|
||||
}
|
||||
|
||||
/// Add a compensation step for saga rollback.
|
||||
pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
|
||||
let comp_id = self.builder.add_step(std::any::type_name::<C>());
|
||||
|
||||
@@ -61,8 +61,23 @@ impl<D: WorkflowData> WorkflowBuilder<D> {
|
||||
id
|
||||
}
|
||||
|
||||
/// Add a typed step with an optional name and config.
|
||||
/// Convenience for use inside `parallel` branch closures.
|
||||
pub fn add_step_typed<S: StepBody + Default + 'static>(
|
||||
&mut self,
|
||||
name: &str,
|
||||
config: Option<serde_json::Value>,
|
||||
) -> usize {
|
||||
let id = self.add_step(std::any::type_name::<S>());
|
||||
self.steps[id].name = Some(name.to_string());
|
||||
if let Some(cfg) = config {
|
||||
self.steps[id].step_config = Some(cfg);
|
||||
}
|
||||
id
|
||||
}
|
||||
|
||||
/// Wire an outcome from `from_step` to `to_step`.
|
||||
pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||
pub fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||
if let Some(step) = self.steps.get_mut(from_step) {
|
||||
step.outcomes.push(StepOutcome {
|
||||
next_step: to_step,
|
||||
@@ -341,6 +356,79 @@ mod tests {
|
||||
assert!(def.steps[1].step_type.contains("StepB"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_sets_step_config() {
|
||||
let cfg = serde_json::json!({"namespace": "ory", "timeout": 30});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.config(cfg.clone())
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_chains_with_name() {
|
||||
let cfg = serde_json::json!({"namespace": "data"});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.name("apply-data")
|
||||
.config(cfg.clone())
|
||||
.then::<StepB>()
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].name, Some("apply-data".into()));
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg));
|
||||
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_on_multiple_steps_of_same_type() {
|
||||
let cfg_a = serde_json::json!({"namespace": "ory"});
|
||||
let cfg_b = serde_json::json!({"namespace": "data"});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.name("apply-ory")
|
||||
.config(cfg_a.clone())
|
||||
.then::<StepA>()
|
||||
.name("apply-data")
|
||||
.config(cfg_b.clone())
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg_a));
|
||||
assert_eq!(def.steps[1].step_config, Some(cfg_b));
|
||||
// Both are StepA
|
||||
assert_eq!(def.steps[0].step_type, def.steps[1].step_type);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_step_typed_sets_name_and_config() {
|
||||
let cfg = serde_json::json!({"namespace": "ory"});
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id = builder.add_step_typed::<StepA>("apply-ory", Some(cfg.clone()));
|
||||
assert_eq!(builder.steps[id].name, Some("apply-ory".into()));
|
||||
assert_eq!(builder.steps[id].step_config, Some(cfg));
|
||||
assert!(builder.steps[id].step_type.contains("StepA"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_step_typed_without_config() {
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id = builder.add_step_typed::<StepB>("my-step", None);
|
||||
assert_eq!(builder.steps[id].name, Some("my-step".into()));
|
||||
assert_eq!(builder.steps[id].step_config, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wire_outcome_connects_steps() {
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id0 = builder.add_step_typed::<StepA>("first", None);
|
||||
let id1 = builder.add_step_typed::<StepB>("second", None);
|
||||
builder.wire_outcome(id0, id1, None);
|
||||
assert_eq!(builder.steps[id0].outcomes.len(), 1);
|
||||
assert_eq!(builder.steps[id0].outcomes[0].next_step, id1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inline_step_via_then_fn() {
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
|
||||
@@ -67,6 +67,9 @@ pub fn handle_error(
|
||||
&& let Some(comp_step_id) = step.compensation_step_id
|
||||
{
|
||||
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
|
||||
comp_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == comp_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
comp_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
comp_pointer.scope = pointer.scope.clone();
|
||||
new_pointers.push(comp_pointer);
|
||||
|
||||
@@ -36,6 +36,9 @@ pub fn process_result(
|
||||
let next_step_id = find_next_step(step, &result.outcome_value);
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer = ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
next_pointer.scope = pointer.scope.clone();
|
||||
new_pointers.push(next_pointer);
|
||||
@@ -59,6 +62,9 @@ pub fn process_result(
|
||||
for value in branch_values {
|
||||
for &child_step_id in &child_step_ids {
|
||||
let mut child_pointer = ExecutionPointer::new(child_step_id);
|
||||
child_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == child_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
child_pointer.context_item = Some(value.clone());
|
||||
child_pointer.scope = child_scope.clone();
|
||||
child_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
|
||||
@@ -181,6 +181,9 @@ impl WorkflowExecutor {
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer =
|
||||
crate::models::ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id =
|
||||
Some(workflow.execution_pointers[idx].id.clone());
|
||||
next_pointer.scope =
|
||||
|
||||
@@ -14,9 +14,9 @@ path = "src/main.rs"
|
||||
[dependencies]
|
||||
# Internal
|
||||
wfe-core = { workspace = true, features = ["test-support"] }
|
||||
wfe = { path = "../wfe" }
|
||||
wfe-yaml = { path = "../wfe-yaml", features = ["rustlang", "buildkit", "containerd"] }
|
||||
wfe-server-protos = { path = "../wfe-server-protos" }
|
||||
wfe = { version = "1.6.3", path = "../wfe", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.6.3", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] }
|
||||
wfe-server-protos = { version = "1.6.3", path = "../wfe-server-protos", registry = "sunbeam" }
|
||||
wfe-sqlite = { workspace = true }
|
||||
wfe-postgres = { workspace = true }
|
||||
wfe-valkey = { workspace = true }
|
||||
|
||||
@@ -293,7 +293,9 @@ impl WorkflowHost {
|
||||
// Create initial execution pointer for step 0 if the definition has steps.
|
||||
let mut instance = WorkflowInstance::new(definition_id, version, data);
|
||||
if !definition.steps.is_empty() {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
let mut pointer = ExecutionPointer::new(0);
|
||||
pointer.step_name = definition.steps.first().and_then(|s| s.name.clone());
|
||||
instance.execution_pointers.push(pointer);
|
||||
}
|
||||
|
||||
// Persist the instance.
|
||||
|
||||
Reference in New Issue
Block a user