8 Commits

Author SHA1 Message Date
a4d0f2a547 fix: add version + registry to wfe-deno path dep for publishing 2026-04-06 13:57:56 +01:00
4b8e544ab8 chore: bump version to 1.7.0, update CHANGELOG 2026-04-05 22:29:05 +01:00
9a08882e28 feat(wfe-deno): Deno bindings for the WFE workflow engine 2026-04-05 22:06:07 +01:00
afb91c66bd feat(wfe-core): make WorkflowBuilder steps and last_step fields public
Needed by wfe-deno to build workflow definitions via ops without
going through the pub(crate) StepBuilder constructor.
2026-04-05 22:05:04 +01:00
ead883f714 chore: bump version to 1.6.3, update CHANGELOG 2026-04-05 19:55:44 +01:00
ac45011794 fix(wfe-core): propagate step_name into execution pointers
Sets step_name on execution pointers when advancing to next steps,
compensation steps, and parallel branch children so that runtime
consumers can identify steps by name without lookup.
2026-04-05 19:55:12 +01:00
2b244348ca chore: bump version to 1.6.2, update CHANGELOG 2026-04-05 12:45:25 +01:00
de66fef2d6 feat(wfe-core): add add_step_typed() and make wire_outcome public
Adds WorkflowBuilder::add_step_typed<S>() for adding named, configured
steps directly — needed for parallel branch closures in the CLI.
Makes wire_outcome() public so callers can wire custom step graphs.
2026-04-05 12:44:00 +01:00
23 changed files with 2035 additions and 20 deletions

View File

@@ -2,6 +2,31 @@
All notable changes to this project will be documented in this file.
## [1.7.0] - 2026-04-05
### Added
- **wfe-deno**: New crate -- Deno/JS/TS bindings for the WFE workflow engine
- Full API surface via 23 deno_core ops: host lifecycle, workflow management, fluent builder, step registration, event publishing
- Channel-based execution bridge: JS step functions called from tokio executor via mpsc/oneshot channels
- High-level JS API classes: `WorkflowHost`, `WorkflowBuilder`, `ExecutionResult`
- 52 tests (26 unit + 26 integration), 93% line coverage
- **wfe-core**: `WorkflowBuilder.steps` and `last_step` fields now public
## [1.6.3] - 2026-04-05
### Fixed
- **wfe-core**: Propagate `step_name` into execution pointers when advancing to next steps, compensation steps, and parallel branch children
- **wfe**: Set `step_name` on initial execution pointer when starting a workflow instance
## [1.6.2] - 2026-04-05
### Added
- **wfe-core**: `WorkflowBuilder::add_step_typed()` for adding named, configured steps in parallel branch closures
- **wfe-core**: `WorkflowBuilder::wire_outcome()` now public for custom graph wiring
## [1.6.1] - 2026-04-05
### Added

View File

@@ -1,9 +1,9 @@
[workspace]
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server"]
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server", "wfe-deno"]
resolver = "2"
[workspace.package]
version = "1.6.1"
version = "1.7.0"
edition = "2024"
license = "MIT"
repository = "https://src.sunbeam.pt/studio/wfe"
@@ -38,15 +38,15 @@ redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
opensearch = "2"
# Internal crates
wfe-core = { version = "1.6.1", path = "wfe-core", registry = "sunbeam" }
wfe-sqlite = { version = "1.6.1", path = "wfe-sqlite", registry = "sunbeam" }
wfe-postgres = { version = "1.6.1", path = "wfe-postgres", registry = "sunbeam" }
wfe-opensearch = { version = "1.6.1", path = "wfe-opensearch", registry = "sunbeam" }
wfe-valkey = { version = "1.6.1", path = "wfe-valkey", registry = "sunbeam" }
wfe-yaml = { version = "1.6.1", path = "wfe-yaml", registry = "sunbeam" }
wfe-buildkit = { version = "1.6.1", path = "wfe-buildkit", registry = "sunbeam" }
wfe-containerd = { version = "1.6.1", path = "wfe-containerd", registry = "sunbeam" }
wfe-rustlang = { version = "1.6.1", path = "wfe-rustlang", registry = "sunbeam" }
wfe-core = { version = "1.7.0", path = "wfe-core", registry = "sunbeam" }
wfe-sqlite = { version = "1.7.0", path = "wfe-sqlite", registry = "sunbeam" }
wfe-postgres = { version = "1.7.0", path = "wfe-postgres", registry = "sunbeam" }
wfe-opensearch = { version = "1.7.0", path = "wfe-opensearch", registry = "sunbeam" }
wfe-valkey = { version = "1.7.0", path = "wfe-valkey", registry = "sunbeam" }
wfe-yaml = { version = "1.7.0", path = "wfe-yaml", registry = "sunbeam" }
wfe-buildkit = { version = "1.7.0", path = "wfe-buildkit", registry = "sunbeam" }
wfe-containerd = { version = "1.7.0", path = "wfe-containerd", registry = "sunbeam" }
wfe-rustlang = { version = "1.7.0", path = "wfe-rustlang", registry = "sunbeam" }
# YAML
serde_yaml = "0.9"

View File

@@ -16,7 +16,7 @@ async-trait = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
regex = { workspace = true }
wfe-buildkit-protos = { version = "1.6.1", path = "../wfe-buildkit-protos", registry = "sunbeam" }
wfe-buildkit-protos = { version = "1.7.0", path = "../wfe-buildkit-protos", registry = "sunbeam" }
tonic = "0.14"
tower = { version = "0.4", features = ["util"] }
hyper-util = { version = "0.1", features = ["tokio"] }

View File

