diff --git a/Cargo.toml b/Cargo.toml index c3d90c5..558f093 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server"] +members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd", "wfe-containerd-protos", "wfe-buildkit-protos", "wfe-rustlang", "wfe-server-protos", "wfe-server", "wfe-deno"] resolver = "2" [workspace.package] diff --git a/wfe-deno/Cargo.toml b/wfe-deno/Cargo.toml new file mode 100644 index 0000000..794bb13 --- /dev/null +++ b/wfe-deno/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "wfe-deno" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "Deno bindings for the WFE workflow engine" + +[dependencies] +wfe-core = { workspace = true, features = ["test-support"] } +wfe = { path = "../wfe" } +deno_core = { workspace = true } +deno_error = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +chrono = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +tokio-util = "0.7" +tempfile = { workspace = true } diff --git a/wfe-deno/src/bridge.rs b/wfe-deno/src/bridge.rs new file mode 100644 index 0000000..7d29d3b --- /dev/null +++ b/wfe-deno/src/bridge.rs @@ -0,0 +1,444 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use wfe_core::models::ExecutionResult; +use wfe_core::traits::step::{StepBody, StepExecutionContext}; +use wfe_core::WfeError; + +/// A request sent from the executor (tokio) to the V8 thread. +pub struct StepRequest { + pub request_id: u32, + pub step_type: String, + pub context: serde_json::Value, + pub response_tx: oneshot::Sender>, +} + +/// A `StepBody` implementation that bridges to JavaScript via channels. +/// +/// When the workflow executor calls `run()`, this sends a serialized +/// `StepExecutionContext` to the V8 thread and awaits the response. +pub struct JsStepBody { + request_tx: mpsc::Sender, + request_id_counter: std::sync::Arc, +} + +impl JsStepBody { + pub fn new( + request_tx: mpsc::Sender, + request_id_counter: std::sync::Arc, + ) -> Self { + Self { + request_tx, + request_id_counter, + } + } +} + +#[async_trait] +impl StepBody for JsStepBody { + async fn run( + &mut self, + context: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let ctx_json = serialize_context(context); + let (tx, rx) = oneshot::channel(); + let request_id = self + .request_id_counter + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + self.request_tx + .send(StepRequest { + request_id, + step_type: context.step.step_type.clone(), + context: ctx_json, + response_tx: tx, + }) + .await + .map_err(|_| WfeError::StepExecution("step request channel closed".into()))?; + + let result_json = rx + .await + .map_err(|_| WfeError::StepExecution("step response channel dropped".into()))? + .map_err(WfeError::StepExecution)?; + + deserialize_execution_result(&result_json) + } +} + +/// Serializable view of `StepExecutionContext` for passing to JavaScript. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JsStepContext { + pub item: Option, + pub persistence_data: Option, + pub step: JsStepInfo, + pub workflow: JsWorkflowInfo, + pub pointer: JsPointerInfo, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JsStepInfo { + pub id: usize, + pub name: Option, + pub step_type: String, + pub step_config: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JsWorkflowInfo { + pub id: String, + pub definition_id: String, + pub version: u32, + pub status: String, + pub data: serde_json::Value, + pub create_time: DateTime, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JsPointerInfo { + pub id: String, + pub step_id: usize, + pub step_name: Option, + pub retry_count: u32, +} + +/// Serialize a `StepExecutionContext` into a JSON value for JavaScript. +pub fn serialize_context(ctx: &StepExecutionContext<'_>) -> serde_json::Value { + let js_ctx = JsStepContext { + item: ctx.item.cloned(), + persistence_data: ctx.persistence_data.cloned(), + step: JsStepInfo { + id: ctx.step.id, + name: ctx.step.name.clone(), + step_type: ctx.step.step_type.clone(), + step_config: ctx.step.step_config.clone(), + }, + workflow: JsWorkflowInfo { + id: ctx.workflow.id.clone(), + definition_id: ctx.workflow.workflow_definition_id.clone(), + version: ctx.workflow.version, + status: format!("{:?}", ctx.workflow.status), + data: ctx.workflow.data.clone(), + create_time: ctx.workflow.create_time, + }, + pointer: JsPointerInfo { + id: ctx.execution_pointer.id.clone(), + step_id: ctx.execution_pointer.step_id, + step_name: ctx.execution_pointer.step_name.clone(), + retry_count: ctx.execution_pointer.retry_count, + }, + }; + serde_json::to_value(js_ctx).unwrap_or(serde_json::Value::Null) +} + +/// Shape of an `ExecutionResult` as returned from JavaScript. +#[derive(Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct JsExecutionResult { + #[serde(default = "default_true")] + pub proceed: bool, + pub outcome_value: Option, + pub sleep_for: Option, + pub persistence_data: Option, + pub event_name: Option, + pub event_key: Option, + pub branch_values: Option>, + pub output_data: Option, +} + +fn default_true() -> bool { + true +} + +/// Deserialize a JavaScript execution result into the Rust `ExecutionResult`. +pub fn deserialize_execution_result( + value: &serde_json::Value, +) -> wfe_core::Result { + let js_result: JsExecutionResult = serde_json::from_value(value.clone()).map_err(|e| { + WfeError::StepExecution(format!("failed to deserialize ExecutionResult from JS: {e}")) + })?; + + Ok(ExecutionResult { + proceed: js_result.proceed, + outcome_value: js_result.outcome_value, + sleep_for: js_result.sleep_for.map(Duration::from_millis), + persistence_data: js_result.persistence_data, + event_name: js_result.event_name, + event_key: js_result.event_key, + event_as_of: None, + branch_values: js_result.branch_values, + poll_endpoint: None, + output_data: js_result.output_data, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStatus, WorkflowStep}; + + fn make_test_context() -> (WorkflowInstance, WorkflowStep, ExecutionPointer) { + let instance = WorkflowInstance { + id: "wf-1".into(), + workflow_definition_id: "test-def".into(), + version: 1, + description: None, + reference: None, + execution_pointers: vec![], + next_execution: None, + status: WorkflowStatus::Runnable, + data: serde_json::json!({"name": "World"}), + create_time: Utc::now(), + complete_time: None, + }; + let step = WorkflowStep::new(0, "MyStep"); + let pointer = ExecutionPointer::new(0); + (instance, step, pointer) + } + + #[test] + fn serialize_context_produces_valid_json() { + let (instance, mut step, pointer) = make_test_context(); + step.name = Some("greet".into()); + step.step_config = Some(serde_json::json!({"key": "val"})); + + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + let json = serialize_context(&ctx); + assert_eq!(json["step"]["name"], "greet"); + assert_eq!(json["step"]["stepConfig"]["key"], "val"); + assert_eq!(json["workflow"]["data"]["name"], "World"); + assert_eq!(json["workflow"]["definitionId"], "test-def"); + assert_eq!(json["pointer"]["stepId"], 0); + } + + #[test] + fn serialize_context_with_item() { + let (instance, step, pointer) = make_test_context(); + let item = serde_json::json!({"id": 42}); + + let ctx = StepExecutionContext { + item: Some(&item), + execution_pointer: &pointer, + persistence_data: Some(&serde_json::json!({"saved": true})), + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + let json = serialize_context(&ctx); + assert_eq!(json["item"]["id"], 42); + assert_eq!(json["persistenceData"]["saved"], true); + } + + #[test] + fn deserialize_next_result() { + let json = serde_json::json!({"proceed": true}); + let result = deserialize_execution_result(&json).unwrap(); + assert!(result.proceed); + assert!(result.outcome_value.is_none()); + } + + #[test] + fn deserialize_outcome_result() { + let json = serde_json::json!({ + "proceed": true, + "outcomeValue": "branch-a" + }); + let result = deserialize_execution_result(&json).unwrap(); + assert!(result.proceed); + assert_eq!(result.outcome_value, Some(serde_json::json!("branch-a"))); + } + + #[test] + fn deserialize_sleep_result() { + let json = serde_json::json!({ + "proceed": false, + "sleepFor": 5000 + }); + let result = deserialize_execution_result(&json).unwrap(); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_millis(5000))); + } + + #[test] + fn deserialize_persist_result() { + let json = serde_json::json!({ + "proceed": false, + "persistenceData": {"page": 2} + }); + let result = deserialize_execution_result(&json).unwrap(); + assert!(!result.proceed); + assert_eq!( + result.persistence_data, + Some(serde_json::json!({"page": 2})) + ); + } + + #[test] + fn deserialize_wait_for_event_result() { + let json = serde_json::json!({ + "proceed": false, + "eventName": "order.paid", + "eventKey": "order-123" + }); + let result = deserialize_execution_result(&json).unwrap(); + assert_eq!(result.event_name, Some("order.paid".into())); + assert_eq!(result.event_key, Some("order-123".into())); + } + + #[test] + fn deserialize_branch_result() { + let json = serde_json::json!({ + "proceed": false, + "branchValues": [1, 2, 3] + }); + let result = deserialize_execution_result(&json).unwrap(); + assert_eq!( + result.branch_values, + Some(vec![ + serde_json::json!(1), + serde_json::json!(2), + serde_json::json!(3) + ]) + ); + } + + #[test] + fn deserialize_output_data_result() { + let json = serde_json::json!({ + "proceed": true, + "outputData": {"greeted": "World"} + }); + let result = deserialize_execution_result(&json).unwrap(); + assert!(result.proceed); + assert_eq!( + result.output_data, + Some(serde_json::json!({"greeted": "World"})) + ); + } + + #[test] + fn deserialize_empty_object_defaults_to_proceed() { + let json = serde_json::json!({}); + let result = deserialize_execution_result(&json).unwrap(); + assert!(result.proceed); + } + + #[test] + fn deserialize_invalid_json_returns_error() { + let json = serde_json::json!("not an object"); + let result = deserialize_execution_result(&json); + assert!(result.is_err()); + } + + #[tokio::test] + async fn js_step_body_channel_round_trip() { + let (tx, mut rx) = mpsc::channel::(16); + let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let mut body = JsStepBody::new(tx, counter); + + let (instance, step, pointer) = make_test_context(); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + // Spawn a "JS side" that responds to the request. + let responder = tokio::spawn(async move { + let req = rx.recv().await.unwrap(); + assert_eq!(req.step_type, "MyStep"); + assert_eq!(req.request_id, 0); + req.response_tx + .send(Ok(serde_json::json!({"proceed": true, "outputData": {"done": true}}))) + .unwrap(); + }); + + let result = body.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert_eq!(result.output_data, Some(serde_json::json!({"done": true}))); + + responder.await.unwrap(); + } + + #[tokio::test] + async fn js_step_body_propagates_js_error() { + let (tx, mut rx) = mpsc::channel::(16); + let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let mut body = JsStepBody::new(tx, counter); + + let (instance, step, pointer) = make_test_context(); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + tokio::spawn(async move { + let req = rx.recv().await.unwrap(); + req.response_tx + .send(Err("TypeError: undefined is not a function".into())) + .unwrap(); + }); + + let result = body.run(&ctx).await; + assert!(result.is_err()); + let err_msg = format!("{}", result.unwrap_err()); + assert!(err_msg.contains("TypeError")); + } + + #[tokio::test] + async fn js_step_body_handles_dropped_responder() { + let (tx, mut rx) = mpsc::channel::(16); + let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let mut body = JsStepBody::new(tx, counter); + + let (instance, step, pointer) = make_test_context(); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + tokio::spawn(async move { + let req = rx.recv().await.unwrap(); + drop(req.response_tx); // Drop without sending + }); + + let result = body.run(&ctx).await; + assert!(result.is_err()); + } +} diff --git a/wfe-deno/src/js/bootstrap.js b/wfe-deno/src/js/bootstrap.js new file mode 100644 index 0000000..fafdd67 --- /dev/null +++ b/wfe-deno/src/js/bootstrap.js @@ -0,0 +1,50 @@ +// Import wfe.js so it's evaluated and available for user code. +import "ext:wfe-deno/wfe.js"; + +// Step function registry -- maps step type names to JS functions. +const _stepFunctions = {}; + +// Called by WorkflowHost.registerStep() to store the JS function. +globalThis.__wfe_registerStepFunction = (stepType, fn) => { + _stepFunctions[stepType] = fn; +}; + +// Step executor loop -- runs in the background, dispatching step execution +// requests from the Rust executor to registered JS functions. +// +// This loop starts automatically when the extension loads. It blocks on +// op_step_executor_poll (async op) until a step needs to execute, then +// calls the matching JS function and sends the result back. +globalThis.__wfe_startExecutorLoop = () => { + (async () => { + while (true) { + let req; + try { + req = await Deno.core.ops.op_step_executor_poll(); + } catch (_) { + // Poll op failed (e.g. already active) -- stop loop. + break; + } + if (req === null || req === undefined) break; // shutdown signal + + try { + const fn = _stepFunctions[req.stepType]; + if (!fn) { + throw new Error(`No step function registered for "${req.stepType}"`); + } + const result = await fn(req.context); + Deno.core.ops.op_step_executor_respond( + req.requestId, + result || { proceed: true }, + null, + ); + } catch (e) { + Deno.core.ops.op_step_executor_respond( + req.requestId, + null, + e.message || String(e), + ); + } + } + })(); +}; diff --git a/wfe-deno/src/js/wfe.js b/wfe-deno/src/js/wfe.js new file mode 100644 index 0000000..c76bc23 --- /dev/null +++ b/wfe-deno/src/js/wfe.js @@ -0,0 +1,188 @@ +// High-level WFE API for Deno. +// +// Usage: +// import { WorkflowHost, WorkflowBuilder, ExecutionResult } from "ext:wfe-deno/wfe.js"; + +/// Factory methods for step execution results. +export class ExecutionResult { + /** Proceed to the next step. */ + static next() { + return { proceed: true }; + } + + /** Proceed with a routing outcome value. */ + static outcome(value) { + return { proceed: true, outcomeValue: value }; + } + + /** Pause and persist step state for later resumption. */ + static persist(data) { + return { proceed: false, persistenceData: data }; + } + + /** Sleep for the given number of milliseconds. */ + static sleep(ms, persistenceData) { + const r = { proceed: false, sleepFor: ms }; + if (persistenceData !== undefined) r.persistenceData = persistenceData; + return r; + } + + /** Wait for an external event before continuing. */ + static waitForEvent(eventName, eventKey) { + return { proceed: false, eventName, eventKey }; + } + + /** Fork into parallel branches. */ + static branch(values) { + return { proceed: false, branchValues: values }; + } + + /** Proceed and attach output data. */ + static output(data) { + return { proceed: true, outputData: data }; + } +} + +/// Workflow engine host. +export class WorkflowHost { + /** Create a WorkflowHost with in-memory providers. */ + static create() { + Deno.core.ops.op_host_create(); + return new WorkflowHost(); + } + + /** Start the background workflow and event consumer loops. */ + async start() { + await Deno.core.ops.op_host_start(); + } + + /** Stop the host gracefully. */ + async stop() { + await Deno.core.ops.op_host_stop(); + } + + /** + * Register a JavaScript function as a workflow step. + * + * @param {string} stepType - The step type name (used in builder). + * @param {function} fn - Async function receiving context, returning ExecutionResult. + */ + async registerStep(stepType, fn) { + // Start the executor loop on first step registration. + if (!this._executorStarted) { + globalThis.__wfe_startExecutorLoop(); + this._executorStarted = true; + } + await Deno.core.ops.op_register_step(stepType); + globalThis.__wfe_registerStepFunction(stepType, fn); + } + + /** + * Start a new workflow instance. + * + * @param {string} definitionId - The workflow definition ID. + * @param {number} version - The workflow version. + * @param {object} data - Initial workflow data. + * @returns {Promise} The workflow instance ID. + */ + async startWorkflow(definitionId, version, data) { + return await Deno.core.ops.op_start_workflow(definitionId, version, data); + } + + /** Suspend a running workflow. */ + async suspendWorkflow(id) { + return await Deno.core.ops.op_suspend_workflow(id); + } + + /** Resume a suspended workflow. */ + async resumeWorkflow(id) { + return await Deno.core.ops.op_resume_workflow(id); + } + + /** Terminate a workflow. */ + async terminateWorkflow(id) { + return await Deno.core.ops.op_terminate_workflow(id); + } + + /** Get a workflow instance by ID. */ + async getWorkflow(id) { + return await Deno.core.ops.op_get_workflow(id); + } + + /** Publish an event for waiting workflows. */ + async publishEvent(eventName, eventKey, data) { + await Deno.core.ops.op_publish_event(eventName, eventKey, data); + } + + /** + * Build and register a workflow definition using a fluent builder. + * + * @param {string} id - Workflow definition ID. + * @param {number} version - Workflow version. + * @param {function} builderFn - Function receiving a WorkflowBuilder. + */ + async buildWorkflow(id, version, builderFn) { + const b = new WorkflowBuilder(); + builderFn(b); + await b._register(id, version); + } +} + +/// Fluent workflow builder wrapping the Rust WorkflowBuilder via ops. +export class WorkflowBuilder { + constructor() { + this._handle = Deno.core.ops.op_builder_create(); + } + + /** Add the first step. */ + startWith(stepType) { + Deno.core.ops.op_builder_start_with(this._handle, stepType); + return this; + } + + /** Chain the next step. */ + then(stepType) { + Deno.core.ops.op_builder_then(this._handle, stepType); + return this; + } + + /** Set the current step's display name. */ + name(n) { + Deno.core.ops.op_builder_name(this._handle, n); + return this; + } + + /** Attach JSON configuration to the current step. */ + config(c) { + Deno.core.ops.op_builder_config(this._handle, c); + return this; + } + + /** Set error handling behavior for the current step. */ + onError(behavior) { + Deno.core.ops.op_builder_on_error(this._handle, behavior); + return this; + } + + /** Add a delay step (milliseconds). */ + delay(ms) { + Deno.core.ops.op_builder_delay(this._handle, ms); + return this; + } + + /** Add a wait-for-event step. */ + waitFor(eventName, eventKey) { + Deno.core.ops.op_builder_wait_for(this._handle, eventName, eventKey); + return this; + } + + /** Build and return the workflow definition as JSON. */ + build(id, version) { + return Deno.core.ops.op_builder_build(this._handle, id, version); + } + + /** Build and register the definition with the host. */ + async _register(id, version) { + await Deno.core.ops.op_builder_register(this._handle, id, version); + } +} diff --git a/wfe-deno/src/lib.rs b/wfe-deno/src/lib.rs new file mode 100644 index 0000000..77dbf2f --- /dev/null +++ b/wfe-deno/src/lib.rs @@ -0,0 +1,63 @@ +pub mod bridge; +pub mod ops; +pub mod state; + +use deno_core::{JsRuntime, RuntimeOptions}; +use tokio::sync::mpsc; + +use crate::ops::wfe_deno_ext; +use crate::state::WfeState; + +/// Create a `JsRuntime` with the wfe-deno extension loaded. +/// +/// The runtime is ready to execute JavaScript that uses the WFE API: +/// ```js +/// import { WorkflowHost, ExecutionResult } from "ext:wfe-deno/wfe.js"; +/// ``` +pub fn create_wfe_runtime() -> JsRuntime { + create_wfe_runtime_with_channel_size(256) +} + +/// Create a `JsRuntime` with a custom step-request channel buffer size. +pub fn create_wfe_runtime_with_channel_size(channel_size: usize) -> JsRuntime { + let ext = wfe_deno_ext::init(); + + let runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![ext], + ..Default::default() + }); + + let (tx, rx) = mpsc::channel(channel_size); + { + let op_state = runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put(WfeState::new(tx, rx)); + } + + runtime +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_runtime_succeeds() { + let _runtime = create_wfe_runtime(); + } + + #[test] + fn create_runtime_with_custom_channel_size() { + let _runtime = create_wfe_runtime_with_channel_size(1); + } + + #[test] + fn runtime_has_wfe_state() { + let runtime = create_wfe_runtime(); + let op_state = runtime.op_state(); + let op_state = op_state.borrow(); + let wfe = op_state.borrow::(); + assert!(wfe.host.is_none()); + assert!(wfe.step_request_rx.is_some()); + } +} diff --git a/wfe-deno/src/ops/builder.rs b/wfe-deno/src/ops/builder.rs new file mode 100644 index 0000000..0c13ad7 --- /dev/null +++ b/wfe-deno/src/ops/builder.rs @@ -0,0 +1,312 @@ +use std::time::Duration; + +use deno_core::op2; +use deno_core::OpState; +use wfe_core::builder::WorkflowBuilder; +use wfe_core::models::ErrorBehavior; + +use crate::state::WfeState; + +/// Internal builder state that tracks the WorkflowBuilder and current step. +/// +/// We don't use StepBuilder (pub(crate)) — instead we work directly with +/// WorkflowBuilder's public `steps` field, `add_step`, and `wire_outcome`. +pub struct JsBuilderState { + pub wb: WorkflowBuilder, + pub current_step: Option, +} + +/// Create a new WorkflowBuilder and return its handle. +#[op2(fast)] +#[smi] +pub fn op_builder_create(state: &mut OpState) -> u32 { + let wfe = state.borrow_mut::(); + let id = wfe.alloc_builder_id(); + wfe.builders.insert( + id, + JsBuilderState { + wb: WorkflowBuilder::::new(), + current_step: None, + }, + ); + id +} + +/// Add the first step via `start_with`. +#[op2(fast)] +pub fn op_builder_start_with( + state: &mut OpState, + #[smi] handle: u32, + #[string] step_type: String, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + + if bs.current_step.is_some() { + return Err(deno_error::JsErrorBox::generic( + "start_with already called on this builder", + )); + } + + let step_id = bs.wb.add_step(&step_type); + bs.current_step = Some(step_id); + Ok(()) +} + +/// Chain the next step via `then`. +#[op2(fast)] +pub fn op_builder_then( + state: &mut OpState, + #[smi] handle: u32, + #[string] step_type: String, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + + let prev_id = bs + .current_step + .ok_or_else(|| deno_error::JsErrorBox::generic("call start_with before then"))?; + + let next_id = bs.wb.add_step(&step_type); + bs.wb.wire_outcome(prev_id, next_id, None); + bs.current_step = Some(next_id); + Ok(()) +} + +/// Set the current step's name. +#[op2(fast)] +pub fn op_builder_name( + state: &mut OpState, + #[smi] handle: u32, + #[string] name: String, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + let step_id = current_step(bs)?; + bs.wb.steps[step_id].name = Some(name); + Ok(()) +} + +/// Set the current step's JSON config. +#[op2] +pub fn op_builder_config( + state: &mut OpState, + #[smi] handle: u32, + #[serde] config: serde_json::Value, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + let step_id = current_step(bs)?; + bs.wb.steps[step_id].step_config = Some(config); + Ok(()) +} + +/// Set the current step's error behavior. +#[op2] +pub fn op_builder_on_error( + state: &mut OpState, + #[smi] handle: u32, + #[serde] behavior: serde_json::Value, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + let step_id = current_step(bs)?; + let eb = parse_error_behavior(&behavior)?; + bs.wb.steps[step_id].error_behavior = Some(eb); + Ok(()) +} + +/// Chain a delay step after the current step. +#[op2(fast)] +pub fn op_builder_delay( + state: &mut OpState, + #[smi] handle: u32, + #[number] ms: u64, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + let prev_id = current_step(bs)?; + + let delay_type = std::any::type_name::(); + let next_id = bs.wb.add_step(delay_type); + bs.wb.steps[next_id].step_config = Some(serde_json::json!({ + "duration_millis": ms, + })); + bs.wb.wire_outcome(prev_id, next_id, None); + bs.current_step = Some(next_id); + Ok(()) +} + +/// Chain a wait-for-event step after the current step. +#[op2(fast)] +pub fn op_builder_wait_for( + state: &mut OpState, + #[smi] handle: u32, + #[string] event_name: String, + #[string] event_key: String, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let bs = get_builder(wfe, handle)?; + let prev_id = current_step(bs)?; + + let wait_type = std::any::type_name::(); + let next_id = bs.wb.add_step(wait_type); + bs.wb.steps[next_id].step_config = Some(serde_json::json!({ + "event_name": event_name, + "event_key": event_key, + })); + bs.wb.wire_outcome(prev_id, next_id, None); + bs.current_step = Some(next_id); + Ok(()) +} + +/// Build the workflow definition and return it as JSON. Consumes the builder. +#[op2] +#[serde] +pub fn op_builder_build( + state: &mut OpState, + #[smi] handle: u32, + #[string] id: String, + #[smi] version: u32, +) -> Result { + let wfe = state.borrow_mut::(); + let bs = wfe + .builders + .remove(&handle) + .ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?; + + let def = bs.wb.build(&id, version); + serde_json::to_value(&def) + .map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}"))) +} + +/// Build the workflow definition and register it with the host. Consumes the builder. +#[op2] +pub async fn op_builder_register( + state: std::rc::Rc>, + #[smi] handle: u32, + #[string] id: String, + #[smi] version: u32, +) -> Result<(), deno_error::JsErrorBox> { + let (def, host) = { + let mut s = state.borrow_mut(); + let wfe = s.borrow_mut::(); + let bs = wfe + .builders + .remove(&handle) + .ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle"))?; + let def = bs.wb.build(&id, version); + let host = wfe.host()?.clone(); + (def, host) + }; + + host.register_workflow_definition(def).await; + Ok(()) +} + +fn get_builder( + wfe: &mut WfeState, + handle: u32, +) -> Result<&mut JsBuilderState, deno_error::JsErrorBox> { + wfe.builders + .get_mut(&handle) + .ok_or_else(|| deno_error::JsErrorBox::generic("invalid builder handle")) +} + +fn current_step(bs: &JsBuilderState) -> Result { + bs.current_step + .ok_or_else(|| deno_error::JsErrorBox::generic("no current step — call start_with first")) +} + +fn parse_error_behavior( + value: &serde_json::Value, +) -> Result { + match value.as_str() { + Some("suspend") => Ok(ErrorBehavior::Suspend), + Some("terminate") => Ok(ErrorBehavior::Terminate), + Some("compensate") => Ok(ErrorBehavior::Compensate), + _ => { + if let Some(retry) = value.get("retry") { + let interval = retry + .get("interval") + .and_then(|v| v.as_u64()) + .unwrap_or(60_000); + let max_retries = retry + .get("maxRetries") + .and_then(|v| v.as_u64()) + .unwrap_or(3) as u32; + Ok(ErrorBehavior::Retry { + interval: Duration::from_millis(interval), + max_retries, + }) + } else { + Err(deno_error::JsErrorBox::generic(format!( + "invalid error behavior: {value}" + ))) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn parse_suspend_behavior() { + let eb = parse_error_behavior(&serde_json::json!("suspend")).unwrap(); + assert!(matches!(eb, ErrorBehavior::Suspend)); + } + + #[test] + fn parse_terminate_behavior() { + let eb = parse_error_behavior(&serde_json::json!("terminate")).unwrap(); + assert!(matches!(eb, ErrorBehavior::Terminate)); + } + + #[test] + fn parse_compensate_behavior() { + let eb = parse_error_behavior(&serde_json::json!("compensate")).unwrap(); + assert!(matches!(eb, ErrorBehavior::Compensate)); + } + + #[test] + fn parse_retry_behavior() { + let eb = parse_error_behavior( + &serde_json::json!({"retry": {"interval": 5000, "maxRetries": 5}}), + ) + .unwrap(); + match eb { + ErrorBehavior::Retry { + interval, + max_retries, + } => { + assert_eq!(interval, Duration::from_millis(5000)); + assert_eq!(max_retries, 5); + } + _ => panic!("expected Retry"), + } + } + + #[test] + fn parse_retry_behavior_defaults() { + let eb = parse_error_behavior(&serde_json::json!({"retry": {}})).unwrap(); + match eb { + ErrorBehavior::Retry { + interval, + max_retries, + } => { + assert_eq!(interval, Duration::from_millis(60_000)); + assert_eq!(max_retries, 3); + } + _ => panic!("expected Retry"), + } + } + + #[test] + fn parse_invalid_behavior_returns_error() { + let result = parse_error_behavior(&serde_json::json!("nonsense")); + assert!(result.is_err()); + } +} diff --git a/wfe-deno/src/ops/event.rs b/wfe-deno/src/ops/event.rs new file mode 100644 index 0000000..1290cbc --- /dev/null +++ b/wfe-deno/src/ops/event.rs @@ -0,0 +1,22 @@ +use deno_core::op2; +use deno_core::OpState; + +use crate::state::WfeState; + +/// Publish an event to the workflow host for matching subscriptions. +#[op2] +pub async fn op_publish_event( + state: std::rc::Rc>, + #[string] event_name: String, + #[string] event_key: String, + #[serde] data: serde_json::Value, +) -> Result<(), deno_error::JsErrorBox> { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.publish_event(&event_name, &event_key, data) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("publish_event failed: {e}"))) +} diff --git a/wfe-deno/src/ops/host.rs b/wfe-deno/src/ops/host.rs new file mode 100644 index 0000000..0dc541a --- /dev/null +++ b/wfe-deno/src/ops/host.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use deno_core::op2; +use deno_core::OpState; + +use crate::state::WfeState; + +/// Create a WorkflowHost with in-memory providers and store it in state. +#[op2(fast)] +pub fn op_host_create(state: &mut OpState) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + if wfe.host.is_some() { + return Err(deno_error::JsErrorBox::generic( + "WorkflowHost already created", + )); + } + + let persistence = Arc::new(wfe_core::test_support::InMemoryPersistenceProvider::new()); + let lock = Arc::new(wfe_core::test_support::InMemoryLockProvider::new()); + let queue = Arc::new(wfe_core::test_support::InMemoryQueueProvider::new()); + let lifecycle = Arc::new(wfe_core::test_support::InMemoryLifecyclePublisher::new()); + + let host = wfe::WorkflowHostBuilder::new() + .use_persistence(persistence) + .use_lock_provider(lock) + .use_queue_provider(queue) + .use_lifecycle(lifecycle) + .build() + .map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to build host: {e}")))?; + + wfe.host = Some(Arc::new(host)); + Ok(()) +} + +/// Start the WorkflowHost background tasks. +#[op2] +pub async fn op_host_start( + state: std::rc::Rc>, +) -> Result<(), deno_error::JsErrorBox> { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.start() + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("Failed to start host: {e}"))) +} + +/// Stop the WorkflowHost gracefully. +#[op2] +pub async fn op_host_stop( + state: std::rc::Rc>, +) -> Result<(), deno_error::JsErrorBox> { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.stop().await; + Ok(()) +} diff --git a/wfe-deno/src/ops/mod.rs b/wfe-deno/src/ops/mod.rs new file mode 100644 index 0000000..99de1c4 --- /dev/null +++ b/wfe-deno/src/ops/mod.rs @@ -0,0 +1,43 @@ +pub mod builder; +pub mod event; +pub mod host; +pub mod step; +pub mod workflow; + +deno_core::extension!( + wfe_deno_ext, + ops = [ + // Host lifecycle + host::op_host_create, + host::op_host_start, + host::op_host_stop, + // Workflow management + workflow::op_start_workflow, + workflow::op_suspend_workflow, + workflow::op_resume_workflow, + workflow::op_terminate_workflow, + workflow::op_get_workflow, + // Builder + builder::op_builder_create, + builder::op_builder_start_with, + builder::op_builder_then, + builder::op_builder_name, + builder::op_builder_config, + builder::op_builder_on_error, + builder::op_builder_delay, + builder::op_builder_wait_for, + builder::op_builder_build, + builder::op_builder_register, + // Step execution bridge + step::op_register_step, + step::op_step_executor_poll, + step::op_step_executor_respond, + // Events + event::op_publish_event, + ], + esm_entry_point = "ext:wfe-deno/bootstrap.js", + esm = [ + "ext:wfe-deno/bootstrap.js" = "src/js/bootstrap.js", + "ext:wfe-deno/wfe.js" = "src/js/wfe.js", + ], +); diff --git a/wfe-deno/src/ops/step.rs b/wfe-deno/src/ops/step.rs new file mode 100644 index 0000000..5a51c93 --- /dev/null +++ b/wfe-deno/src/ops/step.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use deno_core::op2; +use deno_core::OpState; + +use crate::bridge::JsStepBody; +use crate::state::WfeState; + +/// Register a JS step type with the host. +/// +/// On the Rust side this creates a factory that produces `JsStepBody` instances. +/// On the JS side the caller also registers the actual function via +/// `__wfe_registerStepFunction(stepType, fn)`. +#[op2] +pub async fn op_register_step( + state: std::rc::Rc>, + #[string] step_type: String, +) -> Result<(), deno_error::JsErrorBox> { + let (host, tx) = { + let s = state.borrow(); + let wfe = s.borrow::(); + (wfe.host()?.clone(), wfe.step_request_tx.clone()) + }; + let counter = Arc::new(std::sync::atomic::AtomicU32::new(0)); + + host.register_step_factory( + &step_type, + move || Box::new(JsStepBody::new(tx.clone(), counter.clone())), + ) + .await; + + Ok(()) +} + +/// Poll for a step execution request from the Rust executor. +/// +/// This is an async op that blocks until a step needs to be executed. +/// Returns `{ requestId, stepType, context }` or null on shutdown. +#[op2] +#[serde] +pub async fn op_step_executor_poll( + state: std::rc::Rc>, +) -> Result { + // Take the receiver out of state (only the first call gets it). + let mut rx = { + let mut s = state.borrow_mut(); + let wfe = s.borrow_mut::(); + match wfe.step_request_rx.take() { + Some(rx) => rx, + None => { + return Err(deno_error::JsErrorBox::generic( + "step executor poll already active", + )); + } + } + }; + + // Wait for the next request. + match rx.recv().await { + Some(req) => { + let request_id = req.request_id; + let result = serde_json::json!({ + "requestId": request_id, + "stepType": req.step_type, + "context": req.context, + }); + + // Store the response sender for op_step_executor_respond. + { + let mut s = state.borrow_mut(); + let wfe = s.borrow_mut::(); + wfe.inflight.insert(request_id, req.response_tx); + // Put the receiver back so we can poll again. + wfe.step_request_rx = Some(rx); + } + + Ok(result) + } + None => { + // Channel closed — shutdown. + Ok(serde_json::Value::Null) + } + } +} + +/// Send a step execution result back to the Rust executor. +#[op2] +pub fn op_step_executor_respond( + state: &mut OpState, + #[smi] request_id: u32, + #[serde] result: Option, + #[string] error: Option, +) -> Result<(), deno_error::JsErrorBox> { + let wfe = state.borrow_mut::(); + let tx = wfe.inflight.remove(&request_id).ok_or_else(|| { + deno_error::JsErrorBox::generic(format!("no inflight request with id {request_id}")) + })?; + + let response = match error { + Some(err) => Err(err), + None => Ok(result.unwrap_or(serde_json::json!({"proceed": true}))), + }; + + // Ignore send failure — the executor may have timed out. + let _ = tx.send(response); + Ok(()) +} diff --git a/wfe-deno/src/ops/workflow.rs b/wfe-deno/src/ops/workflow.rs new file mode 100644 index 0000000..b257cda --- /dev/null +++ b/wfe-deno/src/ops/workflow.rs @@ -0,0 +1,91 @@ +use deno_core::op2; +use deno_core::OpState; + +use crate::state::WfeState; + +/// Start a new workflow instance. Returns the workflow instance ID. +#[op2] +#[string] +pub async fn op_start_workflow( + state: std::rc::Rc>, + #[string] definition_id: String, + #[smi] version: u32, + #[serde] data: serde_json::Value, +) -> Result { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.start_workflow(&definition_id, version, data) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("start_workflow failed: {e}"))) +} + +/// Suspend a running workflow. Returns true if suspension succeeded. +#[op2] +pub async fn op_suspend_workflow( + state: std::rc::Rc>, + #[string] id: String, +) -> Result { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.suspend_workflow(&id) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("suspend_workflow failed: {e}"))) +} + +/// Resume a suspended workflow. Returns true if resumption succeeded. +#[op2] +pub async fn op_resume_workflow( + state: std::rc::Rc>, + #[string] id: String, +) -> Result { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.resume_workflow(&id) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("resume_workflow failed: {e}"))) +} + +/// Terminate a workflow. Returns true if termination succeeded. +#[op2] +pub async fn op_terminate_workflow( + state: std::rc::Rc>, + #[string] id: String, +) -> Result { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + host.terminate_workflow(&id) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("terminate_workflow failed: {e}"))) +} + +/// Get a workflow instance by ID. Returns the serialized WorkflowInstance. +#[op2] +#[serde] +pub async fn op_get_workflow( + state: std::rc::Rc>, + #[string] id: String, +) -> Result { + let host = { + let s = state.borrow(); + let wfe = s.borrow::(); + wfe.host()?.clone() + }; + let instance = host + .get_workflow(&id) + .await + .map_err(|e| deno_error::JsErrorBox::generic(format!("get_workflow failed: {e}")))?; + serde_json::to_value(&instance) + .map_err(|e| deno_error::JsErrorBox::generic(format!("serialization failed: {e}"))) +} diff --git a/wfe-deno/src/state.rs b/wfe-deno/src/state.rs new file mode 100644 index 0000000..4fbeed4 --- /dev/null +++ b/wfe-deno/src/state.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use tokio::sync::{mpsc, oneshot}; +use wfe::WorkflowHost; + +use crate::bridge::StepRequest; +use crate::ops::builder::JsBuilderState; + +/// Central state shared between all ops via `OpState`. +pub struct WfeState { + pub host: Option>, + pub step_request_tx: mpsc::Sender, + pub step_request_rx: Option>, + pub builders: HashMap, + pub next_builder_id: u32, + pub inflight: HashMap>>, + pub next_request_id: u32, +} + +impl WfeState { + pub fn new( + step_request_tx: mpsc::Sender, + step_request_rx: mpsc::Receiver, + ) -> Self { + Self { + host: None, + step_request_tx, + step_request_rx: Some(step_request_rx), + builders: HashMap::new(), + next_builder_id: 0, + inflight: HashMap::new(), + next_request_id: 0, + } + } + + pub fn host(&self) -> Result<&Arc, deno_error::JsErrorBox> { + self.host.as_ref().ok_or_else(|| { + deno_error::JsErrorBox::generic( + "WorkflowHost not created yet — call op_host_create first", + ) + }) + } + + pub fn alloc_builder_id(&mut self) -> u32 { + let id = self.next_builder_id; + self.next_builder_id += 1; + id + } + + pub fn alloc_request_id(&mut self) -> u32 { + let id = self.next_request_id; + self.next_request_id += 1; + id + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn alloc_builder_id_increments() { + let (tx, rx) = mpsc::channel(1); + let mut state = WfeState::new(tx, rx); + assert_eq!(state.alloc_builder_id(), 0); + assert_eq!(state.alloc_builder_id(), 1); + assert_eq!(state.alloc_builder_id(), 2); + } + + #[test] + fn alloc_request_id_increments() { + let (tx, rx) = mpsc::channel(1); + let mut state = WfeState::new(tx, rx); + assert_eq!(state.alloc_request_id(), 0); + assert_eq!(state.alloc_request_id(), 1); + } + + #[test] + fn host_returns_error_when_not_created() { + let (tx, rx) = mpsc::channel(1); + let state = WfeState::new(tx, rx); + assert!(state.host().is_err()); + } +} diff --git a/wfe-deno/tests/integration.rs b/wfe-deno/tests/integration.rs new file mode 100644 index 0000000..8e3e813 --- /dev/null +++ b/wfe-deno/tests/integration.rs @@ -0,0 +1,440 @@ +use wfe_deno::create_wfe_runtime; + +/// Helper: run a JS script in a fresh wfe runtime and drive the event loop. +async fn run_js(code: &str) -> Result<(), Box> { + let mut runtime = create_wfe_runtime(); + runtime + .execute_script("", code.to_string()) + .map_err(|e| format!("script error: {e}"))?; + runtime + .run_event_loop(Default::default()) + .await + .map_err(|e| format!("event loop error: {e}"))?; + Ok(()) +} + +/// Helper: run a JS module in a fresh wfe runtime and drive the event loop. +async fn run_module(code: &str) -> Result<(), Box> { + let mut runtime = create_wfe_runtime(); + let specifier = + deno_core::ModuleSpecifier::parse("ext:wfe-deno/test-module.js").unwrap(); + let module_id = runtime + .load_main_es_module_from_code(&specifier, code.to_string()) + .await + .map_err(|e| format!("module load error: {e}"))?; + let eval = runtime.mod_evaluate(module_id); + runtime + .run_event_loop(Default::default()) + .await + .map_err(|e| format!("event loop error: {e}"))?; + eval.await + .map_err(|e| format!("module eval error: {e}"))?; + Ok(()) +} + +#[tokio::test] +async fn host_create_via_op() { + run_js("Deno.core.ops.op_host_create()").await.unwrap(); +} + +#[tokio::test] +async fn host_create_twice_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + Deno.core.ops.op_host_create(); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn host_start_without_create_fails() { + let _result = run_js("await Deno.core.ops.op_host_start()").await; + // Script eval of bare await fails, use module mode + let result = run_module("await Deno.core.ops.op_host_start()").await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn host_create_and_start() { + run_module( + r#" + Deno.core.ops.op_host_create(); + await Deno.core.ops.op_host_start(); + await Deno.core.ops.op_host_stop(); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_create_returns_handle() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + if (typeof h !== "number") throw new Error("expected number handle"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_start_with_and_name() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "my-step"); + Deno.core.ops.op_builder_name(h, "Step One"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_chain_and_build() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_name(h, "First"); + Deno.core.ops.op_builder_config(h, { key: "val" }); + Deno.core.ops.op_builder_then(h, "step-b"); + Deno.core.ops.op_builder_name(h, "Second"); + const def = Deno.core.ops.op_builder_build(h, "test-wf", 1); + if (def.id !== "test-wf") throw new Error("wrong id: " + def.id); + if (def.version !== 1) throw new Error("wrong version"); + if (def.steps.length !== 2) throw new Error("expected 2 steps, got " + def.steps.length); + if (def.steps[0].name !== "First") throw new Error("wrong step 0 name"); + if (def.steps[1].name !== "Second") throw new Error("wrong step 1 name"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_config_and_on_error() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_config(h, { ns: "ory" }); + Deno.core.ops.op_builder_on_error(h, "suspend"); + const def = Deno.core.ops.op_builder_build(h, "test", 1); + if (!def.steps[0].step_config) throw new Error("missing step_config"); + if (def.steps[0].step_config.ns !== "ory") throw new Error("wrong config"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_on_error_retry() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_on_error(h, { retry: { interval: 1000, maxRetries: 5 } }); + const def = Deno.core.ops.op_builder_build(h, "test", 1); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_delay_step() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_delay(h, 5000); + const def = Deno.core.ops.op_builder_build(h, "test", 1); + if (def.steps.length !== 2) throw new Error("expected 2 steps (original + delay)"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_wait_for_step() { + run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_wait_for(h, "order.paid", "order-123"); + const def = Deno.core.ops.op_builder_build(h, "test", 1); + if (def.steps.length !== 2) throw new Error("expected 2 steps"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_register_with_host() { + run_module( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "step-a"); + Deno.core.ops.op_builder_name(h, "First"); + await Deno.core.ops.op_builder_register(h, "registered-wf", 1); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn builder_start_with_twice_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "a"); + Deno.core.ops.op_builder_start_with(h, "b"); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn builder_then_without_start_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_then(h, "b"); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn builder_name_without_step_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_name(h, "oops"); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn builder_invalid_handle_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + Deno.core.ops.op_builder_start_with(999, "a"); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn builder_build_invalid_handle_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + Deno.core.ops.op_builder_build(999, "id", 1); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn workflow_management_ops() { + run_module( + r#" + Deno.core.ops.op_host_create(); + await Deno.core.ops.op_host_start(); + + // Register a no-op step and workflow. + const h = Deno.core.ops.op_builder_create(); + Deno.core.ops.op_builder_start_with(h, "noop"); + await Deno.core.ops.op_builder_register(h, "mgmt-test", 1); + + // Start a workflow (it will just sit with unknown step type). + const wfId = await Deno.core.ops.op_start_workflow("mgmt-test", 1, { foo: "bar" }); + if (typeof wfId !== "string") throw new Error("expected string ID"); + + // Get the workflow. + const wf = await Deno.core.ops.op_get_workflow(wfId); + if (wf.id !== wfId) throw new Error("wrong id"); + if (wf.data.foo !== "bar") throw new Error("wrong data"); + + // Suspend. + const suspended = await Deno.core.ops.op_suspend_workflow(wfId); + if (!suspended) throw new Error("expected suspend to succeed"); + + // Resume. + const resumed = await Deno.core.ops.op_resume_workflow(wfId); + if (!resumed) throw new Error("expected resume to succeed"); + + // Terminate. + const terminated = await Deno.core.ops.op_terminate_workflow(wfId); + if (!terminated) throw new Error("expected terminate to succeed"); + + // Suspend after terminate should return false. + const again = await Deno.core.ops.op_suspend_workflow(wfId); + if (again) throw new Error("expected suspend after terminate to fail"); + + await Deno.core.ops.op_host_stop(); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn step_executor_respond_invalid_id_fails() { + let result = run_js( + r#" + Deno.core.ops.op_host_create(); + Deno.core.ops.op_step_executor_respond(999, { proceed: true }, null); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn wfe_js_api_execution_result_factories() { + run_module( + r#" + import { ExecutionResult } from "ext:wfe-deno/wfe.js"; + + const next = ExecutionResult.next(); + if (!next.proceed) throw new Error("next should proceed"); + + const outcome = ExecutionResult.outcome("branch-a"); + if (outcome.outcomeValue !== "branch-a") throw new Error("wrong outcome"); + + const persist = ExecutionResult.persist({ page: 2 }); + if (persist.proceed) throw new Error("persist should not proceed"); + if (persist.persistenceData.page !== 2) throw new Error("wrong persist data"); + + const sleep = ExecutionResult.sleep(5000); + if (sleep.sleepFor !== 5000) throw new Error("wrong sleep"); + + const wait = ExecutionResult.waitForEvent("ev", "k"); + if (wait.eventName !== "ev") throw new Error("wrong event name"); + + const branch = ExecutionResult.branch([1, 2]); + if (branch.branchValues.length !== 2) throw new Error("wrong branch"); + + const output = ExecutionResult.output({ done: true }); + if (!output.proceed) throw new Error("output should proceed"); + if (!output.outputData.done) throw new Error("wrong output"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn wfe_js_api_workflow_host_class() { + run_module( + r#" + import { WorkflowHost } from "ext:wfe-deno/wfe.js"; + + const host = WorkflowHost.create(); + await host.start(); + await host.stop(); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn wfe_js_api_workflow_builder_class() { + run_module( + r#" + import { WorkflowHost, WorkflowBuilder } from "ext:wfe-deno/wfe.js"; + + const host = WorkflowHost.create(); + const b = new WorkflowBuilder(); + b.startWith("step-a").name("First").config({ key: "val" }).then("step-b").name("Second"); + const def = b.build("builder-test", 1); + if (def.steps.length !== 2) throw new Error("expected 2 steps"); + if (def.steps[0].name !== "First") throw new Error("wrong name"); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn wfe_js_api_build_workflow_helper() { + run_module( + r#" + import { WorkflowHost } from "ext:wfe-deno/wfe.js"; + + const host = WorkflowHost.create(); + await host.buildWorkflow("helper-test", 1, (b) => { + b.startWith("step-a").name("A").then("step-b").name("B"); + }); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn publish_event_op() { + run_module( + r#" + Deno.core.ops.op_host_create(); + await Deno.core.ops.op_host_start(); + await Deno.core.ops.op_publish_event("test.event", "key-1", { data: true }); + await Deno.core.ops.op_host_stop(); + "#, + ) + .await + .unwrap(); +} + +#[tokio::test] +async fn start_workflow_without_host_fails() { + let result = run_module( + r#" + await Deno.core.ops.op_start_workflow("nope", 1, {}); + "#, + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn register_step_without_host_fails() { + let result = run_module( + r#" + await Deno.core.ops.op_register_step("nope"); + "#, + ) + .await; + assert!(result.is_err()); +}