feat: implement OpenSearch ML setup and model_id injection
ensure_opensearch_ml: cluster settings, model registration/deployment (all-mpnet-base-v2), ingest + search pipelines for hybrid BM25+neural. inject_opensearch_model_id: reads model_id from ingest pipeline, writes to matrix/opensearch-ml-config ConfigMap. os_api helper: kube exec curl inside opensearch pod.
This commit is contained in:
309
src/manifests.rs
309
src/manifests.rs
@@ -1,4 +1,4 @@
|
|||||||
use anyhow::Result;
|
use crate::error::Result;
|
||||||
|
|
||||||
pub const MANAGED_NS: &[&str] = &[
|
pub const MANAGED_NS: &[&str] = &[
|
||||||
"data",
|
"data",
|
||||||
@@ -56,7 +56,7 @@ pub async fn cmd_apply(env: &str, domain: &str, email: &str, namespace: &str) ->
|
|||||||
domain.to_string()
|
domain.to_string()
|
||||||
};
|
};
|
||||||
if d.is_empty() {
|
if d.is_empty() {
|
||||||
anyhow::bail!("--domain is required for production apply on first deploy");
|
bail!("--domain is required for production apply on first deploy");
|
||||||
}
|
}
|
||||||
let overlay = infra_dir.join("overlays").join("production");
|
let overlay = infra_dir.join("overlays").join("production");
|
||||||
(d, overlay)
|
(d, overlay)
|
||||||
@@ -404,16 +404,313 @@ async fn patch_tuwunel_oauth2_redirect(domain: &str) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// OpenSearch helpers (kube exec + curl inside pod)
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// Call OpenSearch API via kube exec curl inside the opensearch pod.
|
||||||
|
async fn os_api(path: &str, method: &str, body: Option<&str>) -> Option<String> {
|
||||||
|
let url = format!("http://localhost:9200{path}");
|
||||||
|
let mut curl_args: Vec<&str> = vec!["curl", "-sf", &url];
|
||||||
|
if method != "GET" {
|
||||||
|
curl_args.extend_from_slice(&["-X", method]);
|
||||||
|
}
|
||||||
|
let body_string;
|
||||||
|
if let Some(b) = body {
|
||||||
|
body_string = b.to_string();
|
||||||
|
curl_args.extend_from_slice(&["-H", "Content-Type: application/json", "-d", &body_string]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the full exec command: exec deploy/opensearch -n data -c opensearch -- curl ...
|
||||||
|
let mut exec_cmd: Vec<&str> = vec!["curl"];
|
||||||
|
exec_cmd = curl_args;
|
||||||
|
|
||||||
|
match crate::kube::kube_exec("data", "opensearch-0", &exec_cmd, Some("opensearch")).await {
|
||||||
|
Ok((0, out)) if !out.is_empty() => Some(out),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap.
|
/// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap.
|
||||||
async fn inject_opensearch_model_id() {
|
async fn inject_opensearch_model_id() {
|
||||||
// Read model_id from the ingest pipeline via OpenSearch API
|
let pipe_resp =
|
||||||
// This requires port-forward to opensearch — skip if not reachable
|
match os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "GET", None).await {
|
||||||
// TODO: implement opensearch API calls via port-forward + reqwest
|
Some(r) => r,
|
||||||
|
None => {
|
||||||
|
crate::output::warn(
|
||||||
|
"OpenSearch ingest pipeline not found -- skipping model_id injection.",
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let model_id = serde_json::from_str::<serde_json::Value>(&pipe_resp)
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| {
|
||||||
|
v.get("tuwunel_embedding_pipeline")?
|
||||||
|
.get("processors")?
|
||||||
|
.as_array()?
|
||||||
|
.iter()
|
||||||
|
.find_map(|p| {
|
||||||
|
p.get("text_embedding")?
|
||||||
|
.get("model_id")?
|
||||||
|
.as_str()
|
||||||
|
.map(String::from)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let Some(model_id) = model_id else {
|
||||||
|
crate::output::warn(
|
||||||
|
"No model_id in ingest pipeline -- tuwunel hybrid search unavailable.",
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check if ConfigMap already has this value
|
||||||
|
if let Ok(current) =
|
||||||
|
crate::kube::kube_get_secret_field("matrix", "opensearch-ml-config", "model_id").await
|
||||||
|
{
|
||||||
|
if current == model_id {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let cm = serde_json::json!({
|
||||||
|
"apiVersion": "v1",
|
||||||
|
"kind": "ConfigMap",
|
||||||
|
"metadata": {"name": "opensearch-ml-config", "namespace": "matrix"},
|
||||||
|
"data": {"model_id": &model_id},
|
||||||
|
});
|
||||||
|
|
||||||
|
let manifest = serde_json::to_string(&cm).unwrap_or_default();
|
||||||
|
if let Err(e) = crate::kube::kube_apply(&manifest).await {
|
||||||
|
crate::output::warn(&format!("Failed to inject OpenSearch model_id: {e}"));
|
||||||
|
} else {
|
||||||
|
crate::output::ok(&format!(
|
||||||
|
"Injected OpenSearch model_id ({model_id}) into matrix/opensearch-ml-config."
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configure OpenSearch ML Commons for neural search.
|
/// Configure OpenSearch ML Commons for neural search.
|
||||||
|
///
|
||||||
|
/// 1. Sets cluster settings to allow ML on data nodes.
|
||||||
|
/// 2. Registers and deploys all-mpnet-base-v2 (pre-trained, 384-dim).
|
||||||
|
/// 3. Creates ingest + search pipelines for hybrid BM25+neural scoring.
|
||||||
async fn ensure_opensearch_ml() {
|
async fn ensure_opensearch_ml() {
|
||||||
// TODO: implement opensearch ML setup via port-forward + reqwest
|
if os_api("/_cluster/health", "GET", None).await.is_none() {
|
||||||
|
crate::output::warn("OpenSearch not reachable -- skipping ML setup.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. ML Commons cluster settings
|
||||||
|
let settings = serde_json::json!({
|
||||||
|
"persistent": {
|
||||||
|
"plugins.ml_commons.only_run_on_ml_node": false,
|
||||||
|
"plugins.ml_commons.native_memory_threshold": 90,
|
||||||
|
"plugins.ml_commons.model_access_control_enabled": false,
|
||||||
|
"plugins.ml_commons.allow_registering_model_via_url": true,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
os_api(
|
||||||
|
"/_cluster/settings",
|
||||||
|
"PUT",
|
||||||
|
Some(&serde_json::to_string(&settings).unwrap()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// 2. Check if model already registered and deployed
|
||||||
|
let search_body =
|
||||||
|
r#"{"query":{"match":{"name":"huggingface/sentence-transformers/all-mpnet-base-v2"}}}"#;
|
||||||
|
let search_resp = match os_api("/_plugins/_ml/models/_search", "POST", Some(search_body)).await
|
||||||
|
{
|
||||||
|
Some(r) => r,
|
||||||
|
None => {
|
||||||
|
crate::output::warn("OpenSearch ML search API failed -- skipping ML setup.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let resp: serde_json::Value = match serde_json::from_str(&search_resp) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(_) => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let hits = resp
|
||||||
|
.get("hits")
|
||||||
|
.and_then(|h| h.get("hits"))
|
||||||
|
.and_then(|h| h.as_array())
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let mut model_id: Option<String> = None;
|
||||||
|
let mut already_deployed = false;
|
||||||
|
|
||||||
|
for hit in &hits {
|
||||||
|
let state = hit
|
||||||
|
.get("_source")
|
||||||
|
.and_then(|s| s.get("model_state"))
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.unwrap_or("");
|
||||||
|
let id = hit.get("_id").and_then(|v| v.as_str()).unwrap_or("");
|
||||||
|
match state {
|
||||||
|
"DEPLOYED" => {
|
||||||
|
model_id = Some(id.to_string());
|
||||||
|
already_deployed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
"REGISTERED" | "DEPLOYING" => {
|
||||||
|
model_id = Some(id.to_string());
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !already_deployed {
|
||||||
|
if let Some(ref mid) = model_id {
|
||||||
|
// Registered but not deployed -- deploy it
|
||||||
|
crate::output::ok("Deploying OpenSearch ML model...");
|
||||||
|
os_api(
|
||||||
|
&format!("/_plugins/_ml/models/{mid}/_deploy"),
|
||||||
|
"POST",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
for _ in 0..30 {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
if let Some(r) =
|
||||||
|
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
|
||||||
|
{
|
||||||
|
if r.contains("\"DEPLOYED\"") {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Register from pre-trained hub
|
||||||
|
crate::output::ok("Registering OpenSearch ML model (all-mpnet-base-v2)...");
|
||||||
|
let reg_body = serde_json::json!({
|
||||||
|
"name": "huggingface/sentence-transformers/all-mpnet-base-v2",
|
||||||
|
"version": "1.0.1",
|
||||||
|
"model_format": "TORCH_SCRIPT",
|
||||||
|
});
|
||||||
|
let reg_resp = match os_api(
|
||||||
|
"/_plugins/_ml/models/_register",
|
||||||
|
"POST",
|
||||||
|
Some(&serde_json::to_string(®_body).unwrap()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Some(r) => r,
|
||||||
|
None => {
|
||||||
|
crate::output::warn("Failed to register ML model -- skipping.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let task_id = serde_json::from_str::<serde_json::Value>(®_resp)
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.get("task_id")?.as_str().map(String::from))
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
if task_id.is_empty() {
|
||||||
|
crate::output::warn("No task_id from model registration -- skipping.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
crate::output::ok("Waiting for model registration...");
|
||||||
|
let mut registered_id = None;
|
||||||
|
for _ in 0..60 {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
if let Some(task_resp) =
|
||||||
|
os_api(&format!("/_plugins/_ml/tasks/{task_id}"), "GET", None).await
|
||||||
|
{
|
||||||
|
if let Ok(task) = serde_json::from_str::<serde_json::Value>(&task_resp) {
|
||||||
|
match task.get("state").and_then(|v| v.as_str()).unwrap_or("") {
|
||||||
|
"COMPLETED" => {
|
||||||
|
registered_id = task
|
||||||
|
.get("model_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(String::from);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
"FAILED" => {
|
||||||
|
crate::output::warn(&format!(
|
||||||
|
"ML model registration failed: {task_resp}"
|
||||||
|
));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(mid) = registered_id else {
|
||||||
|
crate::output::warn("ML model registration timed out.");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
crate::output::ok("Deploying ML model...");
|
||||||
|
os_api(
|
||||||
|
&format!("/_plugins/_ml/models/{mid}/_deploy"),
|
||||||
|
"POST",
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
for _ in 0..30 {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
if let Some(r) =
|
||||||
|
os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await
|
||||||
|
{
|
||||||
|
if r.contains("\"DEPLOYED\"") {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
model_id = Some(mid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(model_id) = model_id else {
|
||||||
|
crate::output::warn("No ML model available -- skipping pipeline setup.");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// 3. Ingest pipeline
|
||||||
|
let ingest = serde_json::json!({
|
||||||
|
"description": "Tuwunel message embedding pipeline",
|
||||||
|
"processors": [{"text_embedding": {
|
||||||
|
"model_id": &model_id,
|
||||||
|
"field_map": {"body": "embedding"},
|
||||||
|
}}],
|
||||||
|
});
|
||||||
|
os_api(
|
||||||
|
"/_ingest/pipeline/tuwunel_embedding_pipeline",
|
||||||
|
"PUT",
|
||||||
|
Some(&serde_json::to_string(&ingest).unwrap()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// 4. Search pipeline
|
||||||
|
let search = serde_json::json!({
|
||||||
|
"description": "Tuwunel hybrid BM25+neural search pipeline",
|
||||||
|
"phase_results_processors": [{"normalization-processor": {
|
||||||
|
"normalization": {"technique": "min_max"},
|
||||||
|
"combination": {
|
||||||
|
"technique": "arithmetic_mean",
|
||||||
|
"parameters": {"weights": [0.3, 0.7]},
|
||||||
|
},
|
||||||
|
}}],
|
||||||
|
});
|
||||||
|
os_api(
|
||||||
|
"/_search/pipeline/tuwunel_hybrid_pipeline",
|
||||||
|
"PUT",
|
||||||
|
Some(&serde_json::to_string(&search).unwrap()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
crate::output::ok(&format!("OpenSearch ML ready (model: {model_id})."));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user