@@ -9,7 +9,7 @@ description = "containerd container runner executor for WFE"
[dependencies]
wfe-core = { workspace = true }
wfe-containerd-protos = { version = "1.6.1", path = "../wfe-containerd-protos", registry = "sunbeam" }
wfe-containerd-protos = { version = "1.7.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -28,8 +28,8 @@ pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
/// .build("my-workflow", 1);
/// ```
pub struct WorkflowBuilder<D: WorkflowData> {
pub(crate) steps: Vec<WorkflowStep>,
pub(crate) last_step: Option<usize>,
pub steps: Vec<WorkflowStep>,
pub last_step: Option<usize>,
/// Inline closures keyed by step id, stored for later registration.
pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
_phantom: PhantomData<D>,
@@ -61,8 +61,23 @@ impl<D: WorkflowData> WorkflowBuilder<D> {
id
}
/// Add a typed step with an optional name and config.
/// Convenience for use inside `parallel` branch closures.
///
/// The step's type string is `std::any::type_name::<S>()`, so lookups by
/// type name match steps registered through the typed registration path.
///
/// # Parameters
/// - `name`: display name stored on the new step.
/// - `config`: optional JSON configuration attached to the step.
///
/// # Returns
/// The id of the newly added step.
pub fn add_step_typed<S: StepBody + Default + 'static>(
    &mut self,
    name: &str,
    config: Option<serde_json::Value>,
) -> usize {
    // NOTE(review): assumes `add_step` appends a step and returns its index
    // into `steps`; the direct indexing below relies on that invariant.
    let id = self.add_step(std::any::type_name::<S>());
    self.steps[id].name = Some(name.to_string());
    if let Some(cfg) = config {
        self.steps[id].step_config = Some(cfg);
    }
    id
}
/// Wire an outcome from `from_step` to `to_step`.
pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
pub fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
if let Some(step) = self.steps.get_mut(from_step) {
step.outcomes.push(StepOutcome {
next_step: to_step,
@@ -386,6 +401,34 @@ mod tests {
assert_eq!(def.steps[0].step_type, def.steps[1].step_type);
}
// Name, config, and type string must all land on the step created by
// `add_step_typed`.
#[test]
fn add_step_typed_sets_name_and_config() {
    let cfg = serde_json::json!({"namespace": "ory"});
    let mut builder = WorkflowBuilder::<TestData>::new();
    let id = builder.add_step_typed::<StepA>("apply-ory", Some(cfg.clone()));
    assert_eq!(builder.steps[id].name, Some("apply-ory".into()));
    assert_eq!(builder.steps[id].step_config, Some(cfg));
    assert!(builder.steps[id].step_type.contains("StepA"));
}
// Omitting the config must leave `step_config` unset rather than defaulted.
#[test]
fn add_step_typed_without_config() {
    let mut builder = WorkflowBuilder::<TestData>::new();
    let id = builder.add_step_typed::<StepB>("my-step", None);
    assert_eq!(builder.steps[id].name, Some("my-step".into()));
    assert_eq!(builder.steps[id].step_config, None);
}
// `wire_outcome` appends exactly one outcome pointing at the target step.
#[test]
fn wire_outcome_connects_steps() {
    let mut builder = WorkflowBuilder::<TestData>::new();
    let id0 = builder.add_step_typed::<StepA>("first", None);
    let id1 = builder.add_step_typed::<StepB>("second", None);
    builder.wire_outcome(id0, id1, None);
    assert_eq!(builder.steps[id0].outcomes.len(), 1);
    assert_eq!(builder.steps[id0].outcomes[0].next_step, id1);
}
#[test]
fn inline_step_via_then_fn() {
let def = WorkflowBuilder::<TestData>::new()

View File

@@ -67,6 +67,9 @@ pub fn handle_error(
&& let Some(comp_step_id) = step.compensation_step_id
{
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
comp_pointer.step_name = definition.steps.iter()
.find(|s| s.id == comp_step_id)
.and_then(|s| s.name.clone());
comp_pointer.predecessor_id = Some(pointer.id.clone());
comp_pointer.scope = pointer.scope.clone();
new_pointers.push(comp_pointer);

View File

@@ -36,6 +36,9 @@ pub fn process_result(
let next_step_id = find_next_step(step, &result.outcome_value);
if let Some(next_id) = next_step_id {
let mut next_pointer = ExecutionPointer::new(next_id);
next_pointer.step_name = definition.steps.iter()
.find(|s| s.id == next_id)
.and_then(|s| s.name.clone());
next_pointer.predecessor_id = Some(pointer.id.clone());
next_pointer.scope = pointer.scope.clone();
new_pointers.push(next_pointer);
@@ -59,6 +62,9 @@ pub fn process_result(
for value in branch_values {
for &child_step_id in &child_step_ids {
let mut child_pointer = ExecutionPointer::new(child_step_id);
child_pointer.step_name = definition.steps.iter()
.find(|s| s.id == child_step_id)
.and_then(|s| s.name.clone());
child_pointer.context_item = Some(value.clone());
child_pointer.scope = child_scope.clone();
child_pointer.predecessor_id = Some(pointer.id.clone());

View File

@@ -181,6 +181,9 @@ impl WorkflowExecutor {
if let Some(next_id) = next_step_id {
let mut next_pointer =
crate::models::ExecutionPointer::new(next_id);
next_pointer.step_name = definition.steps.iter()
.find(|s| s.id == next_id)
.and_then(|s| s.name.clone());
next_pointer.predecessor_id =
Some(workflow.execution_pointers[idx].id.clone());
next_pointer.scope =

26
wfe-deno/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "wfe-deno"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description = "Deno bindings for the WFE workflow engine"
[dependencies]
wfe-core = { workspace = true, features = ["test-support"] }
wfe = { version = "1.7.0", path = "../wfe", registry = "sunbeam" }
deno_core = { workspace = true }
deno_error = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tokio-util = "0.7"
tempfile = { workspace = true }

444
wfe-deno/src/bridge.rs Normal file
View File

@@ -0,0 +1,444 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use wfe_core::models::ExecutionResult;
use wfe_core::traits::step::{StepBody, StepExecutionContext};
use wfe_core::WfeError;
/// A request sent from the executor (tokio) to the V8 thread.
pub struct StepRequest {
    /// Correlation id minted by `JsStepBody`; echoed back by the JS side
    /// when it responds.
    pub request_id: u32,
    /// Step type name; the JS executor loop uses this to look up the
    /// registered step function.
    pub step_type: String,
    /// JSON-serialized view of the `StepExecutionContext`.
    pub context: serde_json::Value,
    /// One-shot channel carrying either the JS result object or an error
    /// message string back to the awaiting step body.
    pub response_tx: oneshot::Sender<Result<serde_json::Value, String>>,
}
/// A `StepBody` implementation that bridges to JavaScript via channels.
///
/// When the workflow executor calls `run()`, this sends a serialized
/// `StepExecutionContext` to the V8 thread and awaits the response.
pub struct JsStepBody {
    /// Sender half of the step-request channel; the V8 side holds the receiver.
    request_tx: mpsc::Sender<StepRequest>,
    /// Shared counter used to mint unique request ids across all step bodies.
    request_id_counter: std::sync::Arc<std::sync::atomic::AtomicU32>,
}
impl JsStepBody {
    /// Create a step body wired to the shared request channel and id counter.
    pub fn new(
        request_tx: mpsc::Sender<StepRequest>,
        request_id_counter: std::sync::Arc<std::sync::atomic::AtomicU32>,
    ) -> Self {
        Self {
            request_tx,
            request_id_counter,
        }
    }
}
#[async_trait]
impl StepBody for JsStepBody {
    /// Execute the step by round-tripping through the JS executor loop:
    /// serialize the context, send a `StepRequest`, await the one-shot
    /// response, and decode it into an `ExecutionResult`.
    ///
    /// # Errors
    /// Returns `WfeError::StepExecution` if either channel is closed, if the
    /// JS side reports an error string, or if the JS result fails to decode.
    async fn run(
        &mut self,
        context: &StepExecutionContext<'_>,
    ) -> wfe_core::Result<ExecutionResult> {
        let ctx_json = serialize_context(context);
        let (tx, rx) = oneshot::channel();
        // Relaxed is sufficient: ids only need uniqueness, not ordering.
        let request_id = self
            .request_id_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.request_tx
            .send(StepRequest {
                request_id,
                step_type: context.step.step_type.clone(),
                context: ctx_json,
                response_tx: tx,
            })
            .await
            .map_err(|_| WfeError::StepExecution("step request channel closed".into()))?;
        // Outer error: responder dropped without replying.
        // Inner error: JS step function threw; the message is propagated.
        let result_json = rx
            .await
            .map_err(|_| WfeError::StepExecution("step response channel dropped".into()))?
            .map_err(WfeError::StepExecution)?;
        deserialize_execution_result(&result_json)
    }
}
/// Serializable view of `StepExecutionContext` for passing to JavaScript.
/// All field names are rewritten to camelCase for the JS side.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JsStepContext {
    /// Per-branch context item (set for parallel branch children).
    pub item: Option<serde_json::Value>,
    /// Data persisted by a previous invocation of this step, if any.
    pub persistence_data: Option<serde_json::Value>,
    pub step: JsStepInfo,
    pub workflow: JsWorkflowInfo,
    pub pointer: JsPointerInfo,
}
/// Step metadata exposed to JS.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JsStepInfo {
    pub id: usize,
    pub name: Option<String>,
    pub step_type: String,
    pub step_config: Option<serde_json::Value>,
}
/// Workflow-instance metadata exposed to JS.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JsWorkflowInfo {
    pub id: String,
    pub definition_id: String,
    pub version: u32,
    /// Debug-formatted `WorkflowStatus` (e.g. "Runnable").
    pub status: String,
    pub data: serde_json::Value,
    pub create_time: DateTime<Utc>,
}
/// Execution-pointer metadata exposed to JS.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JsPointerInfo {
    pub id: String,
    pub step_id: usize,
    pub step_name: Option<String>,
    pub retry_count: u32,
}
/// Serialize a `StepExecutionContext` into a JSON value for JavaScript.
///
/// Builds the camelCase `JsStepContext` view and converts it to JSON.
/// Serialization of this view cannot realistically fail, but if it ever
/// does the function degrades to `null` rather than panicking.
pub fn serialize_context(ctx: &StepExecutionContext<'_>) -> serde_json::Value {
    // Assemble each sub-view separately, then compose the full context.
    let step = JsStepInfo {
        id: ctx.step.id,
        name: ctx.step.name.clone(),
        step_type: ctx.step.step_type.clone(),
        step_config: ctx.step.step_config.clone(),
    };
    let workflow = JsWorkflowInfo {
        id: ctx.workflow.id.clone(),
        definition_id: ctx.workflow.workflow_definition_id.clone(),
        version: ctx.workflow.version,
        status: format!("{:?}", ctx.workflow.status),
        data: ctx.workflow.data.clone(),
        create_time: ctx.workflow.create_time,
    };
    let pointer = JsPointerInfo {
        id: ctx.execution_pointer.id.clone(),
        step_id: ctx.execution_pointer.step_id,
        step_name: ctx.execution_pointer.step_name.clone(),
        retry_count: ctx.execution_pointer.retry_count,
    };
    let js_ctx = JsStepContext {
        item: ctx.item.cloned(),
        persistence_data: ctx.persistence_data.cloned(),
        step,
        workflow,
        pointer,
    };
    serde_json::to_value(js_ctx).unwrap_or(serde_json::Value::Null)
}
/// Shape of an `ExecutionResult` as returned from JavaScript.
///
/// Missing fields are tolerated during deserialization: `proceed` defaults
/// to `true` (an empty object means "just advance") and every other field
/// defaults to `None`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsExecutionResult {
    /// Whether the workflow should advance past this step.
    #[serde(default = "default_true")]
    pub proceed: bool,
    pub outcome_value: Option<serde_json::Value>,
    /// Sleep duration in milliseconds.
    pub sleep_for: Option<u64>,
    pub persistence_data: Option<serde_json::Value>,
    pub event_name: Option<String>,
    pub event_key: Option<String>,
    pub branch_values: Option<Vec<serde_json::Value>>,
    pub output_data: Option<serde_json::Value>,
}

impl Default for JsExecutionResult {
    /// Mirror the deserialization defaults. The previously derived `Default`
    /// set `proceed` to `false`, contradicting the serde field default above
    /// (an empty JS object deserializes with `proceed == true`).
    fn default() -> Self {
        Self {
            proceed: true,
            outcome_value: None,
            sleep_for: None,
            persistence_data: None,
            event_name: None,
            event_key: None,
            branch_values: None,
            output_data: None,
        }
    }
}

/// Serde helper: the default for `proceed` when the JS result omits it.
fn default_true() -> bool {
    true
}
/// Deserialize a JavaScript execution result into the Rust `ExecutionResult`.
pub fn deserialize_execution_result(
value: &serde_json::Value,
) -> wfe_core::Result<ExecutionResult> {
let js_result: JsExecutionResult = serde_json::from_value(value.clone()).map_err(|e| {
WfeError::StepExecution(format!("failed to deserialize ExecutionResult from JS: {e}"))
})?;
Ok(ExecutionResult {
proceed: js_result.proceed,
outcome_value: js_result.outcome_value,
sleep_for: js_result.sleep_for.map(Duration::from_millis),
persistence_data: js_result.persistence_data,
event_name: js_result.event_name,
event_key: js_result.event_key,
event_as_of: None,
branch_values: js_result.branch_values,
poll_endpoint: None,
output_data: js_result.output_data,
})
}
// Unit tests for the JS<->Rust bridge: context serialization, result
// deserialization, and the JsStepBody channel round-trip.
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStatus, WorkflowStep};

    /// Build a minimal runnable workflow instance plus one step and pointer,
    /// shared by the tests below.
    fn make_test_context() -> (WorkflowInstance, WorkflowStep, ExecutionPointer) {
        let instance = WorkflowInstance {
            id: "wf-1".into(),
            workflow_definition_id: "test-def".into(),
            version: 1,
            description: None,
            reference: None,
            execution_pointers: vec![],
            next_execution: None,
            status: WorkflowStatus::Runnable,
            data: serde_json::json!({"name": "World"}),
            create_time: Utc::now(),
            complete_time: None,
        };
        let step = WorkflowStep::new(0, "MyStep");
        let pointer = ExecutionPointer::new(0);
        (instance, step, pointer)
    }

    // --- serialize_context ---------------------------------------------

    #[test]
    fn serialize_context_produces_valid_json() {
        let (instance, mut step, pointer) = make_test_context();
        step.name = Some("greet".into());
        step.step_config = Some(serde_json::json!({"key": "val"}));
        let ctx = StepExecutionContext {
            item: None,
            execution_pointer: &pointer,
            persistence_data: None,
            step: &step,
            workflow: &instance,
            cancellation_token: tokio_util::sync::CancellationToken::new(),
            host_context: None,
            log_sink: None,
        };
        let json = serialize_context(&ctx);
        // Keys must be camelCase on the JS side.
        assert_eq!(json["step"]["name"], "greet");
        assert_eq!(json["step"]["stepConfig"]["key"], "val");
        assert_eq!(json["workflow"]["data"]["name"], "World");
        assert_eq!(json["workflow"]["definitionId"], "test-def");
        assert_eq!(json["pointer"]["stepId"], 0);
    }

    #[test]
    fn serialize_context_with_item() {
        let (instance, step, pointer) = make_test_context();
        let item = serde_json::json!({"id": 42});
        let ctx = StepExecutionContext {
            item: Some(&item),
            execution_pointer: &pointer,
            persistence_data: Some(&serde_json::json!({"saved": true})),
            step: &step,
            workflow: &instance,
            cancellation_token: tokio_util::sync::CancellationToken::new(),
            host_context: None,
            log_sink: None,
        };
        let json = serialize_context(&ctx);
        assert_eq!(json["item"]["id"], 42);
        assert_eq!(json["persistenceData"]["saved"], true);
    }

    // --- deserialize_execution_result ----------------------------------

    #[test]
    fn deserialize_next_result() {
        let json = serde_json::json!({"proceed": true});
        let result = deserialize_execution_result(&json).unwrap();
        assert!(result.proceed);
        assert!(result.outcome_value.is_none());
    }

    #[test]
    fn deserialize_outcome_result() {
        let json = serde_json::json!({
            "proceed": true,
            "outcomeValue": "branch-a"
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert!(result.proceed);
        assert_eq!(result.outcome_value, Some(serde_json::json!("branch-a")));
    }

    // `sleepFor` is interpreted as milliseconds.
    #[test]
    fn deserialize_sleep_result() {
        let json = serde_json::json!({
            "proceed": false,
            "sleepFor": 5000
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert!(!result.proceed);
        assert_eq!(result.sleep_for, Some(Duration::from_millis(5000)));
    }

    #[test]
    fn deserialize_persist_result() {
        let json = serde_json::json!({
            "proceed": false,
            "persistenceData": {"page": 2}
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert!(!result.proceed);
        assert_eq!(
            result.persistence_data,
            Some(serde_json::json!({"page": 2}))
        );
    }

    #[test]
    fn deserialize_wait_for_event_result() {
        let json = serde_json::json!({
            "proceed": false,
            "eventName": "order.paid",
            "eventKey": "order-123"
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert_eq!(result.event_name, Some("order.paid".into()));
        assert_eq!(result.event_key, Some("order-123".into()));
    }

    #[test]
    fn deserialize_branch_result() {
        let json = serde_json::json!({
            "proceed": false,
            "branchValues": [1, 2, 3]
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert_eq!(
            result.branch_values,
            Some(vec![
                serde_json::json!(1),
                serde_json::json!(2),
                serde_json::json!(3)
            ])
        );
    }

    #[test]
    fn deserialize_output_data_result() {
        let json = serde_json::json!({
            "proceed": true,
            "outputData": {"greeted": "World"}
        });
        let result = deserialize_execution_result(&json).unwrap();
        assert!(result.proceed);
        assert_eq!(
            result.output_data,
            Some(serde_json::json!({"greeted": "World"}))
        );
    }

    // An empty object is valid: `proceed` defaults to true via serde.
    #[test]
    fn deserialize_empty_object_defaults_to_proceed() {
        let json = serde_json::json!({});
        let result = deserialize_execution_result(&json).unwrap();
        assert!(result.proceed);
    }

    #[test]
    fn deserialize_invalid_json_returns_error() {
        let json = serde_json::json!("not an object");
        let result = deserialize_execution_result(&json);
        assert!(result.is_err());
    }

    // --- JsStepBody channel behavior ------------------------------------

    #[tokio::test]
    async fn js_step_body_channel_round_trip() {
        let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let mut body = JsStepBody::new(tx, counter);
        let (instance, step, pointer) = make_test_context();
        let ctx = StepExecutionContext {
            item: None,
            execution_pointer: &pointer,
            persistence_data: None,
            step: &step,
            workflow: &instance,
            cancellation_token: tokio_util::sync::CancellationToken::new(),
            host_context: None,
            log_sink: None,
        };
        // Spawn a "JS side" that responds to the request.
        let responder = tokio::spawn(async move {
            let req = rx.recv().await.unwrap();
            assert_eq!(req.step_type, "MyStep");
            assert_eq!(req.request_id, 0);
            req.response_tx
                .send(Ok(serde_json::json!({"proceed": true, "outputData": {"done": true}})))
                .unwrap();
        });
        let result = body.run(&ctx).await.unwrap();
        assert!(result.proceed);
        assert_eq!(result.output_data, Some(serde_json::json!({"done": true})));
        responder.await.unwrap();
    }

    // A JS-side error string must surface as a step execution error.
    #[tokio::test]
    async fn js_step_body_propagates_js_error() {
        let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let mut body = JsStepBody::new(tx, counter);
        let (instance, step, pointer) = make_test_context();
        let ctx = StepExecutionContext {
            item: None,
            execution_pointer: &pointer,
            persistence_data: None,
            step: &step,
            workflow: &instance,
            cancellation_token: tokio_util::sync::CancellationToken::new(),
            host_context: None,
            log_sink: None,
        };
        tokio::spawn(async move {
            let req = rx.recv().await.unwrap();
            req.response_tx
                .send(Err("TypeError: undefined is not a function".into()))
                .unwrap();
        });
        let result = body.run(&ctx).await;
        assert!(result.is_err());
        let err_msg = format!("{}", result.unwrap_err());
        assert!(err_msg.contains("TypeError"));
    }

    // Dropping the responder without replying must error, not hang.
    #[tokio::test]
    async fn js_step_body_handles_dropped_responder() {
        let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let mut body = JsStepBody::new(tx, counter);
        let (instance, step, pointer) = make_test_context();
        let ctx = StepExecutionContext {
            item: None,
            execution_pointer: &pointer,
            persistence_data: None,
            step: &step,
            workflow: &instance,
            cancellation_token: tokio_util::sync::CancellationToken::new(),
            host_context: None,
            log_sink: None,
        };
        tokio::spawn(async move {
            let req = rx.recv().await.unwrap();
            drop(req.response_tx); // Drop without sending
        });
        let result = body.run(&ctx).await;
        assert!(result.is_err());
    }
}

50
wfe-deno/src/js/bootstrap.js vendored Normal file
View File

@@ -0,0 +1,50 @@
// Import wfe.js so it's evaluated and available for user code.
import "ext:wfe-deno/wfe.js";
// Step function registry -- maps step type names to JS functions.
const _stepFunctions = {};
// Called by WorkflowHost.registerStep() to store the JS function.
// Re-registering a step type silently overwrites the previous function.
globalThis.__wfe_registerStepFunction = (stepType, fn) => {
  _stepFunctions[stepType] = fn;
};
// Step executor loop -- runs in the background, dispatching step execution
// requests from the Rust executor to registered JS functions.
//
// This loop starts automatically when the extension loads. It blocks on
// op_step_executor_poll (async op) until a step needs to execute, then
// calls the matching JS function and sends the result back.
//
// NOTE(review): if op_step_executor_respond itself throws (e.g. on an
// unknown requestId), the detached async IIFE rejects and the loop dies --
// TODO confirm the Rust side cannot produce that condition.
globalThis.__wfe_startExecutorLoop = () => {
  (async () => {
    while (true) {
      let req;
      try {
        req = await Deno.core.ops.op_step_executor_poll();
      } catch (_) {
        // Poll op failed (e.g. already active) -- stop loop.
        break;
      }
      if (req === null || req === undefined) break; // shutdown signal
      try {
        const fn = _stepFunctions[req.stepType];
        if (!fn) {
          throw new Error(`No step function registered for "${req.stepType}"`);
        }
        const result = await fn(req.context);
        // A falsy return (undefined) is treated as a plain "proceed".
        Deno.core.ops.op_step_executor_respond(
          req.requestId,
          result || { proceed: true },
          null,
        );
      } catch (e) {
        // Send the error message back so the Rust side can fail the step.
        Deno.core.ops.op_step_executor_respond(
          req.requestId,
          null,
          e.message || String(e),
        );
      }
    }
  })();
};

188
wfe-deno/src/js/wfe.js Normal file
View File

@@ -0,0 +1,188 @@
// High-level WFE API for Deno.
//
// Usage:
// import { WorkflowHost, WorkflowBuilder, ExecutionResult } from "ext:wfe-deno/wfe.js";
/// Factory methods for step execution results.
export class ExecutionResult {
/** Proceed to the next step. */
static next() {
return { proceed: true };
}
/** Proceed with a routing outcome value. */
static outcome(value) {
return { proceed: true, outcomeValue: value };
}
/** Pause and persist step state for later resumption. */
static persist(data) {
return { proceed: false, persistenceData: data };
}
/** Sleep for the given number of milliseconds. */
static sleep(ms, persistenceData) {
const r = { proceed: false, sleepFor: ms };
if (persistenceData !== undefined) r.persistenceData = persistenceData;
return r;
}
/** Wait for an external event before continuing. */
static waitForEvent(eventName, eventKey) {
return { proceed: false, eventName, eventKey };
}
/** Fork into parallel branches. */
static branch(values) {
return { proceed: false, branchValues: values };
}
/** Proceed and attach output data. */
static output(data) {
return { proceed: true, outputData: data };
}
}
/// Workflow engine host.
//
// Thin async wrapper over the deno_core ops; all host state lives on the
// Rust side, so instances carry almost no state of their own.
export class WorkflowHost {
  constructor() {
    // Whether the JS-side step executor loop has been started.
    this._executorStarted = false;
  }
  /** Create a WorkflowHost with in-memory providers. */
  static create() {
    Deno.core.ops.op_host_create();
    return new WorkflowHost();
  }
  /** Start the background workflow and event consumer loops. */
  async start() {
    await Deno.core.ops.op_host_start();
  }
  /** Stop the host gracefully. */
  async stop() {
    await Deno.core.ops.op_host_stop();
  }
  /**
   * Register a JavaScript function as a workflow step.
   *
   * @param {string} stepType - The step type name (used in builder).
   * @param {function} fn - Async function receiving context, returning ExecutionResult.
   */
  async registerStep(stepType, fn) {
    // Start the executor loop on first step registration.
    if (!this._executorStarted) {
      globalThis.__wfe_startExecutorLoop();
      this._executorStarted = true;
    }
    await Deno.core.ops.op_register_step(stepType);
    globalThis.__wfe_registerStepFunction(stepType, fn);
  }
  /**
   * Start a new workflow instance.
   *
   * @param {string} definitionId - The workflow definition ID.
   * @param {number} version - The workflow version.
   * @param {object} data - Initial workflow data.
   * @returns {Promise<string>} The workflow instance ID.
   */
  async startWorkflow(definitionId, version, data) {
    return await Deno.core.ops.op_start_workflow(definitionId, version, data);
  }
  /** Suspend a running workflow. */
  async suspendWorkflow(id) {
    return await Deno.core.ops.op_suspend_workflow(id);
  }
  /** Resume a suspended workflow. */
  async resumeWorkflow(id) {
    return await Deno.core.ops.op_resume_workflow(id);
  }
  /** Terminate a workflow. */
  async terminateWorkflow(id) {
    return await Deno.core.ops.op_terminate_workflow(id);
  }
  /** Get a workflow instance by ID. */
  async getWorkflow(id) {
    return await Deno.core.ops.op_get_workflow(id);
  }
  /** Publish an event for waiting workflows. */
  async publishEvent(eventName, eventKey, data) {
    await Deno.core.ops.op_publish_event(eventName, eventKey, data);
  }
  /**
   * Build and register a workflow definition using a fluent builder.
   *
   * The builder callback may be sync or async; async callbacks are awaited.
   *
   * @param {string} id - Workflow definition ID.
   * @param {number} version - Workflow version.
   * @param {function} builderFn - Function receiving a WorkflowBuilder.
   */
  async buildWorkflow(id, version, builderFn) {
    const b = new WorkflowBuilder();
    // Await the callback so an async builderFn finishes mutating the
    // builder before registration (previously its result was ignored and
    // registration could race ahead of the callback's awaited work).
    await builderFn(b);
    await b._register(id, version);
  }
}
/// Fluent workflow builder wrapping the Rust WorkflowBuilder via ops.
//
// Each instance holds an opaque numeric handle into the Rust-side builder
// table; every method forwards to an op and returns `this` for chaining.
export class WorkflowBuilder {
  constructor() {
    this._handle = Deno.core.ops.op_builder_create();
  }
  /** Add the first step. */
  startWith(stepType) {
    Deno.core.ops.op_builder_start_with(this._handle, stepType);
    return this;
  }
  /** Chain the next step. */
  then(stepType) {
    Deno.core.ops.op_builder_then(this._handle, stepType);
    return this;
  }
  /** Set the current step's display name. */
  name(n) {
    Deno.core.ops.op_builder_name(this._handle, n);
    return this;
  }
  /** Attach JSON configuration to the current step. */
  config(c) {
    Deno.core.ops.op_builder_config(this._handle, c);
    return this;
  }
  /** Set error handling behavior for the current step. */
  onError(behavior) {
    Deno.core.ops.op_builder_on_error(this._handle, behavior);
    return this;
  }
  /** Add a delay step (milliseconds). */
  delay(ms) {
    Deno.core.ops.op_builder_delay(this._handle, ms);
    return this;
  }
  /** Add a wait-for-event step. */
  waitFor(eventName, eventKey) {
    Deno.core.ops.op_builder_wait_for(this._handle, eventName, eventKey);
    return this;
  }
  /** Build and return the workflow definition as JSON. */
  build(id, version) {
    return Deno.core.ops.op_builder_build(this._handle, id, version);
  }
  /** Build and register the definition with the host. */
  async _register(id, version) {
    await Deno.core.ops.op_builder_register(this._handle, id, version);
  }
}

63
wfe-deno/src/lib.rs Normal file
View File

@@ -0,0 +1,63 @@
pub mod bridge;
pub mod ops;
pub mod state;
use deno_core::{JsRuntime, RuntimeOptions};
use tokio::sync::mpsc;
use crate::ops::wfe_deno_ext;
use crate::state::WfeState;
/// Create a `JsRuntime` with the wfe-deno extension loaded.
///
/// Uses a default step-request channel buffer of 256; see
/// `create_wfe_runtime_with_channel_size` to customize it.
///
/// The runtime is ready to execute JavaScript that uses the WFE API:
/// ```js
/// import { WorkflowHost, ExecutionResult } from "ext:wfe-deno/wfe.js";
/// ```
pub fn create_wfe_runtime() -> JsRuntime {
    create_wfe_runtime_with_channel_size(256)
}
/// Create a `JsRuntime` with a custom step-request channel buffer size.
///
/// Builds the runtime with the wfe-deno extension, then seeds its op state
/// with a fresh `WfeState` holding both halves of the step-request channel.
pub fn create_wfe_runtime_with_channel_size(channel_size: usize) -> JsRuntime {
    // The channel is independent of runtime construction, so create it first.
    let (step_tx, step_rx) = mpsc::channel(channel_size);
    let runtime = JsRuntime::new(RuntimeOptions {
        extensions: vec![wfe_deno_ext::init()],
        ..Default::default()
    });
    // Install the shared state so the ops can reach the channel.
    runtime
        .op_state()
        .borrow_mut()
        .put(WfeState::new(step_tx, step_rx));
    runtime
}
// Smoke tests for runtime construction and initial op state.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn create_runtime_succeeds() {
        let _runtime = create_wfe_runtime();
    }

    #[test]
    fn create_runtime_with_custom_channel_size() {
        let _runtime = create_wfe_runtime_with_channel_size(1);
    }

    // A fresh runtime carries a WfeState with no host yet and the
    // step-request receiver still unclaimed.
    #[test]
    fn runtime_has_wfe_state() {
        let runtime = create_wfe_runtime();
        let op_state = runtime.op_state();
        let op_state = op_state.borrow();
        let wfe = op_state.borrow::<WfeState>();
        assert!(wfe.host.is_none());
        assert!(wfe.step_request_rx.is_some());
    }
}

312
wfe-deno/src/ops/builder.rs Normal file
View File

@@ -0,0 +1,312 @@
use std::time::Duration;
use deno_core::op2;
use deno_core::OpState;
use wfe_core::builder::WorkflowBuilder;
use wfe_core::models::ErrorBehavior;
use crate::state::WfeState;
/// Internal builder state that tracks the WorkflowBuilder and current step.
///
/// We don't use StepBuilder (pub(crate)) — instead we work directly with
/// WorkflowBuilder's public `steps` field, `add_step`, and `wire_outcome`.
pub struct JsBuilderState {
    /// The underlying Rust builder; workflow data is untyped JSON on the JS side.
    pub wb: WorkflowBuilder<serde_json::Value>,
    /// Step that fluent calls (`name`, `config`, `then`, ...) operate on;
    /// `None` until `start_with` runs.
    pub current_step: Option<usize>,
}
/// Create a new WorkflowBuilder and return its handle.
///
/// The returned handle indexes into `WfeState::builders` and is what the
/// JS `WorkflowBuilder` class passes to every subsequent builder op.
#[op2(fast)]
#[smi]
pub fn op_builder_create(state: &mut OpState) -> u32 {
    let wfe = state.borrow_mut::<WfeState>();
    // NOTE(review): assumes alloc_builder_id never reuses a live id --
    // confirm against WfeState.
    let id = wfe.alloc_builder_id();
    wfe.builders.insert(
        id,
        JsBuilderState {
            wb: WorkflowBuilder::<serde_json::Value>::new(),
            current_step: None,
        },
    );
    id
}
/// Add the first step via `start_with`.
///
/// # Errors
/// Fails if the handle is unknown or if a first step was already added.
#[op2(fast)]
pub fn op_builder_start_with(
    state: &mut OpState,
    #[smi] handle: u32,
    #[string] step_type: String,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let bs = get_builder(wfe, handle)?;
    // Enforce single entry point: start_with may only run once per builder.
    if bs.current_step.is_some() {
        return Err(deno_error::JsErrorBox::generic(
            "start_with already called on this builder",
        ));
    }
    let step_id = bs.wb.add_step(&step_type);
    bs.current_step = Some(step_id);
    Ok(())
}
/// Chain the next step via `then`.
///
/// Wires an unconditional outcome from the current step to the new step and
/// makes the new step current.
///
/// # Errors
/// Fails if the handle is unknown or `start_with` has not been called.
#[op2(fast)]
pub fn op_builder_then(
    state: &mut OpState,
    #[smi] handle: u32,
    #[string] step_type: String,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let bs = get_builder(wfe, handle)?;
    let prev_id = bs
        .current_step
        .ok_or_else(|| deno_error::JsErrorBox::generic("call start_with before then"))?;
    let next_id = bs.wb.add_step(&step_type);
    // `None` outcome value means the transition is unconditional.
    bs.wb.wire_outcome(prev_id, next_id, None);
    bs.current_step = Some(next_id);
    Ok(())
}
/// Set the current step's name.
#[op2(fast)]
pub fn op_builder_name(
    state: &mut OpState,
    #[smi] handle: u32,
    #[string] name: String,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let builder = get_builder(wfe, handle)?;
    let idx = current_step(builder)?;
    builder.wb.steps[idx].name = Some(name);
    Ok(())
}
/// Set the current step's JSON config.
#[op2]
pub fn op_builder_config(
    state: &mut OpState,
    #[smi] handle: u32,
    #[serde] config: serde_json::Value,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let builder = get_builder(wfe, handle)?;
    let idx = current_step(builder)?;
    builder.wb.steps[idx].step_config = Some(config);
    Ok(())
}
/// Set the current step's error behavior.
#[op2]
pub fn op_builder_on_error(
    state: &mut OpState,
    #[smi] handle: u32,
    #[serde] behavior: serde_json::Value,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let builder = get_builder(wfe, handle)?;
    let idx = current_step(builder)?;
    // Translate the JS-side value ("suspend" | "terminate" | "compensate" |
    // { retry: ... }) into the core enum before storing it.
    builder.wb.steps[idx].error_behavior = Some(parse_error_behavior(&behavior)?);
    Ok(())
}
/// Chain a delay step after the current step.
#[op2(fast)]
pub fn op_builder_delay(
    state: &mut OpState,
    #[smi] handle: u32,
    #[number] ms: u64,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let builder = get_builder(wfe, handle)?;
    let tail = current_step(builder)?;
    // The built-in delay step is addressed by its fully-qualified type name.
    let step_id = builder
        .wb
        .add_step(std::any::type_name::<wfe_core::primitives::delay::DelayStep>());
    builder.wb.steps[step_id].step_config = Some(serde_json::json!({
        "duration_millis": ms,
    }));
    builder.wb.wire_outcome(tail, step_id, None);
    builder.current_step = Some(step_id);
    Ok(())
}
/// Chain a wait-for-event step after the current step.
#[op2(fast)]
pub fn op_builder_wait_for(
    state: &mut OpState,
    #[smi] handle: u32,
    #[string] event_name: String,
    #[string] event_key: String,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let builder = get_builder(wfe, handle)?;
    let tail = current_step(builder)?;
    // The built-in wait step is addressed by its fully-qualified type name.
    let step_id = builder
        .wb
        .add_step(std::any::type_name::<wfe_core::primitives::wait_for::WaitForStep>());
    builder.wb.steps[step_id].step_config = Some(serde_json::json!({
        "event_name": event_name,
        "event_key": event_key,
    }));
    builder.wb.wire_outcome(tail, step_id, None);
    builder.current_step = Some(step_id);
    Ok(())
}
/// Build the workflow definition and return it as JSON. Consumes the builder.
#[op2]
#[serde]
pub fn op_builder_build(
    state: &mut OpState,
    #[smi] handle: u32,
    #[string] id: String,
    #[smi] version: u32,
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    // Building consumes the builder: remove it from the handle table.
    let builder = wfe
        .builders
        .remove(&handle)
        .ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?;
    let definition = builder.wb.build(&id, version);
    serde_json::to_value(&definition)
        .map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}")))
}
/// Build the workflow definition and register it with the host. Consumes the builder.
#[op2]
pub async fn op_builder_register(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[smi] handle: u32,
    #[string] id: String,
    #[smi] version: u32,
) -> Result<(), deno_error::JsErrorBox> {
    // Build the definition and clone the host handle inside a scope so the
    // OpState borrow is released before the await below.
    let (definition, host) = {
        let mut guard = state.borrow_mut();
        let wfe = guard.borrow_mut::<WfeState>();
        let builder = wfe
            .builders
            .remove(&handle)
            .ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?;
        (builder.wb.build(&id, version), wfe.host()?.clone())
    };
    host.register_workflow_definition(definition).await;
    Ok(())
}
/// Look up a builder by handle, or fail with a JS-visible error.
fn get_builder(
    wfe: &mut WfeState,
    handle: u32,
) -> Result<&mut JsBuilderState, deno_error::JsErrorBox> {
    match wfe.builders.get_mut(&handle) {
        Some(builder) => Ok(builder),
        None => Err(deno_error::JsErrorBox::generic("invalid builder handle")),
    }
}
/// The step index that fluent calls currently target, or an error when the
/// chain has not been started yet.
fn current_step(bs: &JsBuilderState) -> Result<usize, deno_error::JsErrorBox> {
    match bs.current_step {
        Some(idx) => Ok(idx),
        None => Err(deno_error::JsErrorBox::generic(
            "no current step — call start_with first",
        )),
    }
}
/// Parse a JS-provided error-behavior value into an `ErrorBehavior`.
///
/// Accepts the strings `"suspend"`, `"terminate"`, `"compensate"`, or an
/// object of the form `{ "retry": { "interval": <ms>, "maxRetries": <n> } }`
/// where both retry fields are optional (defaults: 60 000 ms, 3 retries).
///
/// # Errors
/// Returns a generic `JsErrorBox` when the value matches none of the
/// accepted shapes.
fn parse_error_behavior(
    value: &serde_json::Value,
) -> Result<ErrorBehavior, deno_error::JsErrorBox> {
    match value.as_str() {
        Some("suspend") => Ok(ErrorBehavior::Suspend),
        Some("terminate") => Ok(ErrorBehavior::Terminate),
        Some("compensate") => Ok(ErrorBehavior::Compensate),
        _ => {
            if let Some(retry) = value.get("retry") {
                let interval = retry
                    .get("interval")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(60_000);
                // Saturate instead of `as`-casting: a JS number larger than
                // u32::MAX would otherwise silently truncate to a small count.
                let max_retries = retry
                    .get("maxRetries")
                    .and_then(|v| v.as_u64())
                    .map_or(3, |n| u32::try_from(n).unwrap_or(u32::MAX));
                Ok(ErrorBehavior::Retry {
                    interval: Duration::from_millis(interval),
                    max_retries,
                })
            } else {
                Err(deno_error::JsErrorBox::generic(format!(
                    "invalid error behavior: {value}"
                )))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    // Each accepted string form maps to its matching enum variant.
    #[test]
    fn parse_suspend_behavior() {
        let eb = parse_error_behavior(&serde_json::json!("suspend")).unwrap();
        assert!(matches!(eb, ErrorBehavior::Suspend));
    }

    #[test]
    fn parse_terminate_behavior() {
        let eb = parse_error_behavior(&serde_json::json!("terminate")).unwrap();
        assert!(matches!(eb, ErrorBehavior::Terminate));
    }

    #[test]
    fn parse_compensate_behavior() {
        let eb = parse_error_behavior(&serde_json::json!("compensate")).unwrap();
        assert!(matches!(eb, ErrorBehavior::Compensate));
    }

    // A retry object carries its interval (millis) and retry count through.
    #[test]
    fn parse_retry_behavior() {
        let eb = parse_error_behavior(
            &serde_json::json!({"retry": {"interval": 5000, "maxRetries": 5}}),
        )
        .unwrap();
        match eb {
            ErrorBehavior::Retry {
                interval,
                max_retries,
            } => {
                assert_eq!(interval, Duration::from_millis(5000));
                assert_eq!(max_retries, 5);
            }
            _ => panic!("expected Retry"),
        }
    }

    // An empty retry object falls back to the documented 60s / 3 defaults.
    #[test]
    fn parse_retry_behavior_defaults() {
        let eb = parse_error_behavior(&serde_json::json!({"retry": {}})).unwrap();
        match eb {
            ErrorBehavior::Retry {
                interval,
                max_retries,
            } => {
                assert_eq!(interval, Duration::from_millis(60_000));
                assert_eq!(max_retries, 3);
            }
            _ => panic!("expected Retry"),
        }
    }

    // Unknown strings are rejected rather than silently defaulted.
    #[test]
    fn parse_invalid_behavior_returns_error() {
        let result = parse_error_behavior(&serde_json::json!("nonsense"));
        assert!(result.is_err());
    }
}

22
wfe-deno/src/ops/event.rs Normal file
View File

@@ -0,0 +1,22 @@
use deno_core::op2;
use deno_core::OpState;
use crate::state::WfeState;
/// Publish an event to the workflow host for matching subscriptions.
#[op2]
pub async fn op_publish_event(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] event_name: String,
    #[string] event_key: String,
    #[serde] data: serde_json::Value,
) -> Result<(), deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.publish_event(&event_name, &event_key, data)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("publish_event failed: {e}")))
}

62
wfe-deno/src/ops/host.rs Normal file
View File

@@ -0,0 +1,62 @@
use std::sync::Arc;
use deno_core::op2;
use deno_core::OpState;
use crate::state::WfeState;
/// Create a WorkflowHost with in-memory providers and store it in state.
#[op2(fast)]
pub fn op_host_create(state: &mut OpState) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    // Only one host per runtime.
    if wfe.host.is_some() {
        return Err(deno_error::JsErrorBox::generic(
            "WorkflowHost already created",
        ));
    }
    // All four providers are the in-memory test-support implementations.
    let host = wfe::WorkflowHostBuilder::new()
        .use_persistence(Arc::new(
            wfe_core::test_support::InMemoryPersistenceProvider::new(),
        ))
        .use_lock_provider(Arc::new(wfe_core::test_support::InMemoryLockProvider::new()))
        .use_queue_provider(Arc::new(
            wfe_core::test_support::InMemoryQueueProvider::new(),
        ))
        .use_lifecycle(Arc::new(
            wfe_core::test_support::InMemoryLifecyclePublisher::new(),
        ))
        .build()
        .map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to build host: {e}")))?;
    wfe.host = Some(Arc::new(host));
    Ok(())
}
/// Start the WorkflowHost background tasks.
#[op2]
pub async fn op_host_start(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
) -> Result<(), deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.start()
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to start host: {e}")))
}
/// Stop the WorkflowHost gracefully.
#[op2]
pub async fn op_host_stop(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
) -> Result<(), deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.stop().await;
    Ok(())
}

43
wfe-deno/src/ops/mod.rs Normal file
View File

@@ -0,0 +1,43 @@
pub mod builder;
pub mod event;
pub mod host;
pub mod step;
pub mod workflow;
// Declares the deno_core extension: every op exported to JS plus the two ESM
// modules that make up the JS-side API. `bootstrap.js` is the ESM entry point
// evaluated when the runtime is created.
deno_core::extension!(
    wfe_deno_ext,
    ops = [
        // Host lifecycle
        host::op_host_create,
        host::op_host_start,
        host::op_host_stop,
        // Workflow management
        workflow::op_start_workflow,
        workflow::op_suspend_workflow,
        workflow::op_resume_workflow,
        workflow::op_terminate_workflow,
        workflow::op_get_workflow,
        // Builder
        builder::op_builder_create,
        builder::op_builder_start_with,
        builder::op_builder_then,
        builder::op_builder_name,
        builder::op_builder_config,
        builder::op_builder_on_error,
        builder::op_builder_delay,
        builder::op_builder_wait_for,
        builder::op_builder_build,
        builder::op_builder_register,
        // Step execution bridge
        step::op_register_step,
        step::op_step_executor_poll,
        step::op_step_executor_respond,
        // Events
        event::op_publish_event,
    ],
    esm_entry_point = "ext:wfe-deno/bootstrap.js",
    esm = [
        "ext:wfe-deno/bootstrap.js" = "src/js/bootstrap.js",
        "ext:wfe-deno/wfe.js" = "src/js/wfe.js",
    ],
);

107
wfe-deno/src/ops/step.rs Normal file
View File

@@ -0,0 +1,107 @@
use std::sync::Arc;
use deno_core::op2;
use deno_core::OpState;
use crate::bridge::JsStepBody;
use crate::state::WfeState;
/// Register a JS step type with the host.
///
/// On the Rust side this creates a factory that produces `JsStepBody` instances.
/// On the JS side the caller also registers the actual function via
/// `__wfe_registerStepFunction(stepType, fn)`.
#[op2]
pub async fn op_register_step(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] step_type: String,
) -> Result<(), deno_error::JsErrorBox> {
    // Clone the host handle and the request-channel sender inside a scope so
    // the OpState borrow is released before the await below.
    let (host, tx) = {
        let guard = state.borrow();
        let wfe = guard.borrow::<WfeState>();
        (wfe.host()?.clone(), wfe.step_request_tx.clone())
    };
    // One counter per registered step type, shared by every JsStepBody the
    // factory produces (presumably for per-step bookkeeping — see bridge).
    let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
    host.register_step_factory(&step_type, move || {
        Box::new(JsStepBody::new(tx.clone(), counter.clone()))
    })
    .await;
    Ok(())
}
/// Poll for a step execution request from the Rust executor.
///
/// This is an async op that blocks until a step needs to be executed.
/// Returns `{ requestId, stepType, context }` or null on shutdown.
///
/// Only one poll may be active at a time: the channel receiver is moved out
/// of `WfeState` while waiting and restored once a request arrives.
#[op2]
#[serde]
pub async fn op_step_executor_poll(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
    // Take the receiver out of state (only the first call gets it).
    let mut rx = {
        let mut s = state.borrow_mut();
        let wfe = s.borrow_mut::<WfeState>();
        match wfe.step_request_rx.take() {
            Some(rx) => rx,
            None => {
                // Either another poll is already in flight, or a previous
                // poll observed shutdown and the receiver is gone for good.
                return Err(deno_error::JsErrorBox::generic(
                    "step executor poll already active",
                ));
            }
        }
    };
    // Wait for the next request.
    // NOTE(review): if this future is dropped (op cancelled) while waiting
    // here, `rx` is dropped with it and polling can never resume — confirm
    // cancellation cannot happen in practice.
    match rx.recv().await {
        Some(req) => {
            let request_id = req.request_id;
            let result = serde_json::json!({
                "requestId": request_id,
                "stepType": req.step_type,
                "context": req.context,
            });
            // Store the response sender for op_step_executor_respond.
            {
                let mut s = state.borrow_mut();
                let wfe = s.borrow_mut::<WfeState>();
                wfe.inflight.insert(request_id, req.response_tx);
                // Put the receiver back so we can poll again.
                wfe.step_request_rx = Some(rx);
            }
            Ok(result)
        }
        None => {
            // Channel closed — shutdown.
            Ok(serde_json::Value::Null)
        }
    }
}
/// Send a step execution result back to the Rust executor.
#[op2]
pub fn op_step_executor_respond(
    state: &mut OpState,
    #[smi] request_id: u32,
    #[serde] result: Option<serde_json::Value>,
    #[string] error: Option<String>,
) -> Result<(), deno_error::JsErrorBox> {
    let wfe = state.borrow_mut::<WfeState>();
    let tx = match wfe.inflight.remove(&request_id) {
        Some(tx) => tx,
        None => {
            return Err(deno_error::JsErrorBox::generic(format!(
                "no inflight request with id {request_id}"
            )))
        }
    };
    // An error string takes precedence over a result payload; a missing
    // result defaults to a plain "proceed" outcome.
    let response = match error {
        Some(err) => Err(err),
        None => Ok(result.unwrap_or_else(|| serde_json::json!({"proceed": true}))),
    };
    // Ignore send failure — the executor may have timed out.
    let _ = tx.send(response);
    Ok(())
}

View File

@@ -0,0 +1,91 @@
use deno_core::op2;
use deno_core::OpState;
use crate::state::WfeState;
/// Start a new workflow instance. Returns the workflow instance ID.
#[op2]
#[string]
pub async fn op_start_workflow(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] definition_id: String,
    #[smi] version: u32,
    #[serde] data: serde_json::Value,
) -> Result<String, deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.start_workflow(&definition_id, version, data)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("start_workflow failed: {e}")))
}
/// Suspend a running workflow. Returns true if suspension succeeded.
#[op2]
pub async fn op_suspend_workflow(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] id: String,
) -> Result<bool, deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.suspend_workflow(&id)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("suspend_workflow failed: {e}")))
}
/// Resume a suspended workflow. Returns true if resumption succeeded.
#[op2]
pub async fn op_resume_workflow(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] id: String,
) -> Result<bool, deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.resume_workflow(&id)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("resume_workflow failed: {e}")))
}
/// Terminate a workflow. Returns true if termination succeeded.
#[op2]
pub async fn op_terminate_workflow(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] id: String,
) -> Result<bool, deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    host.terminate_workflow(&id)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("terminate_workflow failed: {e}")))
}
/// Get a workflow instance by ID. Returns the serialized WorkflowInstance.
#[op2]
#[serde]
pub async fn op_get_workflow(
    state: std::rc::Rc<std::cell::RefCell<OpState>>,
    #[string] id: String,
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
    // Clone the host handle so the OpState borrow ends before the await.
    let host = state.borrow().borrow::<WfeState>().host()?.clone();
    let instance = host
        .get_workflow(&id)
        .await
        .map_err(|e| deno_error::JsErrorBox::generic(format!("get_workflow failed: {e}")))?;
    serde_json::to_value(&instance)
        .map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}")))
}

