feat: implement OpenSearch ML setup and model_id injection
ensure_opensearch_ml: cluster settings, model registration/deployment (all-mpnet-base-v2), ingest + search pipelines for hybrid BM25+neural. inject_opensearch_model_id: reads model_id from ingest pipeline, writes to matrix/opensearch-ml-config ConfigMap. os_api helper: kube exec curl inside opensearch pod.
This commit is contained in:
309
src/manifests.rs
309
src/manifests.rs
@@ -1,4 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use crate::error::Result;
|
||||
|
||||
pub const MANAGED_NS: &[&str] = &[
|
||||
"data",
|
||||
@@ -56,7 +56,7 @@ pub async fn cmd_apply(env: &str, domain: &str, email: &str, namespace: &str) ->
|
||||
domain.to_string()
|
||||
};
|
||||
if d.is_empty() {
|
||||
anyhow::bail!("--domain is required for production apply on first deploy");
|
||||
bail!("--domain is required for production apply on first deploy");
|
||||
}
|
||||
let overlay = infra_dir.join("overlays").join("production");
|
||||
(d, overlay)
|
||||
@@ -404,16 +404,313 @@ async fn patch_tuwunel_oauth2_redirect(domain: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OpenSearch helpers (kube exec + curl inside pod)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Call OpenSearch API via kube exec curl inside the opensearch pod.
|
||||
async fn os_api(path: &str, method: &str, body: Option<&str>) -> Option<String> {
|
||||
let url = format!("http://localhost:9200{path}");
|
||||
let mut curl_args: Vec<&str> = vec!["curl", "-sf", &url];
|
||||
if method != "GET" {
|
||||
curl_args.extend_from_slice(&["-X", method]);
|
||||
}
|
||||
let body_string;
|
||||
if let Some(b) = body {
|
||||
body_string = b.to_string();
|
||||
curl_args.extend_from_slice(&["-H", "Content-Type: application/json", "-d", &body_string]);
|
||||
}
|
||||
|
||||
// Build the full exec command: exec deploy/opensearch -n data -c opensearch -- curl ...
|
||||
let mut exec_cmd: Vec<&str> = vec!["curl"];
|
||||
exec_cmd = curl_args;
|
||||
|
||||
match crate::kube::kube_exec("data", "opensearch-0", &exec_cmd, Some("opensearch")).await {
|
||||
Ok((0, out)) if !out.is_empty() => Some(out),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap.
|
||||
async fn inject_opensearch_model_id() {
|
||||
// Read model_id from the ingest pipeline via OpenSearch API
|
||||
// This requires port-forward to opensearch — skip if not reachable
|
||||
// TODO: implement opensearch API calls via port-forward + reqwest
|
||||
let pipe_resp =
|
||||
match os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "GET", None).await {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
crate::output::warn(
|
||||
"OpenSearch ingest pipeline not found -- skipping model_id injection.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let model_id = serde_json::from_str::<serde_json::Value>(&pipe_resp)
|
||||
.ok()
|
||||
.and_then(|v| {
|
||||
v.get("tuwunel_embedding_pipeline")?
|
||||
.get("processors")?
|
||||
.as_array()?
|
||||
.iter()
|
||||
.find_map(|p| {
|
||||
p.get("text_embedding")?
|
||||
.get("model_id")?
|
||||
.as_str()
|
||||
.map(String::from)
|
||||
})
|
||||
});
|
||||
|
||||
let Some(model_id) = model_id else {
|
||||
crate::output::warn(
|
||||
"No model_id in ingest pipeline -- tuwunel hybrid search unavailable.",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
// Check if ConfigMap already has this value
|
||||
if let Ok(current) =
|
||||
crate::kube::kube_get_secret_field("matrix", "opensearch-ml-config", "model_id").await
|
||||
{
|
||||
if current == model_id {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let cm = serde_json::json!({
|
||||
"apiVersion": "v1",
|
||||
"kind": "ConfigMap",
|
||||
"metadata": {"name": "opensearch-ml-config", "namespace": "matrix"},
|
||||
"data": {"model_id": &model_id},
|
||||
});
|
||||
|
||||
let manifest = serde_json::to_string(&cm).unwrap_or_default();
|
||||
if let Err(e) = crate::kube::kube_apply(&manifest).await {
|
||||
crate::output::warn(&format!("Failed to inject OpenSearch model_id: {e}"));
|
||||
} else {
|
||||
crate::output::ok(&format!(
|
||||
"Injected OpenSearch model_id ({model_id}) into matrix/opensearch-ml-config."
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure OpenSearch ML Commons for neural search.
|
||||
///
|
||||
/// 1. Sets cluster settings to allow ML on data nodes.
|
||||
/// 2. Registers and deploys all-mpnet-base-v2 (pre-trained, 384-dim).
|
||||
/// 3. Creates ingest + search pipelines for hybrid BM25+neural scoring.
|
||||
async fn ensure_opensearch_ml() {
|
||||
// TODO: implement opensearch ML setup via port-forward + reqwest
|
||||
if os_api("/_cluster/health", "GET", None).await.is_none() {
|
||||
crate::output::warn("OpenSearch not reachable -- skipping ML setup.");
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. ML Commons cluster settings
|
||||
let settings = serde_json::json!({
|
||||
"persistent": {
|
||||
"plugins.ml_commons.only_run_on_ml_node": false,
|
||||
"plugins.ml_commons.native_memory_threshold": 90,
|
||||
"plugins.ml_commons.model_access_control_enabled": false,
|
||||
"plugins.ml_commons.allow_registering_model_via_url": true,
|
||||
}
|
||||
});
|
||||
os_api(
|
||||
"/_cluster/settings",
|
||||
"PUT",
|
||||
Some(&serde_json::to_string(&settings).unwrap()),
|
||||
)
|
||||
.await;
|
||||
|
||||
// 2. Check if model already registered and deployed
|
||||
let search_body =
|
||||
r#"{"query":{"match":{"name":"huggingface/sentence-transformers/all-mpnet-base-v2"}}}"#;
|
||||
let search_resp = match os_api("/_plugins/_ml/models/_search", "POST", Some(search_body)).await
|
||||
{
|
||||
Some(r) => r,
|
||||
None => {
|
||||
crate::output::warn("OpenSearch ML search API failed -- skipping ML setup.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let resp: serde_json::Value = match serde_json::from_str(&search_resp) {
|
||||
Ok(v) => v,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
let hits = resp
|
||||
.get("hits")
|
||||
.and_then(|h| h.get("hits"))
|
||||
.and_then(|h| h.as_array())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut model_id: Option<String> = None;
|
||||
let mut already_deployed = false;
|
||||
|
||||
for hit in &hits {
|
||||
let state = hit
|
||||
.get("_source")
|
||||
.and_then(|s| s.get("model_state"))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("");
|
||||
let id = hit.get("_id").and_then(|v| v.as_str()).unwrap_or("");
|
||||
match state {
|
||||
"DEPLOYED" => {
|
||||
model_id = Some(id.to_string());
|
||||
already_deployed = true;
|
||||
break;
|
||||
}
|
||||
"REGISTERED" | "DEPLOYING" => {
|
||||
model_id = Some(id.to_string());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if !already_deployed {
|
||||
if let Some(ref mid) = model_id {
|
||||
// Registered but not deployed -- deploy it
|
||||
crate::output::ok("Deploying OpenSearch ML model...");
|
||||
os_api(
|
||||
&format!("/_plugins/_ml/models/{mid}/_deploy"),
|
||||
"POST",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
for _ in 0..30 {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
if let Some(r) =
|
||||
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
|
||||
{
|
||||
if r.contains("\"DEPLOYED\"") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Register from pre-trained hub
|
||||
crate::output::ok("Registering OpenSearch ML model (all-mpnet-base-v2)...");
|
||||
let reg_body = serde_json::json!({
|
||||
"name": "huggingface/sentence-transformers/all-mpnet-base-v2",
|
||||
"version": "1.0.1",
|
||||
"model_format": "TORCH_SCRIPT",
|
||||
});
|
||||
let reg_resp = match os_api(
|
||||
"/_plugins/_ml/models/_register",
|
||||
"POST",
|
||||
Some(&serde_json::to_string(®_body).unwrap()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(r) => r,
|
||||
None => {
|
||||
crate::output::warn("Failed to register ML model -- skipping.");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let task_id = serde_json::from_str::<serde_json::Value>(®_resp)
|
||||
.ok()
|
||||
.and_then(|v| v.get("task_id")?.as_str().map(String::from))
|
||||
.unwrap_or_default();
|
||||
|
||||
if task_id.is_empty() {
|
||||
crate::output::warn("No task_id from model registration -- skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
crate::output::ok("Waiting for model registration...");
|
||||
let mut registered_id = None;
|
||||
for _ in 0..60 {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||
if let Some(task_resp) =
|
||||
os_api(&format!("/_plugins/_ml/tasks/{task_id}"), "GET", None).await
|
||||
{
|
||||
if let Ok(task) = serde_json::from_str::<serde_json::Value>(&task_resp) {
|
||||
match task.get("state").and_then(|v| v.as_str()).unwrap_or("") {
|
||||
"COMPLETED" => {
|
||||
registered_id = task
|
||||
.get("model_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(String::from);
|
||||
break;
|
||||
}
|
||||
"FAILED" => {
|
||||
crate::output::warn(&format!(
|
||||
"ML model registration failed: {task_resp}"
|
||||
));
|
||||
return;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(mid) = registered_id else {
|
||||
crate::output::warn("ML model registration timed out.");
|
||||
return;
|
||||
};
|
||||
|
||||
crate::output::ok("Deploying ML model...");
|
||||
os_api(
|
||||
&format!("/_plugins/_ml/models/{mid}/_deploy"),
|
||||
"POST",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
for _ in 0..30 {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
if let Some(r) =
|
||||
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
|
||||
{
|
||||
if r.contains("\"DEPLOYED\"") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
model_id = Some(mid);
|
||||
}
|
||||
}
|
||||
|
||||
let Some(model_id) = model_id else {
|
||||
crate::output::warn("No ML model available -- skipping pipeline setup.");
|
||||
return;
|
||||
};
|
||||
|
||||
// 3. Ingest pipeline
|
||||
let ingest = serde_json::json!({
|
||||
"description": "Tuwunel message embedding pipeline",
|
||||
"processors": [{"text_embedding": {
|
||||
"model_id": &model_id,
|
||||
"field_map": {"body": "embedding"},
|
||||
}}],
|
||||
});
|
||||
os_api(
|
||||
"/_ingest/pipeline/tuwunel_embedding_pipeline",
|
||||
"PUT",
|
||||
Some(&serde_json::to_string(&ingest).unwrap()),
|
||||
)
|
||||
.await;
|
||||
|
||||
// 4. Search pipeline
|
||||
let search = serde_json::json!({
|
||||
"description": "Tuwunel hybrid BM25+neural search pipeline",
|
||||
"phase_results_processors": [{"normalization-processor": {
|
||||
"normalization": {"technique": "min_max"},
|
||||
"combination": {
|
||||
"technique": "arithmetic_mean",
|
||||
"parameters": {"weights": [0.3, 0.7]},
|
||||
},
|
||||
}}],
|
||||
});
|
||||
os_api(
|
||||
"/_search/pipeline/tuwunel_hybrid_pipeline",
|
||||
"PUT",
|
||||
Some(&serde_json::to_string(&search).unwrap()),
|
||||
)
|
||||
.await;
|
||||
|
||||
crate::output::ok(&format!("OpenSearch ML ready (model: {model_id})."));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user