From 503e407243fd79561883299f4fc89cae3c66a8d8 Mon Sep 17 00:00:00 2001
From: Sienna Meridian Satterwhite
Date: Fri, 20 Mar 2026 13:16:00 +0000
Subject: [PATCH] feat: implement OpenSearch ML setup and model_id injection

ensure_opensearch_ml: cluster settings, model registration/deployment
(all-mpnet-base-v2), ingest + search pipelines for hybrid BM25+neural.

inject_opensearch_model_id: reads model_id from ingest pipeline,
writes to matrix/opensearch-ml-config ConfigMap.

os_api helper: kube exec curl inside opensearch pod.
---
 src/manifests.rs | 336 ++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 330 insertions(+), 6 deletions(-)

diff --git a/src/manifests.rs b/src/manifests.rs
index e422536..5bcfe9f 100644
--- a/src/manifests.rs
+++ b/src/manifests.rs
@@ -1,4 +1,4 @@
-use anyhow::Result;
+use crate::error::Result;
 
 pub const MANAGED_NS: &[&str] = &[
     "data",
@@ -56,7 +56,7 @@ pub async fn cmd_apply(env: &str, domain: &str, email: &str, namespace: &str) ->
             domain.to_string()
         };
         if d.is_empty() {
-            anyhow::bail!("--domain is required for production apply on first deploy");
+            bail!("--domain is required for production apply on first deploy");
         }
         let overlay = infra_dir.join("overlays").join("production");
         (d, overlay)
@@ -404,16 +404,340 @@ async fn patch_tuwunel_oauth2_redirect(domain: &str) {
     }
 }
 
+// ---------------------------------------------------------------------------
+// OpenSearch helpers (kube exec + curl inside pod)
+// ---------------------------------------------------------------------------
+
+/// Pre-trained sentence-transformers model registered for neural search.
+const OS_ML_MODEL_NAME: &str = "huggingface/sentence-transformers/all-mpnet-base-v2";
+
+/// Call the OpenSearch REST API by running `curl` inside the opensearch pod.
+///
+/// `path` is appended to `http://localhost:9200`. `-X method` is only passed for
+/// non-GET requests, and `body` (when present) is sent as JSON.
+///
+/// Returns the response body, or `None` when the exec fails, curl exits non-zero
+/// (`-f` makes HTTP errors a non-zero exit) or the output is empty.
+async fn os_api(path: &str, method: &str, body: Option<&str>) -> Option<String> {
+    let url = format!("http://localhost:9200{path}");
+    let mut curl_args: Vec<&str> = vec!["curl", "-sf", &url];
+    if method != "GET" {
+        curl_args.extend_from_slice(&["-X", method]);
+    }
+    if let Some(b) = body {
+        curl_args.extend_from_slice(&["-H", "Content-Type: application/json", "-d", b]);
+    }
+
+    // Exec target: namespace `data`, pod `opensearch-0`, container `opensearch`.
+    match crate::kube::kube_exec("data", "opensearch-0", &curl_args, Some("opensearch")).await {
+        Ok((0, out)) if !out.is_empty() => Some(out),
+        _ => None,
+    }
+}
+
+/// Register the pre-trained model and wait for the registration task to finish.
+///
+/// Polls the task every 10 seconds, at most 60 times. Returns the new model id
+/// once the task is `COMPLETED`; warns and returns `None` if the request fails,
+/// no `task_id` comes back, the task is `FAILED`, or polling runs out.
+async fn os_register_model() -> Option<String> {
+    crate::output::ok("Registering OpenSearch ML model (all-mpnet-base-v2)...");
+    let reg_body = serde_json::json!({
+        "name": OS_ML_MODEL_NAME,
+        "version": "1.0.1",
+        "model_format": "TORCH_SCRIPT",
+    })
+    .to_string();
+    let Some(reg_resp) = os_api("/_plugins/_ml/models/_register", "POST", Some(&reg_body)).await
+    else {
+        crate::output::warn("Failed to register ML model -- skipping.");
+        return None;
+    };
+
+    let task_id = serde_json::from_str::<serde_json::Value>(&reg_resp)
+        .ok()
+        .and_then(|v| v.get("task_id")?.as_str().map(String::from))
+        .filter(|id| !id.is_empty());
+    let Some(task_id) = task_id else {
+        crate::output::warn("No task_id from model registration -- skipping.");
+        return None;
+    };
+
+    crate::output::ok("Waiting for model registration...");
+    let task_path = format!("/_plugins/_ml/tasks/{task_id}");
+    for _ in 0..60 {
+        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
+        let Some(task_resp) = os_api(&task_path, "GET", None).await else {
+            continue;
+        };
+        let Ok(task) = serde_json::from_str::<serde_json::Value>(&task_resp) else {
+            continue;
+        };
+        match task.get("state").and_then(|v| v.as_str()).unwrap_or("") {
+            "COMPLETED" => {
+                let model_id = task.get("model_id").and_then(|v| v.as_str());
+                if model_id.is_none() {
+                    crate::output::warn("ML model registration returned no model_id.");
+                }
+                return model_id.map(String::from);
+            }
+            "FAILED" => {
+                crate::output::warn(&format!("ML model registration failed: {task_resp}"));
+                return None;
+            }
+            _ => {}
+        }
+    }
+
+    crate::output::warn("ML model registration timed out.");
+    None
+}
+
+/// Deploy a registered ML model and poll until it reports `DEPLOYED`.
+///
+/// Polls the model every 5 seconds, at most 30 times. Returns `true` once
+/// `model_state` is `DEPLOYED`; warns and returns `false` on `DEPLOY_FAILED` or
+/// when polling runs out.
+async fn os_deploy_model(model_id: &str) -> bool {
+    // Best-effort: the reply is not inspected; success is judged solely by the
+    // model state polled below.
+    os_api(&format!("/_plugins/_ml/models/{model_id}/_deploy"), "POST", None).await;
+
+    let model_path = format!("/_plugins/_ml/models/{model_id}");
+    for _ in 0..30 {
+        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        let Some(raw) = os_api(&model_path, "GET", None).await else {
+            continue;
+        };
+        let state = serde_json::from_str::<serde_json::Value>(&raw)
+            .ok()
+            .and_then(|v| v.get("model_state")?.as_str().map(String::from));
+        match state.as_deref() {
+            Some("DEPLOYED") => return true,
+            Some("DEPLOY_FAILED") => {
+                crate::output::warn(&format!("ML model deployment failed: {raw}"));
+                return false;
+            }
+            _ => {}
+        }
+    }
+
+    crate::output::warn("Timed out waiting for ML model deployment.");
+    false
+}
+
 /// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap.
