fix(wfe-core): propagate step_name into execution pointers
Sets step_name on execution pointers when advancing to next steps, compensation steps, and parallel branch children so that runtime consumers can identify steps by name without lookup.
This commit is contained in:
20
Cargo.toml
20
Cargo.toml
@@ -3,7 +3,7 @@ members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valk
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "1.6.2"
|
version = "1.6.3"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://src.sunbeam.pt/studio/wfe"
|
repository = "https://src.sunbeam.pt/studio/wfe"
|
||||||
@@ -38,15 +38,15 @@ redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
|
|||||||
opensearch = "2"
|
opensearch = "2"
|
||||||
|
|
||||||
# Internal crates
|
# Internal crates
|
||||||
wfe-core = { version = "1.6.2", path = "wfe-core", registry = "sunbeam" }
|
wfe-core = { version = "1.6.3", path = "wfe-core", registry = "sunbeam" }
|
||||||
wfe-sqlite = { version = "1.6.2", path = "wfe-sqlite", registry = "sunbeam" }
|
wfe-sqlite = { version = "1.6.3", path = "wfe-sqlite", registry = "sunbeam" }
|
||||||
wfe-postgres = { version = "1.6.2", path = "wfe-postgres", registry = "sunbeam" }
|
wfe-postgres = { version = "1.6.3", path = "wfe-postgres", registry = "sunbeam" }
|
||||||
wfe-opensearch = { version = "1.6.2", path = "wfe-opensearch", registry = "sunbeam" }
|
wfe-opensearch = { version = "1.6.3", path = "wfe-opensearch", registry = "sunbeam" }
|
||||||
wfe-valkey = { version = "1.6.2", path = "wfe-valkey", registry = "sunbeam" }
|
wfe-valkey = { version = "1.6.3", path = "wfe-valkey", registry = "sunbeam" }
|
||||||
wfe-yaml = { version = "1.6.2", path = "wfe-yaml", registry = "sunbeam" }
|
wfe-yaml = { version = "1.6.3", path = "wfe-yaml", registry = "sunbeam" }
|
||||||
wfe-buildkit = { version = "1.6.2", path = "wfe-buildkit", registry = "sunbeam" }
|
wfe-buildkit = { version = "1.6.3", path = "wfe-buildkit", registry = "sunbeam" }
|
||||||
wfe-containerd = { version = "1.6.2", path = "wfe-containerd", registry = "sunbeam" }
|
wfe-containerd = { version = "1.6.3", path = "wfe-containerd", registry = "sunbeam" }
|
||||||
wfe-rustlang = { version = "1.6.2", path = "wfe-rustlang", registry = "sunbeam" }
|
wfe-rustlang = { version = "1.6.3", path = "wfe-rustlang", registry = "sunbeam" }
|
||||||
|
|
||||||
# YAML
|
# YAML
|
||||||
serde_yaml = "0.9"
|
serde_yaml = "0.9"
|
||||||
|
|||||||
@@ -67,6 +67,9 @@ pub fn handle_error(
|
|||||||
&& let Some(comp_step_id) = step.compensation_step_id
|
&& let Some(comp_step_id) = step.compensation_step_id
|
||||||
{
|
{
|
||||||
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
|
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
|
||||||
|
comp_pointer.step_name = definition.steps.iter()
|
||||||
|
.find(|s| s.id == comp_step_id)
|
||||||
|
.and_then(|s| s.name.clone());
|
||||||
comp_pointer.predecessor_id = Some(pointer.id.clone());
|
comp_pointer.predecessor_id = Some(pointer.id.clone());
|
||||||
comp_pointer.scope = pointer.scope.clone();
|
comp_pointer.scope = pointer.scope.clone();
|
||||||
new_pointers.push(comp_pointer);
|
new_pointers.push(comp_pointer);
|
||||||
|
|||||||
@@ -36,6 +36,9 @@ pub fn process_result(
|
|||||||
let next_step_id = find_next_step(step, &result.outcome_value);
|
let next_step_id = find_next_step(step, &result.outcome_value);
|
||||||
if let Some(next_id) = next_step_id {
|
if let Some(next_id) = next_step_id {
|
||||||
let mut next_pointer = ExecutionPointer::new(next_id);
|
let mut next_pointer = ExecutionPointer::new(next_id);
|
||||||
|
next_pointer.step_name = definition.steps.iter()
|
||||||
|
.find(|s| s.id == next_id)
|
||||||
|
.and_then(|s| s.name.clone());
|
||||||
next_pointer.predecessor_id = Some(pointer.id.clone());
|
next_pointer.predecessor_id = Some(pointer.id.clone());
|
||||||
next_pointer.scope = pointer.scope.clone();
|
next_pointer.scope = pointer.scope.clone();
|
||||||
new_pointers.push(next_pointer);
|
new_pointers.push(next_pointer);
|
||||||
@@ -59,6 +62,9 @@ pub fn process_result(
|
|||||||
for value in branch_values {
|
for value in branch_values {
|
||||||
for &child_step_id in &child_step_ids {
|
for &child_step_id in &child_step_ids {
|
||||||
let mut child_pointer = ExecutionPointer::new(child_step_id);
|
let mut child_pointer = ExecutionPointer::new(child_step_id);
|
||||||
|
child_pointer.step_name = definition.steps.iter()
|
||||||
|
.find(|s| s.id == child_step_id)
|
||||||
|
.and_then(|s| s.name.clone());
|
||||||
child_pointer.context_item = Some(value.clone());
|
child_pointer.context_item = Some(value.clone());
|
||||||
child_pointer.scope = child_scope.clone();
|
child_pointer.scope = child_scope.clone();
|
||||||
child_pointer.predecessor_id = Some(pointer.id.clone());
|
child_pointer.predecessor_id = Some(pointer.id.clone());
|
||||||
|
|||||||
@@ -181,6 +181,9 @@ impl WorkflowExecutor {
|
|||||||
if let Some(next_id) = next_step_id {
|
if let Some(next_id) = next_step_id {
|
||||||
let mut next_pointer =
|
let mut next_pointer =
|
||||||
crate::models::ExecutionPointer::new(next_id);
|
crate::models::ExecutionPointer::new(next_id);
|
||||||
|
next_pointer.step_name = definition.steps.iter()
|
||||||
|
.find(|s| s.id == next_id)
|
||||||
|
.and_then(|s| s.name.clone());
|
||||||
next_pointer.predecessor_id =
|
next_pointer.predecessor_id =
|
||||||
Some(workflow.execution_pointers[idx].id.clone());
|
Some(workflow.execution_pointers[idx].id.clone());
|
||||||
next_pointer.scope =
|
next_pointer.scope =
|
||||||
|
|||||||
@@ -293,7 +293,9 @@ impl WorkflowHost {
|
|||||||
// Create initial execution pointer for step 0 if the definition has steps.
|
// Create initial execution pointer for step 0 if the definition has steps.
|
||||||
let mut instance = WorkflowInstance::new(definition_id, version, data);
|
let mut instance = WorkflowInstance::new(definition_id, version, data);
|
||||||
if !definition.steps.is_empty() {
|
if !definition.steps.is_empty() {
|
||||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
let mut pointer = ExecutionPointer::new(0);
|
||||||
|
pointer.step_name = definition.steps.first().and_then(|s| s.name.clone());
|
||||||
|
instance.execution_pointers.push(pointer);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist the instance.
|
// Persist the instance.
|
||||||
|
|||||||
Reference in New Issue
Block a user