85
wfe-deno/src/state.rs Normal file
View File

@@ -0,0 +1,85 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use wfe::WorkflowHost;
use crate::bridge::StepRequest;
use crate::ops::builder::JsBuilderState;
/// Central state shared between all ops via `OpState`.
pub struct WfeState {
    /// The workflow host; `None` until `op_host_create` has run.
    pub host: Option<Arc<WorkflowHost>>,
    /// Sender side of the step-execution request channel (cloned into step factories).
    pub step_request_tx: mpsc::Sender<StepRequest>,
    /// Receiver side; taken by `op_step_executor_poll` while waiting and put back between polls.
    pub step_request_rx: Option<mpsc::Receiver<StepRequest>>,
    /// Live builder states keyed by the handle returned from `op_builder_create`.
    pub builders: HashMap<u32, JsBuilderState>,
    /// Next builder handle to hand out.
    pub next_builder_id: u32,
    /// Response senders for step requests currently executing in JS, keyed by request id.
    pub inflight: HashMap<u32, oneshot::Sender<Result<serde_json::Value, String>>>,
    /// Next step-request id to hand out.
    pub next_request_id: u32,
}
impl WfeState {
    /// Create a fresh state around the step-execution channel endpoints.
    pub fn new(
        step_request_tx: mpsc::Sender<StepRequest>,
        step_request_rx: mpsc::Receiver<StepRequest>,
    ) -> Self {
        Self {
            host: None,
            step_request_tx,
            step_request_rx: Some(step_request_rx),
            builders: HashMap::new(),
            next_builder_id: 0,
            inflight: HashMap::new(),
            next_request_id: 0,
        }
    }

