diff --git a/wfe-opensearch/Cargo.toml b/wfe-opensearch/Cargo.toml
new file mode 100644
index 0000000..61ef731
--- /dev/null
+++ b/wfe-opensearch/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "wfe-opensearch"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+description = "OpenSearch index provider for WFE"
+
+[dependencies]
+wfe-core = { workspace = true }
+opensearch = { workspace = true }
+tokio = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+async-trait = { workspace = true }
+thiserror = { workspace = true }
+tracing = { workspace = true }
+
+[dev-dependencies]
+wfe-core = { workspace = true, features = ["test-support"] }
+pretty_assertions = { workspace = true }
+rstest = { workspace = true }
+tokio = { workspace = true, features = ["test-util"] }
+uuid = { workspace = true }
+chrono = { workspace = true }
+opensearch = { workspace = true }
diff --git a/wfe-opensearch/src/lib.rs b/wfe-opensearch/src/lib.rs
new file mode 100644
index 0000000..463dbb8
--- /dev/null
+++ b/wfe-opensearch/src/lib.rs
@@ -0,0 +1,296 @@
+use async_trait::async_trait;
+use opensearch::http::transport::Transport;
+use opensearch::{IndexParts, OpenSearch, SearchParts};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use tracing::debug;
+use wfe_core::models::{WorkflowInstance, WorkflowStatus};
+use wfe_core::traits::search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult};
+
+/// Document structure stored in OpenSearch.
+#[derive(Debug, Serialize, Deserialize)]
+struct WorkflowDocument {
+    id: String,
+    workflow_definition_id: String,
+    version: u32,
+    // Status is flattened to its serde string form so it maps to a keyword field.
+    status: String,
+    reference: Option<String>,
+    description: Option<String>,
+    data: serde_json::Value,
+    // Timestamps are stored as RFC 3339 strings (OpenSearch "date" type).
+    create_time: String,
+    complete_time: Option<String>,
+}
+
+impl From<&WorkflowInstance> for WorkflowDocument {
+    fn from(instance: &WorkflowInstance) -> Self {
+        Self {
+            id: instance.id.clone(),
+            workflow_definition_id: instance.workflow_definition_id.clone(),
+            version: instance.version,
+            // Serialize the status enum and take its string representation;
+            // falls back to "" if the enum serializes to a non-string value.
+            status: serde_json::to_value(instance.status)
+                .ok()
+                .and_then(|v| v.as_str().map(String::from))
+                .unwrap_or_default(),
+            reference: instance.reference.clone(),
+            description: instance.description.clone(),
+            data: instance.data.clone(),
+            create_time: instance.create_time.to_rfc3339(),
+            complete_time: instance.complete_time.map(|t| t.to_rfc3339()),
+        }
+    }
+}
+
+/// OpenSearch-backed search index for workflow instances.
+pub struct OpenSearchIndex {
+    client: OpenSearch,
+    index_name: String,
+}
+
+impl OpenSearchIndex {
+    /// Create a new OpenSearch index provider.
+    ///
+    /// # Arguments
+    /// * `url` - OpenSearch server URL (e.g. `http://localhost:9200`)
+    /// * `index_name` - Name of the index to use
+    ///
+    /// # Errors
+    /// Returns an error when a transport cannot be built from `url`.
+    pub fn new(url: &str, index_name: &str) -> wfe_core::Result<Self> {
+        // NOTE(review): the boxed trait-object target was lost in transit;
+        // restored as Box<dyn Error + Send + Sync> — confirm against
+        // wfe_core::Result's error type and its From impls.
+        let transport = Transport::single_node(url)
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+        let client = OpenSearch::new(transport);
+        Ok(Self {
+            client,
+            index_name: index_name.to_string(),
+        })
+    }
+
+    /// Get a reference to the underlying OpenSearch client (useful for tests).
+    pub fn client(&self) -> &OpenSearch {
+        &self.client
+    }
+
+    /// Get the index name.
+    pub fn index_name(&self) -> &str {
+        &self.index_name
+    }
+}
+
+#[async_trait]
+impl SearchIndex for OpenSearchIndex {
+    /// Ensure the backing index exists, creating it with explicit mappings if not.
+    async fn start(&self) -> wfe_core::Result<()> {
+        let exists = self
+            .client
+            .indices()
+            .exists(opensearch::indices::IndicesExistsParts::Index(&[
+                &self.index_name,
+            ]))
+            .send()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        // A 2xx from the exists call means the index is already there — nothing to do.
+        if exists.status_code().is_success() {
+            debug!(index = %self.index_name, "Index already exists");
+            return Ok(());
+        }
+
+        // "data" is stored but not indexed (enabled: false) — payloads are opaque.
+        let body = json!({
+            "mappings": {
+                "properties": {
+                    "id": { "type": "keyword" },
+                    "workflow_definition_id": { "type": "keyword" },
+                    "version": { "type": "integer" },
+                    "status": { "type": "keyword" },
+                    "reference": { "type": "keyword" },
+                    "description": { "type": "text" },
+                    "data": { "type": "object", "enabled": false },
+                    "create_time": { "type": "date" },
+                    "complete_time": { "type": "date" }
+                }
+            }
+        });
+
+        let response = self
+            .client
+            .indices()
+            .create(opensearch::indices::IndicesCreateParts::Index(
+                &self.index_name,
+            ))
+            .body(body)
+            .send()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        if !response.status_code().is_success() {
+            let text = response
+                .text()
+                .await
+                .unwrap_or_else(|_| "unknown error".to_string());
+            return Err(wfe_core::WfeError::Persistence(format!(
+                "Failed to create index: {text}"
+            )));
+        }
+
+        debug!(index = %self.index_name, "Index created");
+        Ok(())
+    }
+
+    /// No teardown needed; the OpenSearch client holds no resources to release here.
+    async fn stop(&self) -> wfe_core::Result<()> {
+        Ok(())
+    }
+
+    /// Index (upsert by id) a single workflow instance document.
+    async fn index_workflow(&self, instance: &WorkflowInstance) -> wfe_core::Result<()> {
+        let doc = WorkflowDocument::from(instance);
+
+        let response = self
+            .client
+            .index(IndexParts::IndexId(&self.index_name, &doc.id))
+            .body(serde_json::to_value(&doc)?)
+            .send()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        if !response.status_code().is_success() {
+            let text = response
+                .text()
+                .await
+                .unwrap_or_else(|_| "unknown error".to_string());
+            return Err(wfe_core::WfeError::Persistence(format!(
+                "Failed to index workflow: {text}"
+            )));
+        }
+
+        debug!(id = %instance.id, index = %self.index_name, "Workflow indexed");
+        Ok(())
+    }
+
+    /// Full-text search over indexed workflows with optional structured filters.
+    ///
+    /// * `terms` - free-text query (empty string matches everything)
+    /// * `skip` / `take` - pagination offset and page size
+    /// * `filters` - exact-match / range constraints applied as a bool filter
+    async fn search(
+        &self,
+        terms: &str,
+        skip: u64,
+        take: u64,
+        filters: &[SearchFilter],
+    ) -> wfe_core::Result<Page<WorkflowSearchResult>> {
+        let mut must_clauses: Vec<serde_json::Value> = Vec::new();
+        let mut filter_clauses: Vec<serde_json::Value> = Vec::new();
+
+        if !terms.is_empty() {
+            must_clauses.push(json!({
+                "multi_match": {
+                    "query": terms,
+                    "fields": ["description", "reference", "workflow_definition_id"]
+                }
+            }));
+        }
+
+        for filter in filters {
+            match filter {
+                SearchFilter::Status(status) => {
+                    // Same enum-to-string flattening used when indexing.
+                    let status_str = serde_json::to_value(status)
+                        .ok()
+                        .and_then(|v| v.as_str().map(String::from))
+                        .unwrap_or_default();
+                    filter_clauses.push(json!({
+                        "term": { "status": status_str }
+                    }));
+                }
+                SearchFilter::DateRange {
+                    field,
+                    before,
+                    after,
+                } => {
+                    let mut range = serde_json::Map::new();
+                    if let Some(before) = before {
+                        range.insert("lt".to_string(), json!(before.to_rfc3339()));
+                    }
+                    if let Some(after) = after {
+                        range.insert("gte".to_string(), json!(after.to_rfc3339()));
+                    }
+                    if !range.is_empty() {
+                        // json! does not accept an arbitrary method-call expression
+                        // as an object key, so build { <field>: range } explicitly.
+                        let mut range_query = serde_json::Map::new();
+                        range_query.insert(field.clone(), serde_json::Value::Object(range));
+                        filter_clauses.push(json!({ "range": range_query }));
+                    }
+                }
+                SearchFilter::Reference(reference) => {
+                    filter_clauses.push(json!({
+                        "term": { "reference": reference }
+                    }));
+                }
+            }
+        }
+
+        // With no terms and no filters, fall back to match_all.
+        let query = if must_clauses.is_empty() && filter_clauses.is_empty() {
+            json!({ "match_all": {} })
+        } else {
+            let mut bool_query = serde_json::Map::new();
+            if !must_clauses.is_empty() {
+                bool_query.insert("must".to_string(), json!(must_clauses));
+            }
+            if !filter_clauses.is_empty() {
+                bool_query.insert("filter".to_string(), json!(filter_clauses));
+            }
+            json!({ "bool": bool_query })
+        };
+
+        let body = json!({
+            "query": query,
+            "from": skip,
+            "size": take,
+        });
+
+        let response = self
+            .client
+            .search(SearchParts::Index(&[&self.index_name]))
+            .body(body)
+            .send()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        if !response.status_code().is_success() {
+            let text = response
+                .text()
+                .await
+                .unwrap_or_else(|_| "unknown error".to_string());
+            return Err(wfe_core::WfeError::Persistence(format!(
+                "Search failed: {text}"
+            )));
+        }
+
+        let response_body: serde_json::Value = response
+            .json()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        // hits.total.value is the total match count, independent of pagination.
+        let total = response_body["hits"]["total"]["value"]
+            .as_u64()
+            .unwrap_or(0);
+
+        let hits = response_body["hits"]["hits"]
+            .as_array()
+            .cloned()
+            .unwrap_or_default();
+
+        let mut results = Vec::with_capacity(hits.len());
+        for hit in &hits {
+            let source = &hit["_source"];
+            // Missing/unparseable status falls back to "Runnable", then to the
+            // enum's Default if even that string fails to deserialize.
+            let status_str = source["status"].as_str().unwrap_or("Runnable");
+            let status: WorkflowStatus =
+                serde_json::from_value(json!(status_str)).unwrap_or_default();
+
+            results.push(WorkflowSearchResult {
+                id: source["id"].as_str().unwrap_or_default().to_string(),
+                workflow_definition_id: source["workflow_definition_id"]
+                    .as_str()
+                    .unwrap_or_default()
+                    .to_string(),
+                version: source["version"].as_u64().unwrap_or(0) as u32,
+                status,
+                reference: source["reference"].as_str().map(String::from),
+                description: source["description"].as_str().map(String::from),
+            });
+        }
+
+        Ok(Page {
+            data: results,
+            total,
+        })
+    }
+}
diff --git a/wfe-opensearch/tests/search.rs b/wfe-opensearch/tests/search.rs
new file mode 100644
index 0000000..038e9e9
--- /dev/null
+++ b/wfe-opensearch/tests/search.rs
@@ -0,0 +1,198 @@
+use chrono::Utc;
+use opensearch::http::transport::Transport;
+use opensearch::OpenSearch;
+use pretty_assertions::assert_eq;
+use serde_json::json;
+use uuid::Uuid;
+use wfe_core::models::{WorkflowInstance, WorkflowStatus};
+use wfe_core::traits::search::{SearchFilter, SearchIndex};
+use wfe_opensearch::OpenSearchIndex;
+
+const OPENSEARCH_URL: &str = "http://localhost:9200";
+
+/// Check if OpenSearch is reachable, skip test if not.
+async fn opensearch_available() -> bool {
+    // let-else instead of is_err() + unwrap(), matching the style used below.
+    let Ok(transport) = Transport::single_node(OPENSEARCH_URL) else {
+        return false;
+    };
+    let client = OpenSearch::new(transport);
+    client
+        .ping()
+        .send()
+        .await
+        .map(|r| r.status_code().is_success())
+        .unwrap_or(false)
+}
+
+/// Helper to create a unique test index and return the provider + cleanup handle.
+async fn setup() -> Option<(OpenSearchIndex, String)> {
+    if !opensearch_available().await {
+        eprintln!("OpenSearch not available, skipping test");
+        return None;
+    }
+    let index_name = format!("wfe_test_{}", Uuid::new_v4());
+    let provider = OpenSearchIndex::new(OPENSEARCH_URL, &index_name).unwrap();
+    provider.start().await.unwrap();
+    Some((provider, index_name))
+}
+
+/// Refresh the index so documents become searchable immediately.
+async fn refresh_index(provider: &OpenSearchIndex) {
+    let url = format!("/{}/_refresh", provider.index_name());
+    provider
+        .client()
+        .send(
+            opensearch::http::Method::Post,
+            &url,
+            opensearch::http::headers::HeaderMap::new(),
+            Option::<&serde_json::Value>::None,
+            Some(b"".as_ref()),
+            None,
+        )
+        .await
+        .unwrap();
+}
+
+/// Delete the test index.
+async fn cleanup(provider: &OpenSearchIndex) {
+    // Best-effort delete; failures are deliberately ignored.
+    let _ = provider
+        .client()
+        .indices()
+        .delete(opensearch::indices::IndicesDeleteParts::Index(&[
+            provider.index_name(),
+        ]))
+        .send()
+        .await;
+}
+
+fn make_instance(description: Option<&str>, reference: Option<&str>) -> WorkflowInstance {
+    let mut instance = WorkflowInstance::new("test-workflow", 1, json!({"key": "value"}));
+    instance.description = description.map(String::from);
+    instance.reference = reference.map(String::from);
+    instance
+}
+
+#[tokio::test]
+async fn index_and_search_by_terms() {
+    let Some((provider, _index)) = setup().await else {
+        return;
+    };
+
+    let instance = make_instance(Some("Process the quarterly financial report"), None);
+    provider.index_workflow(&instance).await.unwrap();
+    refresh_index(&provider).await;
+
+    let page = provider
+        .search("quarterly financial", 0, 10, &[])
+        .await
+        .unwrap();
+
+    assert_eq!(page.total, 1);
+    assert_eq!(page.data.len(), 1);
+    assert_eq!(page.data[0].id, instance.id);
+    assert_eq!(
+        page.data[0].description.as_deref(),
+        Some("Process the quarterly financial report")
+    );
+
+    cleanup(&provider).await;
+}
+
+#[tokio::test]
+async fn search_with_status_filter() {
+    let Some((provider, _index)) = setup().await else {
+        return;
+    };
+
+    let mut runnable = make_instance(Some("Runnable workflow"), None);
+    runnable.status = WorkflowStatus::Runnable;
+
+    let mut complete = make_instance(Some("Complete workflow"), None);
+    complete.status = WorkflowStatus::Complete;
+    complete.complete_time = Some(Utc::now());
+
+    provider.index_workflow(&runnable).await.unwrap();
+    provider.index_workflow(&complete).await.unwrap();
+    refresh_index(&provider).await;
+
+    let page = provider
+        .search("", 0, 10, &[SearchFilter::Status(WorkflowStatus::Complete)])
+        .await
+        .unwrap();
+
+    assert_eq!(page.total, 1);
+    assert_eq!(page.data[0].id, complete.id);
+    assert_eq!(page.data[0].status, WorkflowStatus::Complete);
+
+    cleanup(&provider).await;
+}
+
+#[tokio::test]
+async fn search_with_no_results() {
+    let Some((provider, _index)) = setup().await else {
+        return;
+    };
+
+    let instance = make_instance(Some("A regular workflow"), None);
+    provider.index_workflow(&instance).await.unwrap();
+    refresh_index(&provider).await;
+
+    let page = provider
+        .search("nonexistent-xyzzy-42", 0, 10, &[])
+        .await
+        .unwrap();
+
+    assert_eq!(page.total, 0);
+    assert!(page.data.is_empty());
+
+    cleanup(&provider).await;
+}
+
+#[tokio::test]
+async fn index_multiple_and_paginate() {
+    let Some((provider, _index)) = setup().await else {
+        return;
+    };
+
+    let mut instances = Vec::new();
+    for i in 0..5 {
+        let instance = make_instance(Some(&format!("Paginated workflow number {i}")), None);
+        provider.index_workflow(&instance).await.unwrap();
+        instances.push(instance);
+    }
+    refresh_index(&provider).await;
+
+    // Search all, but skip 2 and take 2
+    let page = provider.search("Paginated workflow", 2, 2, &[]).await.unwrap();
+
+    assert_eq!(page.total, 5);
+    assert_eq!(page.data.len(), 2);
+
+    cleanup(&provider).await;
+}
+
+#[tokio::test]
+async fn search_by_reference() {
+    let Some((provider, _index)) = setup().await else {
+        return;
+    };
+
+    let inst1 = make_instance(Some("First workflow"), Some("REF-001"));
+    let inst2 = make_instance(Some("Second workflow"), Some("REF-002"));
+
+    provider.index_workflow(&inst1).await.unwrap();
+    provider.index_workflow(&inst2).await.unwrap();
+    refresh_index(&provider).await;
+
+    let page = provider
+        .search("", 0, 10, &[SearchFilter::Reference("REF-001".to_string())])
+        .await
+        .unwrap();
+
+    assert_eq!(page.total, 1);
+    assert_eq!(page.data[0].id, inst1.id);
+    assert_eq!(page.data[0].reference.as_deref(), Some("REF-001"));
+
+    cleanup(&provider).await;
+}