+///
+/// Reads the `model_id` of the `text_embedding` processor in the
+/// `tuwunel_embedding_pipeline` ingest pipeline and applies it as the
+/// `model_id` key of the ConfigMap. Warns and returns if it cannot be found.
 async fn inject_opensearch_model_id() {
-    // Read model_id from the ingest pipeline via OpenSearch API
-    // This requires port-forward to opensearch — skip if not reachable
-    // TODO: implement opensearch API calls via port-forward + reqwest
+    let Some(pipe_resp) =
+        os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "GET", None).await
+    else {
+        crate::output::warn(
+            "OpenSearch ingest pipeline not found -- skipping model_id injection.",
+        );
+        return;
+    };
+
+    let model_id = serde_json::from_str::<serde_json::Value>(&pipe_resp)
+        .ok()
+        .and_then(|v| {
+            v.get("tuwunel_embedding_pipeline")?
+                .get("processors")?
+                .as_array()?
+                .iter()
+                .find_map(|p| {
+                    p.get("text_embedding")?
+                        .get("model_id")?
+                        .as_str()
+                        .map(String::from)
+                })
+        });
+
+    let Some(model_id) = model_id else {
+        crate::output::warn(
+            "No model_id in ingest pipeline -- tuwunel hybrid search unavailable.",
+        );
+        return;
+    };
+
+    // Skip the apply when the stored value is already current.
+    // NOTE(review): this reads through `kube_get_secret_field` although the target
+    // is a ConfigMap; if that helper only handles Secrets the lookup always errs
+    // and the ConfigMap is re-applied on every run -- confirm.
+    if let Ok(current) =
+        crate::kube::kube_get_secret_field("matrix", "opensearch-ml-config", "model_id").await
+    {
+        if current == model_id {
+            return;
+        }
+    }
+
+    let cm = serde_json::json!({
+        "apiVersion": "v1",
+        "kind": "ConfigMap",
+        "metadata": {"name": "opensearch-ml-config", "namespace": "matrix"},
+        "data": {"model_id": &model_id},
+    });
+
+    // `Value`'s `Display` impl emits compact JSON and cannot fail, so an empty
+    // manifest is never applied.
+    if let Err(e) = crate::kube::kube_apply(&cm.to_string()).await {
+        crate::output::warn(&format!("Failed to inject OpenSearch model_id: {e}"));
+    } else {
+        crate::output::ok(&format!(
+            "Injected OpenSearch model_id ({model_id}) into matrix/opensearch-ml-config."
+        ));
+    }
 }
 
 /// Configure OpenSearch ML Commons for neural search.