    /// Borrow the workflow host, failing if `op_host_create` has not run yet.
    pub fn host(&self) -> Result<&Arc<WorkflowHost>, deno_error::JsErrorBox> {
        match &self.host {
            Some(host) => Ok(host),
            None => Err(deno_error::JsErrorBox::generic(
                "WorkflowHost not created yet — call op_host_create first",
            )),
        }
    }

    /// Hand out the next builder handle (monotonically increasing from 0).
    pub fn alloc_builder_id(&mut self) -> u32 {
        let id = self.next_builder_id;
        self.next_builder_id = id + 1;
        id
    }

    /// Hand out the next step-request id (monotonically increasing from 0).
    pub fn alloc_request_id(&mut self) -> u32 {
        let id = self.next_request_id;
        self.next_request_id = id + 1;
        id
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builder handles are handed out sequentially starting at 0.
    #[test]
    fn alloc_builder_id_increments() {
        let (tx, rx) = mpsc::channel(1);
        let mut state = WfeState::new(tx, rx);
        assert_eq!(state.alloc_builder_id(), 0);
        assert_eq!(state.alloc_builder_id(), 1);
        assert_eq!(state.alloc_builder_id(), 2);
    }

    // Request ids are handed out sequentially starting at 0.
    #[test]
    fn alloc_request_id_increments() {
        let (tx, rx) = mpsc::channel(1);
        let mut state = WfeState::new(tx, rx);
        assert_eq!(state.alloc_request_id(), 0);
        assert_eq!(state.alloc_request_id(), 1);
    }

    // host() must fail until op_host_create has stored a host.
    #[test]
    fn host_returns_error_when_not_created() {
        let (tx, rx) = mpsc::channel(1);
        let state = WfeState::new(tx, rx);
        assert!(state.host().is_err());
    }
}

View File

@@ -0,0 +1,440 @@
use wfe_deno::create_wfe_runtime;
/// Helper: run a JS script in a fresh wfe runtime and drive the event loop.
async fn run_js(code: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut runtime = create_wfe_runtime();
    if let Err(e) = runtime.execute_script("<test>", code.to_string()) {
        return Err(format!("script error: {e}").into());
    }
    if let Err(e) = runtime.run_event_loop(Default::default()).await {
        return Err(format!("event loop error: {e}").into());
    }
    Ok(())
}
/// Helper: run a JS module in a fresh wfe runtime and drive the event loop.
async fn run_module(code: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut runtime = create_wfe_runtime();
    let specifier =
        deno_core::ModuleSpecifier::parse("ext:wfe-deno/test-module.js").unwrap();
    let module_id = match runtime
        .load_main_es_module_from_code(&specifier, code.to_string())
        .await
    {
        Ok(id) => id,
        Err(e) => return Err(format!("module load error: {e}").into()),
    };
    // Evaluation must be polled alongside the event loop before awaiting it.
    let eval = runtime.mod_evaluate(module_id);
    if let Err(e) = runtime.run_event_loop(Default::default()).await {
        return Err(format!("event loop error: {e}").into());
    }
    if let Err(e) = eval.await {
        return Err(format!("module eval error: {e}").into());
    }
    Ok(())
}
// op_host_create succeeds in a fresh runtime.
#[tokio::test]
async fn host_create_via_op() {
    run_js("Deno.core.ops.op_host_create()").await.unwrap();
}

// A second op_host_create on the same runtime must be rejected.
#[tokio::test]
async fn host_create_twice_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
Deno.core.ops.op_host_create();
"#,
    )
    .await;
    assert!(result.is_err());
}
/// Starting the host before op_host_create must fail.
#[tokio::test]
async fn host_start_without_create_fails() {
    // Top-level `await` is only valid in module mode, so run_module is used;
    // the op itself must then fail because no host has been created.
    let result = run_module("await Deno.core.ops.op_host_start()").await;
    assert!(result.is_err());
}
// A host can be created, started, and stopped cleanly.
#[tokio::test]
async fn host_create_and_start() {
    run_module(
        r#"
Deno.core.ops.op_host_create();
await Deno.core.ops.op_host_start();
await Deno.core.ops.op_host_stop();
"#,
    )
    .await
    .unwrap();
}

