Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
a4d0f2a547
|
|||
|
4b8e544ab8
|
|||
|
9a08882e28
|
|||
|
afb91c66bd
|
|||
|
ead883f714
|
|||
|
ac45011794
|
|||
|
2b244348ca
|
|||
|
de66fef2d6
|
|||
|
6c16c89379
|
|||
|
e515ffbe0c
|
|||
|
978109d3fc
|
31
CHANGELOG.md
31
CHANGELOG.md
@@ -2,6 +2,37 @@
|
||||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [1.7.0] - 2026-04-05
|
||||
|
||||
### Added
|
||||
|
||||
- **wfe-deno**: New crate -- Deno/JS/TS bindings for the WFE workflow engine
|
||||
- Full API surface via 23 deno_core ops: host lifecycle, workflow management, fluent builder, step registration, event publishing
|
||||
- Channel-based execution bridge: JS step functions called from tokio executor via mpsc/oneshot channels
|
||||
- High-level JS API classes: `WorkflowHost`, `WorkflowBuilder`, `ExecutionResult`
|
||||
- 52 tests (26 unit + 26 integration), 93% line coverage
|
||||
- **wfe-core**: `WorkflowBuilder.steps` and `last_step` fields now public
|
||||
|
||||
## [1.6.3] - 2026-04-05
|
||||
|
||||
### Fixed
|
||||
|
||||
- **wfe-core**: Propagate `step_name` into execution pointers when advancing to next steps, compensation steps, and parallel branch children
|
||||
- **wfe**: Set `step_name` on initial execution pointer when starting a workflow instance
|
||||
|
||||
## [1.6.2] - 2026-04-05
|
||||
|
||||
### Added
|
||||
|
||||
- **wfe-core**: `WorkflowBuilder::add_step_typed()` for adding named, configured steps in parallel branch closures
|
||||
- **wfe-core**: `WorkflowBuilder::wire_outcome()` now public for custom graph wiring
|
||||
|
||||
## [1.6.1] - 2026-04-05
|
||||
|
||||
### Added
|
||||
|
||||
- **wfe-core**: `StepBuilder::config()` for attaching arbitrary JSON configuration to individual steps, readable at runtime via `context.step.step_config`
|
||||
|
||||
## [1.6.0] - 2026-04-01
|
||||
|
||||
### Added
|
||||
|
||||
22
Cargo.toml
22
Cargo.toml
@@ -1,9 +1,9 @@
|
||||
[workspace]
|
||||
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server"]
|
||||
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server", "wfe-deno"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "1.6.0"
|
||||
version = "1.7.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
repository = "https://src.sunbeam.pt/studio/wfe"
|
||||
@@ -38,15 +38,15 @@ redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
|
||||
opensearch = "2"
|
||||
|
||||
# Internal crates
|
||||
wfe-core = { version = "1.6.0", path = "wfe-core", registry = "sunbeam" }
|
||||
wfe-sqlite = { version = "1.6.0", path = "wfe-sqlite", registry = "sunbeam" }
|
||||
wfe-postgres = { version = "1.6.0", path = "wfe-postgres", registry = "sunbeam" }
|
||||
wfe-opensearch = { version = "1.6.0", path = "wfe-opensearch", registry = "sunbeam" }
|
||||
wfe-valkey = { version = "1.6.0", path = "wfe-valkey", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.6.0", path = "wfe-yaml", registry = "sunbeam" }
|
||||
wfe-buildkit = { version = "1.6.0", path = "wfe-buildkit", registry = "sunbeam" }
|
||||
wfe-containerd = { version = "1.6.0", path = "wfe-containerd", registry = "sunbeam" }
|
||||
wfe-rustlang = { version = "1.6.0", path = "wfe-rustlang", registry = "sunbeam" }
|
||||
wfe-core = { version = "1.7.0", path = "wfe-core", registry = "sunbeam" }
|
||||
wfe-sqlite = { version = "1.7.0", path = "wfe-sqlite", registry = "sunbeam" }
|
||||
wfe-postgres = { version = "1.7.0", path = "wfe-postgres", registry = "sunbeam" }
|
||||
wfe-opensearch = { version = "1.7.0", path = "wfe-opensearch", registry = "sunbeam" }
|
||||
wfe-valkey = { version = "1.7.0", path = "wfe-valkey", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.7.0", path = "wfe-yaml", registry = "sunbeam" }
|
||||
wfe-buildkit = { version = "1.7.0", path = "wfe-buildkit", registry = "sunbeam" }
|
||||
wfe-containerd = { version = "1.7.0", path = "wfe-containerd", registry = "sunbeam" }
|
||||
wfe-rustlang = { version = "1.7.0", path = "wfe-rustlang", registry = "sunbeam" }
|
||||
|
||||
# YAML
|
||||
serde_yaml = "0.9"
|
||||
|
||||
@@ -16,7 +16,7 @@ async-trait = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
wfe-buildkit-protos = { version = "1.6.0", path = "../wfe-buildkit-protos", registry = "sunbeam" }
|
||||
wfe-buildkit-protos = { version = "1.7.0", path = "../wfe-buildkit-protos", registry = "sunbeam" }
|
||||
tonic = "0.14"
|
||||
tower = { version = "0.4", features = ["util"] }
|
||||
hyper-util = { version = "0.1", features = ["tokio"] }
|
||||
|
||||
@@ -9,7 +9,7 @@ description = "containerd container runner executor for WFE"
|
||||
|
||||
[dependencies]
|
||||
wfe-core = { workspace = true }
|
||||
wfe-containerd-protos = { version = "1.6.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
|
||||
wfe-containerd-protos = { version = "1.7.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -43,6 +43,14 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
self
|
||||
}
|
||||
|
||||
/// Attach arbitrary JSON configuration to this step.
|
||||
///
|
||||
/// The step can read it at runtime via `context.step.step_config`.
|
||||
pub fn config(mut self, config: serde_json::Value) -> Self {
|
||||
self.builder.steps[self.step_id].step_config = Some(config);
|
||||
self
|
||||
}
|
||||
|
||||
/// Add a compensation step for saga rollback.
|
||||
pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
|
||||
let comp_id = self.builder.add_step(std::any::type_name::<C>());
|
||||
|
||||
@@ -28,8 +28,8 @@ pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
|
||||
/// .build("my-workflow", 1);
|
||||
/// ```
|
||||
pub struct WorkflowBuilder<D: WorkflowData> {
|
||||
pub(crate) steps: Vec<WorkflowStep>,
|
||||
pub(crate) last_step: Option<usize>,
|
||||
pub steps: Vec<WorkflowStep>,
|
||||
pub last_step: Option<usize>,
|
||||
/// Inline closures keyed by step id, stored for later registration.
|
||||
pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
|
||||
_phantom: PhantomData<D>,
|
||||
@@ -61,8 +61,23 @@ impl<D: WorkflowData> WorkflowBuilder<D> {
|
||||
id
|
||||
}
|
||||
|
||||
/// Add a typed step with an optional name and config.
|
||||
/// Convenience for use inside `parallel` branch closures.
|
||||
pub fn add_step_typed<S: StepBody + Default + 'static>(
|
||||
&mut self,
|
||||
name: &str,
|
||||
config: Option<serde_json::Value>,
|
||||
) -> usize {
|
||||
let id = self.add_step(std::any::type_name::<S>());
|
||||
self.steps[id].name = Some(name.to_string());
|
||||
if let Some(cfg) = config {
|
||||
self.steps[id].step_config = Some(cfg);
|
||||
}
|
||||
id
|
||||
}
|
||||
|
||||
/// Wire an outcome from `from_step` to `to_step`.
|
||||
pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||
pub fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||
if let Some(step) = self.steps.get_mut(from_step) {
|
||||
step.outcomes.push(StepOutcome {
|
||||
next_step: to_step,
|
||||
@@ -341,6 +356,79 @@ mod tests {
|
||||
assert!(def.steps[1].step_type.contains("StepB"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_sets_step_config() {
|
||||
let cfg = serde_json::json!({"namespace": "ory", "timeout": 30});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.config(cfg.clone())
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_chains_with_name() {
|
||||
let cfg = serde_json::json!({"namespace": "data"});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.name("apply-data")
|
||||
.config(cfg.clone())
|
||||
.then::<StepB>()
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].name, Some("apply-data".into()));
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg));
|
||||
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_on_multiple_steps_of_same_type() {
|
||||
let cfg_a = serde_json::json!({"namespace": "ory"});
|
||||
let cfg_b = serde_json::json!({"namespace": "data"});
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
.start_with::<StepA>()
|
||||
.name("apply-ory")
|
||||
.config(cfg_a.clone())
|
||||
.then::<StepA>()
|
||||
.name("apply-data")
|
||||
.config(cfg_b.clone())
|
||||
.end_workflow()
|
||||
.build("test", 1);
|
||||
assert_eq!(def.steps[0].step_config, Some(cfg_a));
|
||||
assert_eq!(def.steps[1].step_config, Some(cfg_b));
|
||||
// Both are StepA
|
||||
assert_eq!(def.steps[0].step_type, def.steps[1].step_type);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_step_typed_sets_name_and_config() {
|
||||
let cfg = serde_json::json!({"namespace": "ory"});
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id = builder.add_step_typed::<StepA>("apply-ory", Some(cfg.clone()));
|
||||
assert_eq!(builder.steps[id].name, Some("apply-ory".into()));
|
||||
assert_eq!(builder.steps[id].step_config, Some(cfg));
|
||||
assert!(builder.steps[id].step_type.contains("StepA"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_step_typed_without_config() {
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id = builder.add_step_typed::<StepB>("my-step", None);
|
||||
assert_eq!(builder.steps[id].name, Some("my-step".into()));
|
||||
assert_eq!(builder.steps[id].step_config, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wire_outcome_connects_steps() {
|
||||
let mut builder = WorkflowBuilder::<TestData>::new();
|
||||
let id0 = builder.add_step_typed::<StepA>("first", None);
|
||||
let id1 = builder.add_step_typed::<StepB>("second", None);
|
||||
builder.wire_outcome(id0, id1, None);
|
||||
assert_eq!(builder.steps[id0].outcomes.len(), 1);
|
||||
assert_eq!(builder.steps[id0].outcomes[0].next_step, id1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inline_step_via_then_fn() {
|
||||
let def = WorkflowBuilder::<TestData>::new()
|
||||
|
||||
@@ -67,6 +67,9 @@ pub fn handle_error(
|
||||
&& let Some(comp_step_id) = step.compensation_step_id
|
||||
{
|
||||
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
|
||||
comp_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == comp_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
comp_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
comp_pointer.scope = pointer.scope.clone();
|
||||
new_pointers.push(comp_pointer);
|
||||
|
||||
@@ -36,6 +36,9 @@ pub fn process_result(
|
||||
let next_step_id = find_next_step(step, &result.outcome_value);
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer = ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
next_pointer.scope = pointer.scope.clone();
|
||||
new_pointers.push(next_pointer);
|
||||
@@ -59,6 +62,9 @@ pub fn process_result(
|
||||
for value in branch_values {
|
||||
for &child_step_id in &child_step_ids {
|
||||
let mut child_pointer = ExecutionPointer::new(child_step_id);
|
||||
child_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == child_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
child_pointer.context_item = Some(value.clone());
|
||||
child_pointer.scope = child_scope.clone();
|
||||
child_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
|
||||
@@ -181,6 +181,9 @@ impl WorkflowExecutor {
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer =
|
||||
crate::models::ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id =
|
||||
Some(workflow.execution_pointers[idx].id.clone());
|
||||
next_pointer.scope =
|
||||
|
||||
26
wfe-deno/Cargo.toml
Normal file
26
wfe-deno/Cargo.toml
Normal file
@@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "wfe-deno"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
description = "Deno bindings for the WFE workflow engine"
|
||||
|
||||
[dependencies]
|
||||
wfe-core = { workspace = true, features = ["test-support"] }
|
||||
wfe = { version = "1.7.0", path = "../wfe", registry = "sunbeam" }
|
||||
deno_core = { workspace = true }
|
||||
deno_error = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
tokio-util = "0.7"
|
||||
tempfile = { workspace = true }
|
||||
444
wfe-deno/src/bridge.rs
Normal file
444
wfe-deno/src/bridge.rs
Normal file
@@ -0,0 +1,444 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use wfe_core::models::ExecutionResult;
|
||||
use wfe_core::traits::step::{StepBody, StepExecutionContext};
|
||||
use wfe_core::WfeError;
|
||||
|
||||
/// A request sent from the executor (tokio) to the V8 thread.
|
||||
pub struct StepRequest {
|
||||
pub request_id: u32,
|
||||
pub step_type: String,
|
||||
pub context: serde_json::Value,
|
||||
pub response_tx: oneshot::Sender<Result<serde_json::Value, String>>,
|
||||
}
|
||||
|
||||
/// A `StepBody` implementation that bridges to JavaScript via channels.
|
||||
///
|
||||
/// When the workflow executor calls `run()`, this sends a serialized
|
||||
/// `StepExecutionContext` to the V8 thread and awaits the response.
|
||||
pub struct JsStepBody {
|
||||
request_tx: mpsc::Sender<StepRequest>,
|
||||
request_id_counter: std::sync::Arc<std::sync::atomic::AtomicU32>,
|
||||
}
|
||||
|
||||
impl JsStepBody {
|
||||
pub fn new(
|
||||
request_tx: mpsc::Sender<StepRequest>,
|
||||
request_id_counter: std::sync::Arc<std::sync::atomic::AtomicU32>,
|
||||
) -> Self {
|
||||
Self {
|
||||
request_tx,
|
||||
request_id_counter,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for JsStepBody {
|
||||
async fn run(
|
||||
&mut self,
|
||||
context: &StepExecutionContext<'_>,
|
||||
) -> wfe_core::Result<ExecutionResult> {
|
||||
let ctx_json = serialize_context(context);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let request_id = self
|
||||
.request_id_counter
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
self.request_tx
|
||||
.send(StepRequest {
|
||||
request_id,
|
||||
step_type: context.step.step_type.clone(),
|
||||
context: ctx_json,
|
||||
response_tx: tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| WfeError::StepExecution("step request channel closed".into()))?;
|
||||
|
||||
let result_json = rx
|
||||
.await
|
||||
.map_err(|_| WfeError::StepExecution("step response channel dropped".into()))?
|
||||
.map_err(WfeError::StepExecution)?;
|
||||
|
||||
deserialize_execution_result(&result_json)
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializable view of `StepExecutionContext` for passing to JavaScript.
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JsStepContext {
|
||||
pub item: Option<serde_json::Value>,
|
||||
pub persistence_data: Option<serde_json::Value>,
|
||||
pub step: JsStepInfo,
|
||||
pub workflow: JsWorkflowInfo,
|
||||
pub pointer: JsPointerInfo,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JsStepInfo {
|
||||
pub id: usize,
|
||||
pub name: Option<String>,
|
||||
pub step_type: String,
|
||||
pub step_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JsWorkflowInfo {
|
||||
pub id: String,
|
||||
pub definition_id: String,
|
||||
pub version: u32,
|
||||
pub status: String,
|
||||
pub data: serde_json::Value,
|
||||
pub create_time: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JsPointerInfo {
|
||||
pub id: String,
|
||||
pub step_id: usize,
|
||||
pub step_name: Option<String>,
|
||||
pub retry_count: u32,
|
||||
}
|
||||
|
||||
/// Serialize a `StepExecutionContext` into a JSON value for JavaScript.
|
||||
pub fn serialize_context(ctx: &StepExecutionContext<'_>) -> serde_json::Value {
|
||||
let js_ctx = JsStepContext {
|
||||
item: ctx.item.cloned(),
|
||||
persistence_data: ctx.persistence_data.cloned(),
|
||||
step: JsStepInfo {
|
||||
id: ctx.step.id,
|
||||
name: ctx.step.name.clone(),
|
||||
step_type: ctx.step.step_type.clone(),
|
||||
step_config: ctx.step.step_config.clone(),
|
||||
},
|
||||
workflow: JsWorkflowInfo {
|
||||
id: ctx.workflow.id.clone(),
|
||||
definition_id: ctx.workflow.workflow_definition_id.clone(),
|
||||
version: ctx.workflow.version,
|
||||
status: format!("{:?}", ctx.workflow.status),
|
||||
data: ctx.workflow.data.clone(),
|
||||
create_time: ctx.workflow.create_time,
|
||||
},
|
||||
pointer: JsPointerInfo {
|
||||
id: ctx.execution_pointer.id.clone(),
|
||||
step_id: ctx.execution_pointer.step_id,
|
||||
step_name: ctx.execution_pointer.step_name.clone(),
|
||||
retry_count: ctx.execution_pointer.retry_count,
|
||||
},
|
||||
};
|
||||
serde_json::to_value(js_ctx).unwrap_or(serde_json::Value::Null)
|
||||
}
|
||||
|
||||
/// Shape of an `ExecutionResult` as returned from JavaScript.
|
||||
#[derive(Deserialize, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JsExecutionResult {
|
||||
#[serde(default = "default_true")]
|
||||
pub proceed: bool,
|
||||
pub outcome_value: Option<serde_json::Value>,
|
||||
pub sleep_for: Option<u64>,
|
||||
pub persistence_data: Option<serde_json::Value>,
|
||||
pub event_name: Option<String>,
|
||||
pub event_key: Option<String>,
|
||||
pub branch_values: Option<Vec<serde_json::Value>>,
|
||||
pub output_data: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
fn default_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Deserialize a JavaScript execution result into the Rust `ExecutionResult`.
|
||||
pub fn deserialize_execution_result(
|
||||
value: &serde_json::Value,
|
||||
) -> wfe_core::Result<ExecutionResult> {
|
||||
let js_result: JsExecutionResult = serde_json::from_value(value.clone()).map_err(|e| {
|
||||
WfeError::StepExecution(format!("failed to deserialize ExecutionResult from JS: {e}"))
|
||||
})?;
|
||||
|
||||
Ok(ExecutionResult {
|
||||
proceed: js_result.proceed,
|
||||
outcome_value: js_result.outcome_value,
|
||||
sleep_for: js_result.sleep_for.map(Duration::from_millis),
|
||||
persistence_data: js_result.persistence_data,
|
||||
event_name: js_result.event_name,
|
||||
event_key: js_result.event_key,
|
||||
event_as_of: None,
|
||||
branch_values: js_result.branch_values,
|
||||
poll_endpoint: None,
|
||||
output_data: js_result.output_data,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStatus, WorkflowStep};
|
||||
|
||||
fn make_test_context() -> (WorkflowInstance, WorkflowStep, ExecutionPointer) {
|
||||
let instance = WorkflowInstance {
|
||||
id: "wf-1".into(),
|
||||
workflow_definition_id: "test-def".into(),
|
||||
version: 1,
|
||||
description: None,
|
||||
reference: None,
|
||||
execution_pointers: vec![],
|
||||
next_execution: None,
|
||||
status: WorkflowStatus::Runnable,
|
||||
data: serde_json::json!({"name": "World"}),
|
||||
create_time: Utc::now(),
|
||||
complete_time: None,
|
||||
};
|
||||
let step = WorkflowStep::new(0, "MyStep");
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
(instance, step, pointer)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_context_produces_valid_json() {
|
||||
let (instance, mut step, pointer) = make_test_context();
|
||||
step.name = Some("greet".into());
|
||||
step.step_config = Some(serde_json::json!({"key": "val"}));
|
||||
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
let json = serialize_context(&ctx);
|
||||
assert_eq!(json["step"]["name"], "greet");
|
||||
assert_eq!(json["step"]["stepConfig"]["key"], "val");
|
||||
assert_eq!(json["workflow"]["data"]["name"], "World");
|
||||
assert_eq!(json["workflow"]["definitionId"], "test-def");
|
||||
assert_eq!(json["pointer"]["stepId"], 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_context_with_item() {
|
||||
let (instance, step, pointer) = make_test_context();
|
||||
let item = serde_json::json!({"id": 42});
|
||||
|
||||
let ctx = StepExecutionContext {
|
||||
item: Some(&item),
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: Some(&serde_json::json!({"saved": true})),
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
let json = serialize_context(&ctx);
|
||||
assert_eq!(json["item"]["id"], 42);
|
||||
assert_eq!(json["persistenceData"]["saved"], true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_next_result() {
|
||||
let json = serde_json::json!({"proceed": true});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(result.proceed);
|
||||
assert!(result.outcome_value.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_outcome_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": true,
|
||||
"outcomeValue": "branch-a"
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(result.proceed);
|
||||
assert_eq!(result.outcome_value, Some(serde_json::json!("branch-a")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_sleep_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": false,
|
||||
"sleepFor": 5000
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.sleep_for, Some(Duration::from_millis(5000)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_persist_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": false,
|
||||
"persistenceData": {"page": 2}
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(serde_json::json!({"page": 2}))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_wait_for_event_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": false,
|
||||
"eventName": "order.paid",
|
||||
"eventKey": "order-123"
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert_eq!(result.event_name, Some("order.paid".into()));
|
||||
assert_eq!(result.event_key, Some("order-123".into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_branch_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": false,
|
||||
"branchValues": [1, 2, 3]
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert_eq!(
|
||||
result.branch_values,
|
||||
Some(vec![
|
||||
serde_json::json!(1),
|
||||
serde_json::json!(2),
|
||||
serde_json::json!(3)
|
||||
])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_output_data_result() {
|
||||
let json = serde_json::json!({
|
||||
"proceed": true,
|
||||
"outputData": {"greeted": "World"}
|
||||
});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(result.proceed);
|
||||
assert_eq!(
|
||||
result.output_data,
|
||||
Some(serde_json::json!({"greeted": "World"}))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_empty_object_defaults_to_proceed() {
|
||||
let json = serde_json::json!({});
|
||||
let result = deserialize_execution_result(&json).unwrap();
|
||||
assert!(result.proceed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_invalid_json_returns_error() {
|
||||
let json = serde_json::json!("not an object");
|
||||
let result = deserialize_execution_result(&json);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_step_body_channel_round_trip() {
|
||||
let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
|
||||
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
let mut body = JsStepBody::new(tx, counter);
|
||||
|
||||
let (instance, step, pointer) = make_test_context();
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
// Spawn a "JS side" that responds to the request.
|
||||
let responder = tokio::spawn(async move {
|
||||
let req = rx.recv().await.unwrap();
|
||||
assert_eq!(req.step_type, "MyStep");
|
||||
assert_eq!(req.request_id, 0);
|
||||
req.response_tx
|
||||
.send(Ok(serde_json::json!({"proceed": true, "outputData": {"done": true}})))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let result = body.run(&ctx).await.unwrap();
|
||||
assert!(result.proceed);
|
||||
assert_eq!(result.output_data, Some(serde_json::json!({"done": true})));
|
||||
|
||||
responder.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_step_body_propagates_js_error() {
|
||||
let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
|
||||
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
let mut body = JsStepBody::new(tx, counter);
|
||||
|
||||
let (instance, step, pointer) = make_test_context();
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let req = rx.recv().await.unwrap();
|
||||
req.response_tx
|
||||
.send(Err("TypeError: undefined is not a function".into()))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let result = body.run(&ctx).await;
|
||||
assert!(result.is_err());
|
||||
let err_msg = format!("{}", result.unwrap_err());
|
||||
assert!(err_msg.contains("TypeError"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn js_step_body_handles_dropped_responder() {
|
||||
let (tx, mut rx) = mpsc::channel::<StepRequest>(16);
|
||||
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
let mut body = JsStepBody::new(tx, counter);
|
||||
|
||||
let (instance, step, pointer) = make_test_context();
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let req = rx.recv().await.unwrap();
|
||||
drop(req.response_tx); // Drop without sending
|
||||
});
|
||||
|
||||
let result = body.run(&ctx).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
50
wfe-deno/src/js/bootstrap.js
vendored
Normal file
50
wfe-deno/src/js/bootstrap.js
vendored
Normal file
@@ -0,0 +1,50 @@
|
||||
// Import wfe.js so it's evaluated and available for user code.
|
||||
import "ext:wfe-deno/wfe.js";
|
||||
|
||||
// Step function registry -- maps step type names to JS functions.
|
||||
const _stepFunctions = {};
|
||||
|
||||
// Called by WorkflowHost.registerStep() to store the JS function.
|
||||
globalThis.__wfe_registerStepFunction = (stepType, fn) => {
|
||||
_stepFunctions[stepType] = fn;
|
||||
};
|
||||
|
||||
// Step executor loop -- runs in the background, dispatching step execution
|
||||
// requests from the Rust executor to registered JS functions.
|
||||
//
|
||||
// This loop starts automatically when the extension loads. It blocks on
|
||||
// op_step_executor_poll (async op) until a step needs to execute, then
|
||||
// calls the matching JS function and sends the result back.
|
||||
globalThis.__wfe_startExecutorLoop = () => {
|
||||
(async () => {
|
||||
while (true) {
|
||||
let req;
|
||||
try {
|
||||
req = await Deno.core.ops.op_step_executor_poll();
|
||||
} catch (_) {
|
||||
// Poll op failed (e.g. already active) -- stop loop.
|
||||
break;
|
||||
}
|
||||
if (req === null || req === undefined) break; // shutdown signal
|
||||
|
||||
try {
|
||||
const fn = _stepFunctions[req.stepType];
|
||||
if (!fn) {
|
||||
throw new Error(`No step function registered for "${req.stepType}"`);
|
||||
}
|
||||
const result = await fn(req.context);
|
||||
Deno.core.ops.op_step_executor_respond(
|
||||
req.requestId,
|
||||
result || { proceed: true },
|
||||
null,
|
||||
);
|
||||
} catch (e) {
|
||||
Deno.core.ops.op_step_executor_respond(
|
||||
req.requestId,
|
||||
null,
|
||||
e.message || String(e),
|
||||
);
|
||||
}
|
||||
}
|
||||
})();
|
||||
};
|
||||
188
wfe-deno/src/js/wfe.js
Normal file
188
wfe-deno/src/js/wfe.js
Normal file
@@ -0,0 +1,188 @@
|
||||
// High-level WFE API for Deno.
|
||||
//
|
||||
// Usage:
|
||||
// import { WorkflowHost, WorkflowBuilder, ExecutionResult } from "ext:wfe-deno/wfe.js";
|
||||
|
||||
/// Factory methods for step execution results.
|
||||
export class ExecutionResult {
|
||||
/** Proceed to the next step. */
|
||||
static next() {
|
||||
return { proceed: true };
|
||||
}
|
||||
|
||||
/** Proceed with a routing outcome value. */
|
||||
static outcome(value) {
|
||||
return { proceed: true, outcomeValue: value };
|
||||
}
|
||||
|
||||
/** Pause and persist step state for later resumption. */
|
||||
static persist(data) {
|
||||
return { proceed: false, persistenceData: data };
|
||||
}
|
||||
|
||||
/** Sleep for the given number of milliseconds. */
|
||||
static sleep(ms, persistenceData) {
|
||||
const r = { proceed: false, sleepFor: ms };
|
||||
if (persistenceData !== undefined) r.persistenceData = persistenceData;
|
||||
return r;
|
||||
}
|
||||
|
||||
/** Wait for an external event before continuing. */
|
||||
static waitForEvent(eventName, eventKey) {
|
||||
return { proceed: false, eventName, eventKey };
|
||||
}
|
||||
|
||||
/** Fork into parallel branches. */
|
||||
static branch(values) {
|
||||
return { proceed: false, branchValues: values };
|
||||
}
|
||||
|
||||
/** Proceed and attach output data. */
|
||||
static output(data) {
|
||||
return { proceed: true, outputData: data };
|
||||
}
|
||||
}
|
||||
|
||||
/// Workflow engine host.
|
||||
export class WorkflowHost {
|
||||
/** Create a WorkflowHost with in-memory providers. */
|
||||
static create() {
|
||||
Deno.core.ops.op_host_create();
|
||||
return new WorkflowHost();
|
||||
}
|
||||
|
||||
/** Start the background workflow and event consumer loops. */
|
||||
async start() {
|
||||
await Deno.core.ops.op_host_start();
|
||||
}
|
||||
|
||||
/** Stop the host gracefully. */
|
||||
async stop() {
|
||||
await Deno.core.ops.op_host_stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a JavaScript function as a workflow step.
|
||||
*
|
||||
* @param {string} stepType - The step type name (used in builder).
|
||||
* @param {function} fn - Async function receiving context, returning ExecutionResult.
|
||||
*/
|
||||
async registerStep(stepType, fn) {
|
||||
// Start the executor loop on first step registration.
|
||||
if (!this._executorStarted) {
|
||||
globalThis.__wfe_startExecutorLoop();
|
||||
this._executorStarted = true;
|
||||
}
|
||||
await Deno.core.ops.op_register_step(stepType);
|
||||
globalThis.__wfe_registerStepFunction(stepType, fn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new workflow instance.
|
||||
*
|
||||
* @param {string} definitionId - The workflow definition ID.
|
||||
* @param {number} version - The workflow version.
|
||||
* @param {object} data - Initial workflow data.
|
||||
* @returns {Promise<string>} The workflow instance ID.
|
||||
*/
|
||||
async startWorkflow(definitionId, version, data) {
|
||||
return await Deno.core.ops.op_start_workflow(definitionId, version, data);
|
||||
}
|
||||
|
||||
/** Suspend a running workflow. */
|
||||
async suspendWorkflow(id) {
|
||||
return await Deno.core.ops.op_suspend_workflow(id);
|
||||
}
|
||||
|
||||
/** Resume a suspended workflow. */
|
||||
async resumeWorkflow(id) {
|
||||
return await Deno.core.ops.op_resume_workflow(id);
|
||||
}
|
||||
|
||||
/** Terminate a workflow. */
|
||||
async terminateWorkflow(id) {
|
||||
return await Deno.core.ops.op_terminate_workflow(id);
|
||||
}
|
||||
|
||||
/** Get a workflow instance by ID. */
|
||||
async getWorkflow(id) {
|
||||
return await Deno.core.ops.op_get_workflow(id);
|
||||
}
|
||||
|
||||
/** Publish an event for waiting workflows. */
|
||||
async publishEvent(eventName, eventKey, data) {
|
||||
await Deno.core.ops.op_publish_event(eventName, eventKey, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build and register a workflow definition using a fluent builder.
|
||||
*
|
||||
* @param {string} id - Workflow definition ID.
|
||||
* @param {number} version - Workflow version.
|
||||
* @param {function} builderFn - Function receiving a WorkflowBuilder.
|
||||
*/
|
||||
async buildWorkflow(id, version, builderFn) {
|
||||
const b = new WorkflowBuilder();
|
||||
builderFn(b);
|
||||
await b._register(id, version);
|
||||
}
|
||||
}
|
||||
|
||||
/// Fluent workflow builder wrapping the Rust WorkflowBuilder via ops.
|
||||
export class WorkflowBuilder {
|
||||
constructor() {
|
||||
this._handle = Deno.core.ops.op_builder_create();
|
||||
}
|
||||
|
||||
/** Add the first step. */
|
||||
startWith(stepType) {
|
||||
Deno.core.ops.op_builder_start_with(this._handle, stepType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Chain the next step. */
|
||||
then(stepType) {
|
||||
Deno.core.ops.op_builder_then(this._handle, stepType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Set the current step's display name. */
|
||||
name(n) {
|
||||
Deno.core.ops.op_builder_name(this._handle, n);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Attach JSON configuration to the current step. */
|
||||
config(c) {
|
||||
Deno.core.ops.op_builder_config(this._handle, c);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Set error handling behavior for the current step. */
|
||||
onError(behavior) {
|
||||
Deno.core.ops.op_builder_on_error(this._handle, behavior);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Add a delay step (milliseconds). */
|
||||
delay(ms) {
|
||||
Deno.core.ops.op_builder_delay(this._handle, ms);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Add a wait-for-event step. */
|
||||
waitFor(eventName, eventKey) {
|
||||
Deno.core.ops.op_builder_wait_for(this._handle, eventName, eventKey);
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Build and return the workflow definition as JSON. */
|
||||
build(id, version) {
|
||||
return Deno.core.ops.op_builder_build(this._handle, id, version);
|
||||
}
|
||||
|
||||
/** Build and register the definition with the host. */
|
||||
async _register(id, version) {
|
||||
await Deno.core.ops.op_builder_register(this._handle, id, version);
|
||||
}
|
||||
}
|
||||
63
wfe-deno/src/lib.rs
Normal file
63
wfe-deno/src/lib.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
pub mod bridge;
|
||||
pub mod ops;
|
||||
pub mod state;
|
||||
|
||||
use deno_core::{JsRuntime, RuntimeOptions};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::ops::wfe_deno_ext;
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Create a `JsRuntime` with the wfe-deno extension loaded.
|
||||
///
|
||||
/// The runtime is ready to execute JavaScript that uses the WFE API:
|
||||
/// ```js
|
||||
/// import { WorkflowHost, ExecutionResult } from "ext:wfe-deno/wfe.js";
|
||||
/// ```
|
||||
pub fn create_wfe_runtime() -> JsRuntime {
|
||||
create_wfe_runtime_with_channel_size(256)
|
||||
}
|
||||
|
||||
/// Create a `JsRuntime` with a custom step-request channel buffer size.
|
||||
pub fn create_wfe_runtime_with_channel_size(channel_size: usize) -> JsRuntime {
|
||||
let ext = wfe_deno_ext::init();
|
||||
|
||||
let runtime = JsRuntime::new(RuntimeOptions {
|
||||
extensions: vec![ext],
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let (tx, rx) = mpsc::channel(channel_size);
|
||||
{
|
||||
let op_state = runtime.op_state();
|
||||
let mut op_state = op_state.borrow_mut();
|
||||
op_state.put(WfeState::new(tx, rx));
|
||||
}
|
||||
|
||||
runtime
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn create_runtime_succeeds() {
|
||||
let _runtime = create_wfe_runtime();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_runtime_with_custom_channel_size() {
|
||||
let _runtime = create_wfe_runtime_with_channel_size(1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_has_wfe_state() {
|
||||
let runtime = create_wfe_runtime();
|
||||
let op_state = runtime.op_state();
|
||||
let op_state = op_state.borrow();
|
||||
let wfe = op_state.borrow::<WfeState>();
|
||||
assert!(wfe.host.is_none());
|
||||
assert!(wfe.step_request_rx.is_some());
|
||||
}
|
||||
}
|
||||
312
wfe-deno/src/ops/builder.rs
Normal file
312
wfe-deno/src/ops/builder.rs
Normal file
@@ -0,0 +1,312 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use deno_core::op2;
|
||||
use deno_core::OpState;
|
||||
use wfe_core::builder::WorkflowBuilder;
|
||||
use wfe_core::models::ErrorBehavior;
|
||||
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Internal builder state that tracks the WorkflowBuilder and current step.
|
||||
///
|
||||
/// We don't use StepBuilder (pub(crate)) — instead we work directly with
|
||||
/// WorkflowBuilder's public `steps` field, `add_step`, and `wire_outcome`.
|
||||
pub struct JsBuilderState {
|
||||
pub wb: WorkflowBuilder<serde_json::Value>,
|
||||
pub current_step: Option<usize>,
|
||||
}
|
||||
|
||||
/// Create a new WorkflowBuilder and return its handle.
|
||||
#[op2(fast)]
|
||||
#[smi]
|
||||
pub fn op_builder_create(state: &mut OpState) -> u32 {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let id = wfe.alloc_builder_id();
|
||||
wfe.builders.insert(
|
||||
id,
|
||||
JsBuilderState {
|
||||
wb: WorkflowBuilder::<serde_json::Value>::new(),
|
||||
current_step: None,
|
||||
},
|
||||
);
|
||||
id
|
||||
}
|
||||
|
||||
/// Add the first step via `start_with`.
|
||||
#[op2(fast)]
|
||||
pub fn op_builder_start_with(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[string] step_type: String,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
|
||||
if bs.current_step.is_some() {
|
||||
return Err(deno_error::JsErrorBox::generic(
|
||||
"start_with already called on this builder",
|
||||
));
|
||||
}
|
||||
|
||||
let step_id = bs.wb.add_step(&step_type);
|
||||
bs.current_step = Some(step_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Chain the next step via `then`.
|
||||
#[op2(fast)]
|
||||
pub fn op_builder_then(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[string] step_type: String,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
|
||||
let prev_id = bs
|
||||
.current_step
|
||||
.ok_or_else(|| deno_error::JsErrorBox::generic("call start_with before then"))?;
|
||||
|
||||
let next_id = bs.wb.add_step(&step_type);
|
||||
bs.wb.wire_outcome(prev_id, next_id, None);
|
||||
bs.current_step = Some(next_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the current step's name.
|
||||
#[op2(fast)]
|
||||
pub fn op_builder_name(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[string] name: String,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
let step_id = current_step(bs)?;
|
||||
bs.wb.steps[step_id].name = Some(name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the current step's JSON config.
|
||||
#[op2]
|
||||
pub fn op_builder_config(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[serde] config: serde_json::Value,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
let step_id = current_step(bs)?;
|
||||
bs.wb.steps[step_id].step_config = Some(config);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the current step's error behavior.
|
||||
#[op2]
|
||||
pub fn op_builder_on_error(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[serde] behavior: serde_json::Value,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
let step_id = current_step(bs)?;
|
||||
let eb = parse_error_behavior(&behavior)?;
|
||||
bs.wb.steps[step_id].error_behavior = Some(eb);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Chain a delay step after the current step.
|
||||
#[op2(fast)]
|
||||
pub fn op_builder_delay(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[number] ms: u64,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
let prev_id = current_step(bs)?;
|
||||
|
||||
let delay_type = std::any::type_name::<wfe_core::primitives::delay::DelayStep>();
|
||||
let next_id = bs.wb.add_step(delay_type);
|
||||
bs.wb.steps[next_id].step_config = Some(serde_json::json!({
|
||||
"duration_millis": ms,
|
||||
}));
|
||||
bs.wb.wire_outcome(prev_id, next_id, None);
|
||||
bs.current_step = Some(next_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Chain a wait-for-event step after the current step.
|
||||
#[op2(fast)]
|
||||
pub fn op_builder_wait_for(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[string] event_name: String,
|
||||
#[string] event_key: String,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = get_builder(wfe, handle)?;
|
||||
let prev_id = current_step(bs)?;
|
||||
|
||||
let wait_type = std::any::type_name::<wfe_core::primitives::wait_for::WaitForStep>();
|
||||
let next_id = bs.wb.add_step(wait_type);
|
||||
bs.wb.steps[next_id].step_config = Some(serde_json::json!({
|
||||
"event_name": event_name,
|
||||
"event_key": event_key,
|
||||
}));
|
||||
bs.wb.wire_outcome(prev_id, next_id, None);
|
||||
bs.current_step = Some(next_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the workflow definition and return it as JSON. Consumes the builder.
|
||||
#[op2]
|
||||
#[serde]
|
||||
pub fn op_builder_build(
|
||||
state: &mut OpState,
|
||||
#[smi] handle: u32,
|
||||
#[string] id: String,
|
||||
#[smi] version: u32,
|
||||
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let bs = wfe
|
||||
.builders
|
||||
.remove(&handle)
|
||||
.ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?;
|
||||
|
||||
let def = bs.wb.build(&id, version);
|
||||
serde_json::to_value(&def)
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}")))
|
||||
}
|
||||
|
||||
/// Build the workflow definition and register it with the host. Consumes the builder.
|
||||
#[op2]
|
||||
pub async fn op_builder_register(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[smi] handle: u32,
|
||||
#[string] id: String,
|
||||
#[smi] version: u32,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let (def, host) = {
|
||||
let mut s = state.borrow_mut();
|
||||
let wfe = s.borrow_mut::<WfeState>();
|
||||
let bs = wfe
|
||||
.builders
|
||||
.remove(&handle)
|
||||
.ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?;
|
||||
let def = bs.wb.build(&id, version);
|
||||
let host = wfe.host()?.clone();
|
||||
(def, host)
|
||||
};
|
||||
|
||||
host.register_workflow_definition(def).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_builder(
|
||||
wfe: &mut WfeState,
|
||||
handle: u32,
|
||||
) -> Result<&mut JsBuilderState, deno_error::JsErrorBox> {
|
||||
wfe.builders
|
||||
.get_mut(&handle)
|
||||
.ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))
|
||||
}
|
||||
|
||||
fn current_step(bs: &JsBuilderState) -> Result<usize, deno_error::JsErrorBox> {
|
||||
bs.current_step
|
||||
.ok_or_else(|| deno_error::JsErrorBox::generic("no current step — call start_with first"))
|
||||
}
|
||||
|
||||
fn parse_error_behavior(
|
||||
value: &serde_json::Value,
|
||||
) -> Result<ErrorBehavior, deno_error::JsErrorBox> {
|
||||
match value.as_str() {
|
||||
Some("suspend") => Ok(ErrorBehavior::Suspend),
|
||||
Some("terminate") => Ok(ErrorBehavior::Terminate),
|
||||
Some("compensate") => Ok(ErrorBehavior::Compensate),
|
||||
_ => {
|
||||
if let Some(retry) = value.get("retry") {
|
||||
let interval = retry
|
||||
.get("interval")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(60_000);
|
||||
let max_retries = retry
|
||||
.get("maxRetries")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(3) as u32;
|
||||
Ok(ErrorBehavior::Retry {
|
||||
interval: Duration::from_millis(interval),
|
||||
max_retries,
|
||||
})
|
||||
} else {
|
||||
Err(deno_error::JsErrorBox::generic(format!(
|
||||
"invalid error behavior: {value}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn parse_suspend_behavior() {
|
||||
let eb = parse_error_behavior(&serde_json::json!("suspend")).unwrap();
|
||||
assert!(matches!(eb, ErrorBehavior::Suspend));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_terminate_behavior() {
|
||||
let eb = parse_error_behavior(&serde_json::json!("terminate")).unwrap();
|
||||
assert!(matches!(eb, ErrorBehavior::Terminate));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_compensate_behavior() {
|
||||
let eb = parse_error_behavior(&serde_json::json!("compensate")).unwrap();
|
||||
assert!(matches!(eb, ErrorBehavior::Compensate));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_retry_behavior() {
|
||||
let eb = parse_error_behavior(
|
||||
&serde_json::json!({"retry": {"interval": 5000, "maxRetries": 5}}),
|
||||
)
|
||||
.unwrap();
|
||||
match eb {
|
||||
ErrorBehavior::Retry {
|
||||
interval,
|
||||
max_retries,
|
||||
} => {
|
||||
assert_eq!(interval, Duration::from_millis(5000));
|
||||
assert_eq!(max_retries, 5);
|
||||
}
|
||||
_ => panic!("expected Retry"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_retry_behavior_defaults() {
|
||||
let eb = parse_error_behavior(&serde_json::json!({"retry": {}})).unwrap();
|
||||
match eb {
|
||||
ErrorBehavior::Retry {
|
||||
interval,
|
||||
max_retries,
|
||||
} => {
|
||||
assert_eq!(interval, Duration::from_millis(60_000));
|
||||
assert_eq!(max_retries, 3);
|
||||
}
|
||||
_ => panic!("expected Retry"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_invalid_behavior_returns_error() {
|
||||
let result = parse_error_behavior(&serde_json::json!("nonsense"));
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
22
wfe-deno/src/ops/event.rs
Normal file
22
wfe-deno/src/ops/event.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use deno_core::op2;
|
||||
use deno_core::OpState;
|
||||
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Publish an event to the workflow host for matching subscriptions.
|
||||
#[op2]
|
||||
pub async fn op_publish_event(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] event_name: String,
|
||||
#[string] event_key: String,
|
||||
#[serde] data: serde_json::Value,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.publish_event(&event_name, &event_key, data)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("publish_event failed: {e}")))
|
||||
}
|
||||
62
wfe-deno/src/ops/host.rs
Normal file
62
wfe-deno/src/ops/host.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use deno_core::op2;
|
||||
use deno_core::OpState;
|
||||
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Create a WorkflowHost with in-memory providers and store it in state.
|
||||
#[op2(fast)]
|
||||
pub fn op_host_create(state: &mut OpState) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
if wfe.host.is_some() {
|
||||
return Err(deno_error::JsErrorBox::generic(
|
||||
"WorkflowHost already created",
|
||||
));
|
||||
}
|
||||
|
||||
let persistence = Arc::new(wfe_core::test_support::InMemoryPersistenceProvider::new());
|
||||
let lock = Arc::new(wfe_core::test_support::InMemoryLockProvider::new());
|
||||
let queue = Arc::new(wfe_core::test_support::InMemoryQueueProvider::new());
|
||||
let lifecycle = Arc::new(wfe_core::test_support::InMemoryLifecyclePublisher::new());
|
||||
|
||||
let host = wfe::WorkflowHostBuilder::new()
|
||||
.use_persistence(persistence)
|
||||
.use_lock_provider(lock)
|
||||
.use_queue_provider(queue)
|
||||
.use_lifecycle(lifecycle)
|
||||
.build()
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to build host: {e}")))?;
|
||||
|
||||
wfe.host = Some(Arc::new(host));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start the WorkflowHost background tasks.
|
||||
#[op2]
|
||||
pub async fn op_host_start(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.start()
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to start host: {e}")))
|
||||
}
|
||||
|
||||
/// Stop the WorkflowHost gracefully.
|
||||
#[op2]
|
||||
pub async fn op_host_stop(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.stop().await;
|
||||
Ok(())
|
||||
}
|
||||
43
wfe-deno/src/ops/mod.rs
Normal file
43
wfe-deno/src/ops/mod.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
pub mod builder;
|
||||
pub mod event;
|
||||
pub mod host;
|
||||
pub mod step;
|
||||
pub mod workflow;
|
||||
|
||||
deno_core::extension!(
|
||||
wfe_deno_ext,
|
||||
ops = [
|
||||
// Host lifecycle
|
||||
host::op_host_create,
|
||||
host::op_host_start,
|
||||
host::op_host_stop,
|
||||
// Workflow management
|
||||
workflow::op_start_workflow,
|
||||
workflow::op_suspend_workflow,
|
||||
workflow::op_resume_workflow,
|
||||
workflow::op_terminate_workflow,
|
||||
workflow::op_get_workflow,
|
||||
// Builder
|
||||
builder::op_builder_create,
|
||||
builder::op_builder_start_with,
|
||||
builder::op_builder_then,
|
||||
builder::op_builder_name,
|
||||
builder::op_builder_config,
|
||||
builder::op_builder_on_error,
|
||||
builder::op_builder_delay,
|
||||
builder::op_builder_wait_for,
|
||||
builder::op_builder_build,
|
||||
builder::op_builder_register,
|
||||
// Step execution bridge
|
||||
step::op_register_step,
|
||||
step::op_step_executor_poll,
|
||||
step::op_step_executor_respond,
|
||||
// Events
|
||||
event::op_publish_event,
|
||||
],
|
||||
esm_entry_point = "ext:wfe-deno/bootstrap.js",
|
||||
esm = [
|
||||
"ext:wfe-deno/bootstrap.js" = "src/js/bootstrap.js",
|
||||
"ext:wfe-deno/wfe.js" = "src/js/wfe.js",
|
||||
],
|
||||
);
|
||||
107
wfe-deno/src/ops/step.rs
Normal file
107
wfe-deno/src/ops/step.rs
Normal file
@@ -0,0 +1,107 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use deno_core::op2;
|
||||
use deno_core::OpState;
|
||||
|
||||
use crate::bridge::JsStepBody;
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Register a JS step type with the host.
|
||||
///
|
||||
/// On the Rust side this creates a factory that produces `JsStepBody` instances.
|
||||
/// On the JS side the caller also registers the actual function via
|
||||
/// `__wfe_registerStepFunction(stepType, fn)`.
|
||||
#[op2]
|
||||
pub async fn op_register_step(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] step_type: String,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let (host, tx) = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
(wfe.host()?.clone(), wfe.step_request_tx.clone())
|
||||
};
|
||||
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
|
||||
host.register_step_factory(
|
||||
&step_type,
|
||||
move || Box::new(JsStepBody::new(tx.clone(), counter.clone())),
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Poll for a step execution request from the Rust executor.
|
||||
///
|
||||
/// This is an async op that blocks until a step needs to be executed.
|
||||
/// Returns `{ requestId, stepType, context }` or null on shutdown.
|
||||
#[op2]
|
||||
#[serde]
|
||||
pub async fn op_step_executor_poll(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
|
||||
// Take the receiver out of state (only the first call gets it).
|
||||
let mut rx = {
|
||||
let mut s = state.borrow_mut();
|
||||
let wfe = s.borrow_mut::<WfeState>();
|
||||
match wfe.step_request_rx.take() {
|
||||
Some(rx) => rx,
|
||||
None => {
|
||||
return Err(deno_error::JsErrorBox::generic(
|
||||
"step executor poll already active",
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Wait for the next request.
|
||||
match rx.recv().await {
|
||||
Some(req) => {
|
||||
let request_id = req.request_id;
|
||||
let result = serde_json::json!({
|
||||
"requestId": request_id,
|
||||
"stepType": req.step_type,
|
||||
"context": req.context,
|
||||
});
|
||||
|
||||
// Store the response sender for op_step_executor_respond.
|
||||
{
|
||||
let mut s = state.borrow_mut();
|
||||
let wfe = s.borrow_mut::<WfeState>();
|
||||
wfe.inflight.insert(request_id, req.response_tx);
|
||||
// Put the receiver back so we can poll again.
|
||||
wfe.step_request_rx = Some(rx);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
None => {
|
||||
// Channel closed — shutdown.
|
||||
Ok(serde_json::Value::Null)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a step execution result back to the Rust executor.
|
||||
#[op2]
|
||||
pub fn op_step_executor_respond(
|
||||
state: &mut OpState,
|
||||
#[smi] request_id: u32,
|
||||
#[serde] result: Option<serde_json::Value>,
|
||||
#[string] error: Option<String>,
|
||||
) -> Result<(), deno_error::JsErrorBox> {
|
||||
let wfe = state.borrow_mut::<WfeState>();
|
||||
let tx = wfe.inflight.remove(&request_id).ok_or_else(|| {
|
||||
deno_error::JsErrorBox::generic(format!("no inflight request with id {request_id}"))
|
||||
})?;
|
||||
|
||||
let response = match error {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(result.unwrap_or(serde_json::json!({"proceed": true}))),
|
||||
};
|
||||
|
||||
// Ignore send failure — the executor may have timed out.
|
||||
let _ = tx.send(response);
|
||||
Ok(())
|
||||
}
|
||||
91
wfe-deno/src/ops/workflow.rs
Normal file
91
wfe-deno/src/ops/workflow.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use deno_core::op2;
|
||||
use deno_core::OpState;
|
||||
|
||||
use crate::state::WfeState;
|
||||
|
||||
/// Start a new workflow instance. Returns the workflow instance ID.
|
||||
#[op2]
|
||||
#[string]
|
||||
pub async fn op_start_workflow(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] definition_id: String,
|
||||
#[smi] version: u32,
|
||||
#[serde] data: serde_json::Value,
|
||||
) -> Result<String, deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.start_workflow(&definition_id, version, data)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("start_workflow failed: {e}")))
|
||||
}
|
||||
|
||||
/// Suspend a running workflow. Returns true if suspension succeeded.
|
||||
#[op2]
|
||||
pub async fn op_suspend_workflow(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] id: String,
|
||||
) -> Result<bool, deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.suspend_workflow(&id)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("suspend_workflow failed: {e}")))
|
||||
}
|
||||
|
||||
/// Resume a suspended workflow. Returns true if resumption succeeded.
|
||||
#[op2]
|
||||
pub async fn op_resume_workflow(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] id: String,
|
||||
) -> Result<bool, deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.resume_workflow(&id)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("resume_workflow failed: {e}")))
|
||||
}
|
||||
|
||||
/// Terminate a workflow. Returns true if termination succeeded.
|
||||
#[op2]
|
||||
pub async fn op_terminate_workflow(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] id: String,
|
||||
) -> Result<bool, deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
host.terminate_workflow(&id)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("terminate_workflow failed: {e}")))
|
||||
}
|
||||
|
||||
/// Get a workflow instance by ID. Returns the serialized WorkflowInstance.
|
||||
#[op2]
|
||||
#[serde]
|
||||
pub async fn op_get_workflow(
|
||||
state: std::rc::Rc<std::cell::RefCell<OpState>>,
|
||||
#[string] id: String,
|
||||
) -> Result<serde_json::Value, deno_error::JsErrorBox> {
|
||||
let host = {
|
||||
let s = state.borrow();
|
||||
let wfe = s.borrow::<WfeState>();
|
||||
wfe.host()?.clone()
|
||||
};
|
||||
let instance = host
|
||||
.get_workflow(&id)
|
||||
.await
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("get_workflow failed: {e}")))?;
|
||||
serde_json::to_value(&instance)
|
||||
.map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}")))
|
||||
}
|
||||
85
wfe-deno/src/state.rs
Normal file
85
wfe-deno/src/state.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use wfe::WorkflowHost;
|
||||
|
||||
use crate::bridge::StepRequest;
|
||||
use crate::ops::builder::JsBuilderState;
|
||||
|
||||
/// Central state shared between all ops via `OpState`.
|
||||
pub struct WfeState {
|
||||
pub host: Option<Arc<WorkflowHost>>,
|
||||
pub step_request_tx: mpsc::Sender<StepRequest>,
|
||||
pub step_request_rx: Option<mpsc::Receiver<StepRequest>>,
|
||||
pub builders: HashMap<u32, JsBuilderState>,
|
||||
pub next_builder_id: u32,
|
||||
pub inflight: HashMap<u32, oneshot::Sender<Result<serde_json::Value, String>>>,
|
||||
pub next_request_id: u32,
|
||||
}
|
||||
|
||||
impl WfeState {
|
||||
pub fn new(
|
||||
step_request_tx: mpsc::Sender<StepRequest>,
|
||||
step_request_rx: mpsc::Receiver<StepRequest>,
|
||||
) -> Self {
|
||||
Self {
|
||||
host: None,
|
||||
step_request_tx,
|
||||
step_request_rx: Some(step_request_rx),
|
||||
builders: HashMap::new(),
|
||||
next_builder_id: 0,
|
||||
inflight: HashMap::new(),
|
||||
next_request_id: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn host(&self) -> Result<&Arc<WorkflowHost>, deno_error::JsErrorBox> {
|
||||
self.host.as_ref().ok_or_else(|| {
|
||||
deno_error::JsErrorBox::generic(
|
||||
"WorkflowHost not created yet — call op_host_create first",
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn alloc_builder_id(&mut self) -> u32 {
|
||||
let id = self.next_builder_id;
|
||||
self.next_builder_id += 1;
|
||||
id
|
||||
}
|
||||
|
||||
pub fn alloc_request_id(&mut self) -> u32 {
|
||||
let id = self.next_request_id;
|
||||
self.next_request_id += 1;
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn alloc_builder_id_increments() {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let mut state = WfeState::new(tx, rx);
|
||||
assert_eq!(state.alloc_builder_id(), 0);
|
||||
assert_eq!(state.alloc_builder_id(), 1);
|
||||
assert_eq!(state.alloc_builder_id(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn alloc_request_id_increments() {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let mut state = WfeState::new(tx, rx);
|
||||
assert_eq!(state.alloc_request_id(), 0);
|
||||
assert_eq!(state.alloc_request_id(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn host_returns_error_when_not_created() {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let state = WfeState::new(tx, rx);
|
||||
assert!(state.host().is_err());
|
||||
}
|
||||
}
|
||||
440
wfe-deno/tests/integration.rs
Normal file
440
wfe-deno/tests/integration.rs
Normal file
@@ -0,0 +1,440 @@
|
||||
use wfe_deno::create_wfe_runtime;
|
||||
|
||||
/// Helper: run a JS script in a fresh wfe runtime and drive the event loop.
|
||||
async fn run_js(code: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut runtime = create_wfe_runtime();
|
||||
runtime
|
||||
.execute_script("<test>", code.to_string())
|
||||
.map_err(|e| format!("script error: {e}"))?;
|
||||
runtime
|
||||
.run_event_loop(Default::default())
|
||||
.await
|
||||
.map_err(|e| format!("event loop error: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper: run a JS module in a fresh wfe runtime and drive the event loop.
|
||||
async fn run_module(code: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let mut runtime = create_wfe_runtime();
|
||||
let specifier =
|
||||
deno_core::ModuleSpecifier::parse("ext:wfe-deno/test-module.js").unwrap();
|
||||
let module_id = runtime
|
||||
.load_main_es_module_from_code(&specifier, code.to_string())
|
||||
.await
|
||||
.map_err(|e| format!("module load error: {e}"))?;
|
||||
let eval = runtime.mod_evaluate(module_id);
|
||||
runtime
|
||||
.run_event_loop(Default::default())
|
||||
.await
|
||||
.map_err(|e| format!("event loop error: {e}"))?;
|
||||
eval.await
|
||||
.map_err(|e| format!("module eval error: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_create_via_op() {
|
||||
run_js("Deno.core.ops.op_host_create()").await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_create_twice_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
Deno.core.ops.op_host_create();
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_start_without_create_fails() {
|
||||
let _result = run_js("await Deno.core.ops.op_host_start()").await;
|
||||
// Script eval of bare await fails, use module mode
|
||||
let result = run_module("await Deno.core.ops.op_host_start()").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_create_and_start() {
|
||||
run_module(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
await Deno.core.ops.op_host_start();
|
||||
await Deno.core.ops.op_host_stop();
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_create_returns_handle() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
if (typeof h !== "number") throw new Error("expected number handle");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_start_with_and_name() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "my-step");
|
||||
Deno.core.ops.op_builder_name(h, "Step One");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_chain_and_build() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_name(h, "First");
|
||||
Deno.core.ops.op_builder_config(h, { key: "val" });
|
||||
Deno.core.ops.op_builder_then(h, "step-b");
|
||||
Deno.core.ops.op_builder_name(h, "Second");
|
||||
const def = Deno.core.ops.op_builder_build(h, "test-wf", 1);
|
||||
if (def.id !== "test-wf") throw new Error("wrong id: " + def.id);
|
||||
if (def.version !== 1) throw new Error("wrong version");
|
||||
if (def.steps.length !== 2) throw new Error("expected 2 steps, got " + def.steps.length);
|
||||
if (def.steps[0].name !== "First") throw new Error("wrong step 0 name");
|
||||
if (def.steps[1].name !== "Second") throw new Error("wrong step 1 name");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_config_and_on_error() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_config(h, { ns: "ory" });
|
||||
Deno.core.ops.op_builder_on_error(h, "suspend");
|
||||
const def = Deno.core.ops.op_builder_build(h, "test", 1);
|
||||
if (!def.steps[0].step_config) throw new Error("missing step_config");
|
||||
if (def.steps[0].step_config.ns !== "ory") throw new Error("wrong config");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_on_error_retry() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_on_error(h, { retry: { interval: 1000, maxRetries: 5 } });
|
||||
const def = Deno.core.ops.op_builder_build(h, "test", 1);
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_delay_step() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_delay(h, 5000);
|
||||
const def = Deno.core.ops.op_builder_build(h, "test", 1);
|
||||
if (def.steps.length !== 2) throw new Error("expected 2 steps (original + delay)");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_wait_for_step() {
|
||||
run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_wait_for(h, "order.paid", "order-123");
|
||||
const def = Deno.core.ops.op_builder_build(h, "test", 1);
|
||||
if (def.steps.length !== 2) throw new Error("expected 2 steps");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_register_with_host() {
|
||||
run_module(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "step-a");
|
||||
Deno.core.ops.op_builder_name(h, "First");
|
||||
await Deno.core.ops.op_builder_register(h, "registered-wf", 1);
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_start_with_twice_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "a");
|
||||
Deno.core.ops.op_builder_start_with(h, "b");
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_then_without_start_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_then(h, "b");
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_name_without_step_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_name(h, "oops");
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_invalid_handle_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
Deno.core.ops.op_builder_start_with(999, "a");
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn builder_build_invalid_handle_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
Deno.core.ops.op_builder_build(999, "id", 1);
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn workflow_management_ops() {
|
||||
run_module(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
await Deno.core.ops.op_host_start();
|
||||
|
||||
// Register a no-op step and workflow.
|
||||
const h = Deno.core.ops.op_builder_create();
|
||||
Deno.core.ops.op_builder_start_with(h, "noop");
|
||||
await Deno.core.ops.op_builder_register(h, "mgmt-test", 1);
|
||||
|
||||
// Start a workflow (it will just sit with unknown step type).
|
||||
const wfId = await Deno.core.ops.op_start_workflow("mgmt-test", 1, { foo: "bar" });
|
||||
if (typeof wfId !== "string") throw new Error("expected string ID");
|
||||
|
||||
// Get the workflow.
|
||||
const wf = await Deno.core.ops.op_get_workflow(wfId);
|
||||
if (wf.id !== wfId) throw new Error("wrong id");
|
||||
if (wf.data.foo !== "bar") throw new Error("wrong data");
|
||||
|
||||
// Suspend.
|
||||
const suspended = await Deno.core.ops.op_suspend_workflow(wfId);
|
||||
if (!suspended) throw new Error("expected suspend to succeed");
|
||||
|
||||
// Resume.
|
||||
const resumed = await Deno.core.ops.op_resume_workflow(wfId);
|
||||
if (!resumed) throw new Error("expected resume to succeed");
|
||||
|
||||
// Terminate.
|
||||
const terminated = await Deno.core.ops.op_terminate_workflow(wfId);
|
||||
if (!terminated) throw new Error("expected terminate to succeed");
|
||||
|
||||
// Suspend after terminate should return false.
|
||||
const again = await Deno.core.ops.op_suspend_workflow(wfId);
|
||||
if (again) throw new Error("expected suspend after terminate to fail");
|
||||
|
||||
await Deno.core.ops.op_host_stop();
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn step_executor_respond_invalid_id_fails() {
|
||||
let result = run_js(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
Deno.core.ops.op_step_executor_respond(999, { proceed: true }, null);
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wfe_js_api_execution_result_factories() {
|
||||
run_module(
|
||||
r#"
|
||||
import { ExecutionResult } from "ext:wfe-deno/wfe.js";
|
||||
|
||||
const next = ExecutionResult.next();
|
||||
if (!next.proceed) throw new Error("next should proceed");
|
||||
|
||||
const outcome = ExecutionResult.outcome("branch-a");
|
||||
if (outcome.outcomeValue !== "branch-a") throw new Error("wrong outcome");
|
||||
|
||||
const persist = ExecutionResult.persist({ page: 2 });
|
||||
if (persist.proceed) throw new Error("persist should not proceed");
|
||||
if (persist.persistenceData.page !== 2) throw new Error("wrong persist data");
|
||||
|
||||
const sleep = ExecutionResult.sleep(5000);
|
||||
if (sleep.sleepFor !== 5000) throw new Error("wrong sleep");
|
||||
|
||||
const wait = ExecutionResult.waitForEvent("ev", "k");
|
||||
if (wait.eventName !== "ev") throw new Error("wrong event name");
|
||||
|
||||
const branch = ExecutionResult.branch([1, 2]);
|
||||
if (branch.branchValues.length !== 2) throw new Error("wrong branch");
|
||||
|
||||
const output = ExecutionResult.output({ done: true });
|
||||
if (!output.proceed) throw new Error("output should proceed");
|
||||
if (!output.outputData.done) throw new Error("wrong output");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wfe_js_api_workflow_host_class() {
|
||||
run_module(
|
||||
r#"
|
||||
import { WorkflowHost } from "ext:wfe-deno/wfe.js";
|
||||
|
||||
const host = WorkflowHost.create();
|
||||
await host.start();
|
||||
await host.stop();
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wfe_js_api_workflow_builder_class() {
|
||||
run_module(
|
||||
r#"
|
||||
import { WorkflowHost, WorkflowBuilder } from "ext:wfe-deno/wfe.js";
|
||||
|
||||
const host = WorkflowHost.create();
|
||||
const b = new WorkflowBuilder();
|
||||
b.startWith("step-a").name("First").config({ key: "val" }).then("step-b").name("Second");
|
||||
const def = b.build("builder-test", 1);
|
||||
if (def.steps.length !== 2) throw new Error("expected 2 steps");
|
||||
if (def.steps[0].name !== "First") throw new Error("wrong name");
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wfe_js_api_build_workflow_helper() {
|
||||
run_module(
|
||||
r#"
|
||||
import { WorkflowHost } from "ext:wfe-deno/wfe.js";
|
||||
|
||||
const host = WorkflowHost.create();
|
||||
await host.buildWorkflow("helper-test", 1, (b) => {
|
||||
b.startWith("step-a").name("A").then("step-b").name("B");
|
||||
});
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_event_op() {
|
||||
run_module(
|
||||
r#"
|
||||
Deno.core.ops.op_host_create();
|
||||
await Deno.core.ops.op_host_start();
|
||||
await Deno.core.ops.op_publish_event("test.event", "key-1", { data: true });
|
||||
await Deno.core.ops.op_host_stop();
|
||||
"#,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_workflow_without_host_fails() {
|
||||
let result = run_module(
|
||||
r#"
|
||||
await Deno.core.ops.op_start_workflow("nope", 1, {});
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn register_step_without_host_fails() {
|
||||
let result = run_module(
|
||||
r#"
|
||||
await Deno.core.ops.op_register_step("nope");
|
||||
"#,
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
@@ -14,9 +14,9 @@ path = "src/main.rs"
|
||||
[dependencies]
|
||||
# Internal
|
||||
wfe-core = { workspace = true, features = ["test-support"] }
|
||||
wfe = { path = "../wfe" }
|
||||
wfe-yaml = { path = "../wfe-yaml", features = ["rustlang", "buildkit", "containerd"] }
|
||||
wfe-server-protos = { path = "../wfe-server-protos" }
|
||||
wfe = { version = "1.7.0", path = "../wfe", registry = "sunbeam" }
|
||||
wfe-yaml = { version = "1.7.0", path = "../wfe-yaml", registry = "sunbeam", features = ["rustlang", "buildkit", "containerd"] }
|
||||
wfe-server-protos = { version = "1.7.0", path = "../wfe-server-protos", registry = "sunbeam" }
|
||||
wfe-sqlite = { workspace = true }
|
||||
wfe-postgres = { workspace = true }
|
||||
wfe-valkey = { workspace = true }
|
||||
|
||||
@@ -293,7 +293,9 @@ impl WorkflowHost {
|
||||
// Create initial execution pointer for step 0 if the definition has steps.
|
||||
let mut instance = WorkflowInstance::new(definition_id, version, data);
|
||||
if !definition.steps.is_empty() {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
let mut pointer = ExecutionPointer::new(0);
|
||||
pointer.step_name = definition.steps.first().and_then(|s| s.name.clone());
|
||||
instance.execution_pointers.push(pointer);
|
||||
}
|
||||
|
||||
// Persist the instance.
|
||||
|
||||
Reference in New Issue
Block a user