feat: implement OpenSearch ML setup and model_id injection

ensure_opensearch_ml: cluster settings, model registration/deployment
(all-mpnet-base-v2), ingest + search pipelines for hybrid BM25+neural.

inject_opensearch_model_id: reads model_id from ingest pipeline,
writes to matrix/opensearch-ml-config ConfigMap.

os_api helper: kube exec curl inside opensearch pod.
This commit is contained in:
2026-03-20 13:16:00 +00:00
parent bc5eeaae6e
commit 503e407243

View File

@@ -1,4 +1,4 @@
use anyhow::Result; use crate::error::Result;
pub const MANAGED_NS: &[&str] = &[ pub const MANAGED_NS: &[&str] = &[
"data", "data",
@@ -56,7 +56,7 @@ pub async fn cmd_apply(env: &str, domain: &str, email: &str, namespace: &str) ->
domain.to_string() domain.to_string()
}; };
if d.is_empty() { if d.is_empty() {
anyhow::bail!("--domain is required for production apply on first deploy"); bail!("--domain is required for production apply on first deploy");
} }
let overlay = infra_dir.join("overlays").join("production"); let overlay = infra_dir.join("overlays").join("production");
(d, overlay) (d, overlay)
@@ -404,16 +404,313 @@ async fn patch_tuwunel_oauth2_redirect(domain: &str) {
} }
} }
// ---------------------------------------------------------------------------
// OpenSearch helpers (kube exec + curl inside pod)
// ---------------------------------------------------------------------------
/// Call the OpenSearch REST API by running `curl` inside the opensearch pod.
///
/// Executes `curl -sf http://localhost:9200{path}` via kube exec against pod
/// `opensearch-0` (container `opensearch`) in the `data` namespace. `-f` makes
/// curl exit non-zero on HTTP errors, so failures collapse into `None`.
///
/// Returns the response body when the exec succeeds (exit code 0) and produced
/// non-empty output; `None` on any failure (pod unreachable, HTTP error, empty
/// body).
async fn os_api(path: &str, method: &str, body: Option<&str>) -> Option<String> {
    let url = format!("http://localhost:9200{path}");
    let mut curl_args: Vec<&str> = vec!["curl", "-sf", &url];
    if method != "GET" {
        curl_args.extend_from_slice(&["-X", method]);
    }
    // `b` borrows from the `body` parameter, which outlives `curl_args`,
    // so it can be pushed directly without an intermediate String.
    if let Some(b) = body {
        curl_args.extend_from_slice(&["-H", "Content-Type: application/json", "-d", b]);
    }
    match crate::kube::kube_exec("data", "opensearch-0", &curl_args, Some("opensearch")).await {
        Ok((0, out)) if !out.is_empty() => Some(out),
        _ => None,
    }
}
/// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap. /// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap.
async fn inject_opensearch_model_id() { async fn inject_opensearch_model_id() {
// Read model_id from the ingest pipeline via OpenSearch API let pipe_resp =
// This requires port-forward to opensearch — skip if not reachable match os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "GET", None).await {
// TODO: implement opensearch API calls via port-forward + reqwest Some(r) => r,
None => {
crate::output::warn(
"OpenSearch ingest pipeline not found -- skipping model_id injection.",
);
return;
}
};
let model_id = serde_json::from_str::<serde_json::Value>(&pipe_resp)
.ok()
.and_then(|v| {
v.get("tuwunel_embedding_pipeline")?
.get("processors")?
.as_array()?
.iter()
.find_map(|p| {
p.get("text_embedding")?
.get("model_id")?
.as_str()
.map(String::from)
})
});
let Some(model_id) = model_id else {
crate::output::warn(
"No model_id in ingest pipeline -- tuwunel hybrid search unavailable.",
);
return;
};
// Check if ConfigMap already has this value
if let Ok(current) =
crate::kube::kube_get_secret_field("matrix", "opensearch-ml-config", "model_id").await
{
if current == model_id {
return;
}
}
let cm = serde_json::json!({
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": "opensearch-ml-config", "namespace": "matrix"},
"data": {"model_id": &model_id},
});
let manifest = serde_json::to_string(&cm).unwrap_or_default();
if let Err(e) = crate::kube::kube_apply(&manifest).await {
crate::output::warn(&format!("Failed to inject OpenSearch model_id: {e}"));
} else {
crate::output::ok(&format!(
"Injected OpenSearch model_id ({model_id}) into matrix/opensearch-ml-config."
));
}
} }
/// Configure OpenSearch ML Commons for neural search. /// Configure OpenSearch ML Commons for neural search.
///
/// 1. Sets cluster settings to allow ML on data nodes.
/// 2. Registers and deploys all-mpnet-base-v2 (pre-trained, 384-dim).
/// 3. Creates ingest + search pipelines for hybrid BM25+neural scoring.
async fn ensure_opensearch_ml() { async fn ensure_opensearch_ml() {
// TODO: implement opensearch ML setup via port-forward + reqwest if os_api("/_cluster/health", "GET", None).await.is_none() {
crate::output::warn("OpenSearch not reachable -- skipping ML setup.");
return;
}
// 1. ML Commons cluster settings
let settings = serde_json::json!({
"persistent": {
"plugins.ml_commons.only_run_on_ml_node": false,
"plugins.ml_commons.native_memory_threshold": 90,
"plugins.ml_commons.model_access_control_enabled": false,
"plugins.ml_commons.allow_registering_model_via_url": true,
}
});
os_api(
"/_cluster/settings",
"PUT",
Some(&serde_json::to_string(&settings).unwrap()),
)
.await;
// 2. Check if model already registered and deployed
let search_body =
r#"{"query":{"match":{"name":"huggingface/sentence-transformers/all-mpnet-base-v2"}}}"#;
let search_resp = match os_api("/_plugins/_ml/models/_search", "POST", Some(search_body)).await
{
Some(r) => r,
None => {
crate::output::warn("OpenSearch ML search API failed -- skipping ML setup.");
return;
}
};
let resp: serde_json::Value = match serde_json::from_str(&search_resp) {
Ok(v) => v,
Err(_) => return,
};
let hits = resp
.get("hits")
.and_then(|h| h.get("hits"))
.and_then(|h| h.as_array())
.cloned()
.unwrap_or_default();
let mut model_id: Option<String> = None;
let mut already_deployed = false;
for hit in &hits {
let state = hit
.get("_source")
.and_then(|s| s.get("model_state"))
.and_then(|v| v.as_str())
.unwrap_or("");
let id = hit.get("_id").and_then(|v| v.as_str()).unwrap_or("");
match state {
"DEPLOYED" => {
model_id = Some(id.to_string());
already_deployed = true;
break;
}
"REGISTERED" | "DEPLOYING" => {
model_id = Some(id.to_string());
}
_ => {}
}
}
if !already_deployed {
if let Some(ref mid) = model_id {
// Registered but not deployed -- deploy it
crate::output::ok("Deploying OpenSearch ML model...");
os_api(
&format!("/_plugins/_ml/models/{mid}/_deploy"),
"POST",
None,
)
.await;
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if let Some(r) =
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
{
if r.contains("\"DEPLOYED\"") {
break;
}
}
}
} else {
// Register from pre-trained hub
crate::output::ok("Registering OpenSearch ML model (all-mpnet-base-v2)...");
let reg_body = serde_json::json!({
"name": "huggingface/sentence-transformers/all-mpnet-base-v2",
"version": "1.0.1",
"model_format": "TORCH_SCRIPT",
});
let reg_resp = match os_api(
"/_plugins/_ml/models/_register",
"POST",
Some(&serde_json::to_string(&reg_body).unwrap()),
)
.await
{
Some(r) => r,
None => {
crate::output::warn("Failed to register ML model -- skipping.");
return;
}
};
let task_id = serde_json::from_str::<serde_json::Value>(&reg_resp)
.ok()
.and_then(|v| v.get("task_id")?.as_str().map(String::from))
.unwrap_or_default();
if task_id.is_empty() {
crate::output::warn("No task_id from model registration -- skipping.");
return;
}
crate::output::ok("Waiting for model registration...");
let mut registered_id = None;
for _ in 0..60 {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
if let Some(task_resp) =
os_api(&format!("/_plugins/_ml/tasks/{task_id}"), "GET", None).await
{
if let Ok(task) = serde_json::from_str::<serde_json::Value>(&task_resp) {
match task.get("state").and_then(|v| v.as_str()).unwrap_or("") {
"COMPLETED" => {
registered_id = task
.get("model_id")
.and_then(|v| v.as_str())
.map(String::from);
break;
}
"FAILED" => {
crate::output::warn(&format!(
"ML model registration failed: {task_resp}"
));
return;
}
_ => {}
}
}
}
}
let Some(mid) = registered_id else {
crate::output::warn("ML model registration timed out.");
return;
};
crate::output::ok("Deploying ML model...");
os_api(
&format!("/_plugins/_ml/models/{mid}/_deploy"),
"POST",
None,
)
.await;
for _ in 0..30 {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if let Some(r) =
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
{
if r.contains("\"DEPLOYED\"") {
break;
}
}
}
model_id = Some(mid);
}
}
let Some(model_id) = model_id else {
crate::output::warn("No ML model available -- skipping pipeline setup.");
return;
};
// 3. Ingest pipeline
let ingest = serde_json::json!({
"description": "Tuwunel message embedding pipeline",
"processors": [{"text_embedding": {
"model_id": &model_id,
"field_map": {"body": "embedding"},
}}],
});
os_api(
"/_ingest/pipeline/tuwunel_embedding_pipeline",
"PUT",
Some(&serde_json::to_string(&ingest).unwrap()),
)
.await;
// 4. Search pipeline
let search = serde_json::json!({
"description": "Tuwunel hybrid BM25+neural search pipeline",
"phase_results_processors": [{"normalization-processor": {
"normalization": {"technique": "min_max"},
"combination": {
"technique": "arithmetic_mean",
"parameters": {"weights": [0.3, 0.7]},
},
}}],
});
os_api(
"/_search/pipeline/tuwunel_hybrid_pipeline",
"PUT",
Some(&serde_json::to_string(&search).unwrap()),
)
.await;
crate::output::ok(&format!("OpenSearch ML ready (model: {model_id})."));
} }
#[cfg(test)] #[cfg(test)]