// op_builder_create returns a numeric handle.
#[tokio::test]
async fn builder_create_returns_handle() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
if (typeof h !== "number") throw new Error("expected number handle");
"#,
    )
    .await
    .unwrap();
}

// start_with followed by name succeeds on a fresh builder.
#[tokio::test]
async fn builder_start_with_and_name() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "my-step");
Deno.core.ops.op_builder_name(h, "Step One");
"#,
    )
    .await
    .unwrap();
}
// Chaining two named steps and building yields a two-step definition.
#[tokio::test]
async fn builder_chain_and_build() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_name(h, "First");
Deno.core.ops.op_builder_config(h, { key: "val" });
Deno.core.ops.op_builder_then(h, "step-b");
Deno.core.ops.op_builder_name(h, "Second");
const def = Deno.core.ops.op_builder_build(h, "test-wf", 1);
if (def.id !== "test-wf") throw new Error("wrong id: " + def.id);
if (def.version !== 1) throw new Error("wrong version");
if (def.steps.length !== 2) throw new Error("expected 2 steps, got " + def.steps.length);
if (def.steps[0].name !== "First") throw new Error("wrong step 0 name");
if (def.steps[1].name !== "Second") throw new Error("wrong step 1 name");
"#,
    )
    .await
    .unwrap();
}