+///
+/// 1. Sets cluster settings to allow ML on data nodes.
+/// 2. Registers and deploys all-mpnet-base-v2 (pre-trained, 768-dim).
+/// 3. Creates ingest + search pipelines for hybrid BM25+neural scoring.
+///
+/// Every step is best-effort: failures are reported as warnings, never errors.
 async fn ensure_opensearch_ml() {
-    // TODO: implement opensearch ML setup via port-forward + reqwest
+    if os_api("/_cluster/health", "GET", None).await.is_none() {
+        crate::output::warn("OpenSearch not reachable -- skipping ML setup.");
+        return;
+    }
+
+    // 1. ML Commons cluster settings
+    let settings = serde_json::json!({
+        "persistent": {
+            "plugins.ml_commons.only_run_on_ml_node": false,
+            "plugins.ml_commons.native_memory_threshold": 90,
+            "plugins.ml_commons.model_access_control_enabled": false,
+            "plugins.ml_commons.allow_registering_model_via_url": true,
+        }
+    })
+    .to_string();
+    if os_api("/_cluster/settings", "PUT", Some(&settings)).await.is_none() {
+        crate::output::warn("Failed to apply ML Commons cluster settings -- continuing.");
+    }
+
+    // 2. Check if model already registered and deployed
+    let search_body =
+        serde_json::json!({"query": {"match": {"name": OS_ML_MODEL_NAME}}}).to_string();
+    let Some(search_resp) =
+        os_api("/_plugins/_ml/models/_search", "POST", Some(&search_body)).await
+    else {
+        crate::output::warn("OpenSearch ML search API failed -- skipping ML setup.");
+        return;
+    };
+
+    let resp: serde_json::Value = match serde_json::from_str(&search_resp) {
+        Ok(v) => v,
+        Err(e) => {
+            crate::output::warn(&format!("Unparseable ML model search response: {e}"));
+            return;
+        }
+    };
+
+    let hits = resp
+        .get("hits")
+        .and_then(|h| h.get("hits"))
+        .and_then(|h| h.as_array())
+        .map(Vec::as_slice)
+        .unwrap_or_default();
+
+    let mut existing: Option<String> = None;
+    let mut deployed = false;
+
+    for hit in hits {
+        let source = hit.get("_source");
+        // `match` is a full-text query, so other models may come back as hits;
+        // skip any hit whose name is present and is not exactly ours.
+        let name = source.and_then(|s| s.get("name")).and_then(|v| v.as_str());
+        if name.map_or(false, |n| n != OS_ML_MODEL_NAME) {
+            continue;
+        }
+        // A hit without an id cannot be deployed or wired into a pipeline.
+        let Some(id) = hit.get("_id").and_then(|v| v.as_str()).filter(|s| !s.is_empty()) else {
+            continue;
+        };
+        let state = source
+            .and_then(|s| s.get("model_state"))
+            .and_then(|v| v.as_str())
+            .unwrap_or("");
+        match state {
+            "DEPLOYED" => {
+                existing = Some(id.to_string());
+                deployed = true;
+                break;
+            }
+            // UNDEPLOYED is redeployable; re-registering it would leave a duplicate.
+            "REGISTERED" | "DEPLOYING" | "UNDEPLOYED" => {
+                existing = Some(id.to_string());
+            }
+            _ => {}
+        }
+    }
+
+    let model_id = match existing {
+        Some(id) => {
+            if !deployed {
+                // Registered but not deployed -- deploy it
+                crate::output::ok("Deploying OpenSearch ML model...");
+                deployed = os_deploy_model(&id).await;
+            }
+            id
+        }
+        None => {
+            // Register from pre-trained hub
+            let Some(id) = os_register_model().await else {
+                return;
+            };
+            crate::output::ok("Deploying ML model...");
+            deployed = os_deploy_model(&id).await;
+            id
+        }
+    };
+
+    // 3. Ingest pipeline
+    let ingest = serde_json::json!({
+        "description": "Tuwunel message embedding pipeline",
+        "processors": [{"text_embedding": {
+            "model_id": &model_id,
+            "field_map": {"body": "embedding"},
+        }}],
+    })
+    .to_string();
+    let ingest_ok = os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "PUT", Some(&ingest))
+        .await
+        .is_some();
+    if !ingest_ok {
+        crate::output::warn("Failed to create OpenSearch ingest pipeline.");
+    }
+
+    // 4. Search pipeline
+    let search = serde_json::json!({
+        "description": "Tuwunel hybrid BM25+neural search pipeline",
+        "phase_results_processors": [{"normalization-processor": {
+            "normalization": {"technique": "min_max"},
+            "combination": {
+                "technique": "arithmetic_mean",
+                "parameters": {"weights": [0.3, 0.7]},
+            },
+        }}],
+    })
+    .to_string();
+    let search_ok = os_api("/_search/pipeline/tuwunel_hybrid_pipeline", "PUT", Some(&search))
+        .await
+        .is_some();
+    if !search_ok {
+        crate::output::warn("Failed to create OpenSearch search pipeline.");
+    }
+
+    if deployed && ingest_ok && search_ok {
+        crate::output::ok(&format!("OpenSearch ML ready (model: {model_id})."));
+    } else {
+        crate::output::warn(&format!("OpenSearch ML setup incomplete (model: {model_id})."));
+    }
 }
 
 #[cfg(test)]