// Config and a string error behavior are stored on the current step.
#[tokio::test]
async fn builder_config_and_on_error() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_config(h, { ns: "ory" });
Deno.core.ops.op_builder_on_error(h, "suspend");
const def = Deno.core.ops.op_builder_build(h, "test", 1);
if (!def.steps[0].step_config) throw new Error("missing step_config");
if (def.steps[0].step_config.ns !== "ory") throw new Error("wrong config");
"#,
    )
    .await
    .unwrap();
}

// The object form of on_error (retry) is accepted.
#[tokio::test]
async fn builder_on_error_retry() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_on_error(h, { retry: { interval: 1000, maxRetries: 5 } });
const def = Deno.core.ops.op_builder_build(h, "test", 1);
"#,
    )
    .await
    .unwrap();
}
// op_builder_delay appends a second (delay) step to the chain.
#[tokio::test]
async fn builder_delay_step() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_delay(h, 5000);
const def = Deno.core.ops.op_builder_build(h, "test", 1);
if (def.steps.length !== 2) throw new Error("expected 2 steps (original + delay)");
"#,
    )
    .await
    .unwrap();
}

// op_builder_wait_for appends a second (wait) step to the chain.
#[tokio::test]
async fn builder_wait_for_step() {
    run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_wait_for(h, "order.paid", "order-123");
const def = Deno.core.ops.op_builder_build(h, "test", 1);
if (def.steps.length !== 2) throw new Error("expected 2 steps");
"#,
    )
    .await
    .unwrap();
}

// op_builder_register builds and hands the definition to the host.
#[tokio::test]
async fn builder_register_with_host() {
    run_module(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "step-a");
Deno.core.ops.op_builder_name(h, "First");
await Deno.core.ops.op_builder_register(h, "registered-wf", 1);
"#,
    )
    .await
    .unwrap();
}
// A second start_with on the same builder is rejected.
#[tokio::test]
async fn builder_start_with_twice_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "a");
Deno.core.ops.op_builder_start_with(h, "b");
"#,
    )
    .await;
    assert!(result.is_err());
}

// then() requires start_with() to have been called first.
#[tokio::test]
async fn builder_then_without_start_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_then(h, "b");
"#,
    )
    .await;
    assert!(result.is_err());
}

// name() requires a current step to exist.
#[tokio::test]
async fn builder_name_without_step_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_name(h, "oops");
"#,
    )
    .await;
    assert!(result.is_err());
}

// Ops reject handles that were never allocated.
#[tokio::test]
async fn builder_invalid_handle_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
Deno.core.ops.op_builder_start_with(999, "a");
"#,
    )
    .await;
    assert!(result.is_err());
}

// build() also rejects unknown handles.
#[tokio::test]
async fn builder_build_invalid_handle_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
Deno.core.ops.op_builder_build(999, "id", 1);
"#,
    )
    .await;
    assert!(result.is_err());
}
// End-to-end pass over the workflow-management ops: start, get, suspend,
// resume, terminate, and suspend-after-terminate (which must report false).
#[tokio::test]
async fn workflow_management_ops() {
    run_module(
        r#"
Deno.core.ops.op_host_create();
await Deno.core.ops.op_host_start();
// Register a no-op step and workflow.
const h = Deno.core.ops.op_builder_create();
Deno.core.ops.op_builder_start_with(h, "noop");
await Deno.core.ops.op_builder_register(h, "mgmt-test", 1);
// Start a workflow (it will just sit with unknown step type).
const wfId = await Deno.core.ops.op_start_workflow("mgmt-test", 1, { foo: "bar" });
if (typeof wfId !== "string") throw new Error("expected string ID");
// Get the workflow.
const wf = await Deno.core.ops.op_get_workflow(wfId);
if (wf.id !== wfId) throw new Error("wrong id");
if (wf.data.foo !== "bar") throw new Error("wrong data");
// Suspend.
const suspended = await Deno.core.ops.op_suspend_workflow(wfId);
if (!suspended) throw new Error("expected suspend to succeed");
// Resume.
const resumed = await Deno.core.ops.op_resume_workflow(wfId);
if (!resumed) throw new Error("expected resume to succeed");
// Terminate.
const terminated = await Deno.core.ops.op_terminate_workflow(wfId);
if (!terminated) throw new Error("expected terminate to succeed");
// Suspend after terminate should return false.
const again = await Deno.core.ops.op_suspend_workflow(wfId);
if (again) throw new Error("expected suspend after terminate to fail");
await Deno.core.ops.op_host_stop();
"#,
    )
    .await
    .unwrap();
}

// Responding to a request id that was never handed out is an error.
#[tokio::test]
async fn step_executor_respond_invalid_id_fails() {
    let result = run_js(
        r#"
Deno.core.ops.op_host_create();
Deno.core.ops.op_step_executor_respond(999, { proceed: true }, null);
"#,
    )
    .await;
    assert!(result.is_err());
}
// All ExecutionResult factory helpers in the JS API produce the expected shapes.
#[tokio::test]
async fn wfe_js_api_execution_result_factories() {
    run_module(
        r#"
import { ExecutionResult } from "ext:wfe-deno/wfe.js";
const next = ExecutionResult.next();
if (!next.proceed) throw new Error("next should proceed");
const outcome = ExecutionResult.outcome("branch-a");
if (outcome.outcomeValue !== "branch-a") throw new Error("wrong outcome");
const persist = ExecutionResult.persist({ page: 2 });
if (persist.proceed) throw new Error("persist should not proceed");
if (persist.persistenceData.page !== 2) throw new Error("wrong persist data");
const sleep = ExecutionResult.sleep(5000);
if (sleep.sleepFor !== 5000) throw new Error("wrong sleep");
const wait = ExecutionResult.waitForEvent("ev", "k");
if (wait.eventName !== "ev") throw new Error("wrong event name");
const branch = ExecutionResult.branch([1, 2]);
if (branch.branchValues.length !== 2) throw new Error("wrong branch");
const output = ExecutionResult.output({ done: true });
if (!output.proceed) throw new Error("output should proceed");
if (!output.outputData.done) throw new Error("wrong output");
"#,
    )
    .await
    .unwrap();
}

// The WorkflowHost wrapper class starts and stops cleanly.
#[tokio::test]
async fn wfe_js_api_workflow_host_class() {
    run_module(
        r#"
import { WorkflowHost } from "ext:wfe-deno/wfe.js";
const host = WorkflowHost.create();
await host.start();
await host.stop();
"#,
    )
    .await
    .unwrap();
}

// The fluent WorkflowBuilder class mirrors the builder ops.
#[tokio::test]
async fn wfe_js_api_workflow_builder_class() {
    run_module(
        r#"
import { WorkflowHost, WorkflowBuilder } from "ext:wfe-deno/wfe.js";
const host = WorkflowHost.create();
const b = new WorkflowBuilder();
b.startWith("step-a").name("First").config({ key: "val" }).then("step-b").name("Second");
const def = b.build("builder-test", 1);
if (def.steps.length !== 2) throw new Error("expected 2 steps");
if (def.steps[0].name !== "First") throw new Error("wrong name");
"#,
    )
    .await
    .unwrap();
}

// buildWorkflow() drives a builder callback and registers the result.
#[tokio::test]
async fn wfe_js_api_build_workflow_helper() {
    run_module(
        r#"
import { WorkflowHost } from "ext:wfe-deno/wfe.js";
const host = WorkflowHost.create();
await host.buildWorkflow("helper-test", 1, (b) => {
b.startWith("step-a").name("A").then("step-b").name("B");
});
"#,
    )
    .await
    .unwrap();
}
// Publishing an event on a started host succeeds even with no subscribers.
#[tokio::test]
async fn publish_event_op() {
    run_module(
        r#"
Deno.core.ops.op_host_create();
await Deno.core.ops.op_host_start();
await Deno.core.ops.op_publish_event("test.event", "key-1", { data: true });
await Deno.core.ops.op_host_stop();
"#,
    )
    .await
    .unwrap();
}

// Workflow ops fail before a host exists.
#[tokio::test]
async fn start_workflow_without_host_fails() {
    let result = run_module(
        r#"
await Deno.core.ops.op_start_workflow("nope", 1, {});
"#,
    )
    .await;
    assert!(result.is_err());
}

// Step registration also requires a host.
#[tokio::test]
async fn register_step_without_host_fails() {
    let result = run_module(
        r#"
await Deno.core.ops.op_register_step("nope");
"#,
    )
    .await;
    assert!(result.is_err());
}

View File

@@ -14,9 +14,9 @@ path = "src/main.rs"
[dependencies]
# Internal
wfe-core = { workspace = true, features = ["test-support"] }
wfe = { version = "1.6.1", path = "../wfe", registry = "sunbeam" }
wfe-yaml = { version = "1.6.1", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] }
wfe-server-protos = { version = "1.6.1", path = "../wfe-server-protos", registry = "sunbeam" }
wfe = { version = "1.7.0", path = "../wfe", registry = "sunbeam" }
wfe-yaml = { version = "1.7.0", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] }
wfe-server-protos = { version = "1.7.0", path = "../wfe-server-protos", registry = "sunbeam" }
wfe-sqlite = { workspace = true }
wfe-postgres = { workspace = true }
wfe-valkey = { workspace = true }

View File

@@ -293,7 +293,9 @@ impl WorkflowHost {
// Create initial execution pointer for step 0 if the definition has steps.
let mut instance = WorkflowInstance::new(definition_id, version, data);
if !definition.steps.is_empty() {
instance.execution_pointers.push(ExecutionPointer::new(0));
let mut pointer = ExecutionPointer::new(0);
pointer.step_name = definition.steps.first().and_then(|s| s.name.clone());
instance.execution_pointers.push(pointer);
}
// Persist the instance.