From 329c18bd1dc0bf7e0a30d911943187b5df5decb0 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Sat, 21 Mar 2026 20:27:55 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20OpenSearchClient=20=E2=80=94=20search?= =?UTF-8?q?=20and=20analytics=20API=20(60=20endpoints)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Typed OpenSearch API covering documents, search, indices, cluster, nodes, cat, ingest pipelines, and snapshots. Bump: sunbeam-sdk v0.7.0 --- Cargo.lock | 2 +- sunbeam-sdk/Cargo.toml | 2 +- sunbeam-sdk/src/search/mod.rs | 796 ++++++++++++++++++++++++++++++++ sunbeam-sdk/src/search/types.rs | 551 ++++++++++++++++++++++ 4 files changed, 1349 insertions(+), 2 deletions(-) create mode 100644 sunbeam-sdk/src/search/mod.rs create mode 100644 sunbeam-sdk/src/search/types.rs diff --git a/Cargo.lock b/Cargo.lock index bfe0587..57689b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3591,7 +3591,7 @@ dependencies = [ [[package]] name = "sunbeam-sdk" -version = "0.5.0" +version = "0.6.0" dependencies = [ "base64", "bytes", diff --git a/sunbeam-sdk/Cargo.toml b/sunbeam-sdk/Cargo.toml index 9b78c46..f38660b 100644 --- a/sunbeam-sdk/Cargo.toml +++ b/sunbeam-sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sunbeam-sdk" -version = "0.6.0" +version = "0.7.0" edition = "2024" description = "Sunbeam SDK — reusable library for cluster management" repository = "https://src.sunbeam.pt/studio/cli" diff --git a/sunbeam-sdk/src/search/mod.rs b/sunbeam-sdk/src/search/mod.rs new file mode 100644 index 0000000..406bd92 --- /dev/null +++ b/sunbeam-sdk/src/search/mod.rs @@ -0,0 +1,796 @@ +//! OpenSearch client. + +pub mod types; + +use crate::client::{AuthMethod, HttpTransport, ServiceClient}; +use crate::error::{Result, ResultExt, SunbeamError}; +use reqwest::Method; +use serde_json::Value; +use types::*; + +/// Client for the OpenSearch HTTP API. +pub struct OpenSearchClient { + pub(crate) transport: HttpTransport, +} + +impl ServiceClient for OpenSearchClient { + fn service_name(&self) -> &'static str { + "opensearch" + } + + fn base_url(&self) -> &str { + &self.transport.base_url + } + + fn from_parts(base_url: String, auth: AuthMethod) -> Self { + Self { + transport: HttpTransport::new(&base_url, auth), + } + } +} + +impl OpenSearchClient { + /// Build an OpenSearchClient from domain (e.g. `https://search.{domain}`). + pub fn connect(domain: &str) -> Self { + let base_url = format!("https://search.{domain}"); + Self::from_parts(base_url, AuthMethod::Bearer(String::new())) + } + + /// Replace the bearer token. + pub fn set_token(&mut self, token: String) { + self.transport.set_auth(AuthMethod::Bearer(token)); + } + + // ----------------------------------------------------------------------- + // Documents + // ----------------------------------------------------------------------- + + /// Index a document with an explicit ID. + pub async fn index_doc( + &self, + index: &str, + id: &str, + body: &Value, + ) -> Result { + self.transport + .json(Method::PUT, &format!("{index}/_doc/{id}"), Some(body), "opensearch index doc") + .await + } + + /// Index a document with an auto-generated ID. + pub async fn index_doc_auto_id( + &self, + index: &str, + body: &Value, + ) -> Result { + self.transport + .json(Method::POST, &format!("{index}/_doc"), Some(body), "opensearch index doc auto") + .await + } + + /// Get a document by ID. + pub async fn get_doc(&self, index: &str, id: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("{index}/_doc/{id}"), + Option::<&()>::None, + "opensearch get doc", + ) + .await + } + + /// Check if a document exists (HEAD request, returns bool). + pub async fn head_doc(&self, index: &str, id: &str) -> Result { + let resp = self + .transport + .request(Method::HEAD, &format!("{index}/_doc/{id}")) + .send() + .await + .with_ctx(|| "opensearch head doc: request failed".to_string())?; + Ok(resp.status().is_success()) + } + + /// Delete a document by ID. + pub async fn delete_doc(&self, index: &str, id: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("{index}/_doc/{id}"), + Option::<&()>::None, + "opensearch delete doc", + ) + .await + } + + /// Update a document by ID. + pub async fn update_doc( + &self, + index: &str, + id: &str, + body: &Value, + ) -> Result { + self.transport + .json(Method::POST, &format!("{index}/_update/{id}"), Some(body), "opensearch update doc") + .await + } + + /// Bulk index/update/delete operations. + pub async fn bulk(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_bulk", Some(body), "opensearch bulk") + .await + } + + /// Multi-get documents. + pub async fn multi_get(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_mget", Some(body), "opensearch mget") + .await + } + + /// Reindex documents from one index to another. + pub async fn reindex(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_reindex", Some(body), "opensearch reindex") + .await + } + + /// Delete documents matching a query. + pub async fn delete_by_query( + &self, + index: &str, + body: &Value, + ) -> Result { + self.transport + .json( + Method::POST, + &format!("{index}/_delete_by_query"), + Some(body), + "opensearch delete by query", + ) + .await + } + + // ----------------------------------------------------------------------- + // Search + // ----------------------------------------------------------------------- + + /// Search an index. + pub async fn search(&self, index: &str, body: &Value) -> Result { + self.transport + .json(Method::POST, &format!("{index}/_search"), Some(body), "opensearch search") + .await + } + + /// Search across all indices. + pub async fn search_all(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_search", Some(body), "opensearch search all") + .await + } + + /// Multi-search. + pub async fn multi_search(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_msearch", Some(body), "opensearch msearch") + .await + } + + /// Count documents matching a query. + pub async fn count(&self, index: &str, body: &Value) -> Result { + self.transport + .json(Method::POST, &format!("{index}/_count"), Some(body), "opensearch count") + .await + } + + /// Scroll through search results. + pub async fn scroll(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_search/scroll", Some(body), "opensearch scroll") + .await + } + + /// Clear a scroll context. + pub async fn clear_scroll(&self, body: &Value) -> Result<()> { + self.transport + .send(Method::DELETE, "_search/scroll", Some(body), "opensearch clear scroll") + .await + } + + /// Get search shard information. + pub async fn search_shards(&self, index: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("{index}/_search_shards"), + Option::<&()>::None, + "opensearch search shards", + ) + .await + } + + /// Execute a search template. + pub async fn search_template(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_search/template", Some(body), "opensearch search template") + .await + } + + // ----------------------------------------------------------------------- + // Indices + // ----------------------------------------------------------------------- + + /// Create an index. + pub async fn create_index(&self, index: &str, body: &Value) -> Result { + self.transport + .json(Method::PUT, index, Some(body), "opensearch create index") + .await + } + + /// Delete an index. + pub async fn delete_index(&self, index: &str) -> Result { + self.transport + .json( + Method::DELETE, + index, + Option::<&()>::None, + "opensearch delete index", + ) + .await + } + + /// Get index metadata. + pub async fn get_index(&self, index: &str) -> Result { + self.transport + .json( + Method::GET, + index, + Option::<&()>::None, + "opensearch get index", + ) + .await + } + + /// Check if an index exists (HEAD request, returns bool). + pub async fn index_exists(&self, index: &str) -> Result { + let resp = self + .transport + .request(Method::HEAD, index) + .send() + .await + .with_ctx(|| "opensearch index exists: request failed".to_string())?; + Ok(resp.status().is_success()) + } + + /// Get index settings. + pub async fn get_settings(&self, index: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("{index}/_settings"), + Option::<&()>::None, + "opensearch get settings", + ) + .await + } + + /// Update index settings. + pub async fn update_settings(&self, index: &str, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + &format!("{index}/_settings"), + Some(body), + "opensearch update settings", + ) + .await + } + + /// Get index mapping. + pub async fn get_mapping(&self, index: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("{index}/_mapping"), + Option::<&()>::None, + "opensearch get mapping", + ) + .await + } + + /// Put (update) index mapping. + pub async fn put_mapping(&self, index: &str, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + &format!("{index}/_mapping"), + Some(body), + "opensearch put mapping", + ) + .await + } + + /// Get aliases for an index. + pub async fn get_aliases(&self, index: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("{index}/_alias"), + Option::<&()>::None, + "opensearch get aliases", + ) + .await + } + + /// Create or update aliases (bulk alias action). + pub async fn create_alias(&self, body: &Value) -> Result { + self.transport + .json(Method::POST, "_aliases", Some(body), "opensearch create alias") + .await + } + + /// Delete an alias from an index. + pub async fn delete_alias(&self, index: &str, alias: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("{index}/_alias/{alias}"), + Option::<&()>::None, + "opensearch delete alias", + ) + .await + } + + /// Create or update an index template. + pub async fn create_template(&self, name: &str, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + &format!("_index_template/{name}"), + Some(body), + "opensearch create template", + ) + .await + } + + /// Delete an index template. + pub async fn delete_template(&self, name: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("_index_template/{name}"), + Option::<&()>::None, + "opensearch delete template", + ) + .await + } + + /// Get an index template. + pub async fn get_template(&self, name: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("_index_template/{name}"), + Option::<&()>::None, + "opensearch get template", + ) + .await + } + + /// Open a closed index. + pub async fn open_index(&self, index: &str) -> Result { + self.transport + .json( + Method::POST, + &format!("{index}/_open"), + Option::<&()>::None, + "opensearch open index", + ) + .await + } + + /// Close an index. + pub async fn close_index(&self, index: &str) -> Result { + self.transport + .json( + Method::POST, + &format!("{index}/_close"), + Option::<&()>::None, + "opensearch close index", + ) + .await + } + + // ----------------------------------------------------------------------- + // Cluster + // ----------------------------------------------------------------------- + + /// Get cluster health. + pub async fn cluster_health(&self) -> Result { + self.transport + .json( + Method::GET, + "_cluster/health", + Option::<&()>::None, + "opensearch cluster health", + ) + .await + } + + /// Get cluster state. + pub async fn cluster_state(&self) -> Result { + self.transport + .json( + Method::GET, + "_cluster/state", + Option::<&()>::None, + "opensearch cluster state", + ) + .await + } + + /// Get cluster stats. + pub async fn cluster_stats(&self) -> Result { + self.transport + .json( + Method::GET, + "_cluster/stats", + Option::<&()>::None, + "opensearch cluster stats", + ) + .await + } + + /// Get cluster settings. + pub async fn cluster_settings(&self) -> Result { + self.transport + .json( + Method::GET, + "_cluster/settings", + Option::<&()>::None, + "opensearch cluster settings", + ) + .await + } + + /// Update cluster settings. + pub async fn update_cluster_settings(&self, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + "_cluster/settings", + Some(body), + "opensearch update cluster settings", + ) + .await + } + + /// Explain shard allocation. + pub async fn allocation_explain(&self, body: &Value) -> Result { + self.transport + .json( + Method::POST, + "_cluster/allocation/explain", + Some(body), + "opensearch allocation explain", + ) + .await + } + + /// Reroute shards. + pub async fn reroute(&self, body: &Value) -> Result { + self.transport + .json( + Method::POST, + "_cluster/reroute", + Some(body), + "opensearch reroute", + ) + .await + } + + // ----------------------------------------------------------------------- + // Nodes + // ----------------------------------------------------------------------- + + /// Get nodes info. + pub async fn nodes_info(&self) -> Result { + self.transport + .json( + Method::GET, + "_nodes", + Option::<&()>::None, + "opensearch nodes info", + ) + .await + } + + /// Get nodes stats. + pub async fn nodes_stats(&self) -> Result { + self.transport + .json( + Method::GET, + "_nodes/stats", + Option::<&()>::None, + "opensearch nodes stats", + ) + .await + } + + /// Get nodes hot threads (returns plain text). + pub async fn nodes_hot_threads(&self) -> Result { + let bytes = self + .transport + .bytes(Method::GET, "_nodes/hot_threads", "opensearch hot threads") + .await?; + String::from_utf8(bytes.to_vec()) + .map_err(|e| SunbeamError::network(format!("opensearch hot threads: invalid UTF-8: {e}"))) + } + + // ----------------------------------------------------------------------- + // Cat + // ----------------------------------------------------------------------- + + /// List indices via the cat API. + pub async fn cat_indices(&self) -> Result> { + self.transport + .json( + Method::GET, + "_cat/indices?format=json", + Option::<&()>::None, + "opensearch cat indices", + ) + .await + } + + /// List nodes via the cat API. + pub async fn cat_nodes(&self) -> Result> { + self.transport + .json( + Method::GET, + "_cat/nodes?format=json", + Option::<&()>::None, + "opensearch cat nodes", + ) + .await + } + + /// List shards via the cat API. + pub async fn cat_shards(&self) -> Result> { + self.transport + .json( + Method::GET, + "_cat/shards?format=json", + Option::<&()>::None, + "opensearch cat shards", + ) + .await + } + + /// Cluster health via the cat API. + pub async fn cat_health(&self) -> Result> { + self.transport + .json( + Method::GET, + "_cat/health?format=json", + Option::<&()>::None, + "opensearch cat health", + ) + .await + } + + /// Allocation info via the cat API. + pub async fn cat_allocation(&self) -> Result> { + self.transport + .json( + Method::GET, + "_cat/allocation?format=json", + Option::<&()>::None, + "opensearch cat allocation", + ) + .await + } + + // ----------------------------------------------------------------------- + // Ingest + // ----------------------------------------------------------------------- + + /// Create or update an ingest pipeline. + pub async fn create_pipeline(&self, id: &str, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + &format!("_ingest/pipeline/{id}"), + Some(body), + "opensearch create pipeline", + ) + .await + } + + /// Get an ingest pipeline. + pub async fn get_pipeline(&self, id: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("_ingest/pipeline/{id}"), + Option::<&()>::None, + "opensearch get pipeline", + ) + .await + } + + /// Delete an ingest pipeline. + pub async fn delete_pipeline(&self, id: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("_ingest/pipeline/{id}"), + Option::<&()>::None, + "opensearch delete pipeline", + ) + .await + } + + /// Simulate an ingest pipeline. + pub async fn simulate_pipeline(&self, id: &str, body: &Value) -> Result { + self.transport + .json( + Method::POST, + &format!("_ingest/pipeline/{id}/_simulate"), + Some(body), + "opensearch simulate pipeline", + ) + .await + } + + /// Get all ingest pipelines. + pub async fn get_all_pipelines(&self) -> Result { + self.transport + .json( + Method::GET, + "_ingest/pipeline", + Option::<&()>::None, + "opensearch get all pipelines", + ) + .await + } + + // ----------------------------------------------------------------------- + // Snapshots + // ----------------------------------------------------------------------- + + /// Create or update a snapshot repository. + pub async fn create_snapshot_repo(&self, name: &str, body: &Value) -> Result { + self.transport + .json( + Method::PUT, + &format!("_snapshot/{name}"), + Some(body), + "opensearch create snapshot repo", + ) + .await + } + + /// Delete a snapshot repository. + pub async fn delete_snapshot_repo(&self, name: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("_snapshot/{name}"), + Option::<&()>::None, + "opensearch delete snapshot repo", + ) + .await + } + + /// Create a snapshot. + pub async fn create_snapshot( + &self, + repo: &str, + name: &str, + body: &Value, + ) -> Result { + self.transport + .json( + Method::PUT, + &format!("_snapshot/{repo}/{name}"), + Some(body), + "opensearch create snapshot", + ) + .await + } + + /// Delete a snapshot. + pub async fn delete_snapshot(&self, repo: &str, name: &str) -> Result { + self.transport + .json( + Method::DELETE, + &format!("_snapshot/{repo}/{name}"), + Option::<&()>::None, + "opensearch delete snapshot", + ) + .await + } + + /// List all snapshots in a repository. + pub async fn list_snapshots(&self, repo: &str) -> Result { + self.transport + .json( + Method::GET, + &format!("_snapshot/{repo}/_all"), + Option::<&()>::None, + "opensearch list snapshots", + ) + .await + } + + /// Restore a snapshot. + pub async fn restore_snapshot( + &self, + repo: &str, + name: &str, + body: &Value, + ) -> Result { + self.transport + .json( + Method::POST, + &format!("_snapshot/{repo}/{name}/_restore"), + Some(body), + "opensearch restore snapshot", + ) + .await + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connect_url() { + let c = OpenSearchClient::connect("sunbeam.pt"); + assert_eq!(c.base_url(), "https://search.sunbeam.pt"); + assert_eq!(c.service_name(), "opensearch"); + } + + #[test] + fn test_from_parts() { + let c = OpenSearchClient::from_parts( + "http://localhost:9200".into(), + AuthMethod::Bearer("tok".into()), + ); + assert_eq!(c.base_url(), "http://localhost:9200"); + } + + #[test] + fn test_set_token() { + let mut c = OpenSearchClient::connect("example.com"); + c.set_token("my-token".into()); + assert!(matches!(c.transport.auth, AuthMethod::Bearer(ref s) if s == "my-token")); + } + + #[tokio::test] + async fn test_head_doc_unreachable() { + let c = OpenSearchClient::from_parts( + "http://127.0.0.1:19998".into(), + AuthMethod::None, + ); + let result = c.head_doc("test", "1").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_index_exists_unreachable() { + let c = OpenSearchClient::from_parts( + "http://127.0.0.1:19998".into(), + AuthMethod::None, + ); + let result = c.index_exists("test").await; + assert!(result.is_err()); + } +} diff --git a/sunbeam-sdk/src/search/types.rs b/sunbeam-sdk/src/search/types.rs new file mode 100644 index 0000000..cd15838 --- /dev/null +++ b/sunbeam-sdk/src/search/types.rs @@ -0,0 +1,551 @@ +//! OpenSearch response types. + +use serde::{Deserialize, Serialize}; + +// --------------------------------------------------------------------------- +// Document responses +// --------------------------------------------------------------------------- + +/// Response from index / delete / update operations. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IndexResponse { + #[serde(rename = "_index")] + pub index: String, + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_version", default)] + pub version: Option, + pub result: Option, + #[serde(rename = "_shards", default)] + pub shards: Option, + #[serde(rename = "_seq_no", default)] + pub seq_no: Option, + #[serde(rename = "_primary_term", default)] + pub primary_term: Option, +} + +/// Shard success/failure counters. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShardInfo { + pub total: u32, + pub successful: u32, + pub failed: u32, +} + +/// Response from GET `{index}/_doc/{id}`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetResponse { + #[serde(rename = "_index")] + pub index: String, + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_version", default)] + pub version: Option, + #[serde(rename = "_seq_no", default)] + pub seq_no: Option, + #[serde(rename = "_primary_term", default)] + pub primary_term: Option, + pub found: bool, + #[serde(rename = "_source", default)] + pub source: Option, +} + +/// Response from DELETE `{index}/_doc/{id}`. +pub type DeleteResponse = IndexResponse; + +/// Response from POST `{index}/_update/{id}`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateResponse { + #[serde(rename = "_index")] + pub index: String, + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_version", default)] + pub version: Option, + pub result: Option, + #[serde(rename = "_shards", default)] + pub shards: Option, +} + +// --------------------------------------------------------------------------- +// Bulk +// --------------------------------------------------------------------------- + +/// Response from POST `_bulk`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BulkResponse { + pub took: u64, + pub errors: bool, + #[serde(default)] + pub items: Vec, +} + +// --------------------------------------------------------------------------- +// Multi-get +// --------------------------------------------------------------------------- + +/// Response from POST `_mget`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiGetResponse { + pub docs: Vec, +} + +// --------------------------------------------------------------------------- +// Reindex +// --------------------------------------------------------------------------- + +/// Response from POST `_reindex`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReindexResponse { + pub took: u64, + #[serde(default)] + pub timed_out: bool, + #[serde(default)] + pub total: u64, + #[serde(default)] + pub updated: u64, + #[serde(default)] + pub created: u64, + #[serde(default)] + pub deleted: u64, + #[serde(default)] + pub failures: Vec, +} + +// --------------------------------------------------------------------------- +// Delete by query +// --------------------------------------------------------------------------- + +/// Response from POST `{index}/_delete_by_query`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteByQueryResponse { + pub took: u64, + #[serde(default)] + pub timed_out: bool, + #[serde(default)] + pub total: u64, + #[serde(default)] + pub deleted: u64, + #[serde(default)] + pub failures: Vec, +} + +// --------------------------------------------------------------------------- +// Search +// --------------------------------------------------------------------------- + +/// Response from POST `{index}/_search` and `_search`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SearchResponse { + pub took: u64, + pub timed_out: bool, + #[serde(rename = "_shards", default)] + pub shards: Option, + pub hits: HitsEnvelope, + #[serde(default)] + pub aggregations: Option, + #[serde(rename = "_scroll_id", default)] + pub scroll_id: Option, +} + +/// Top-level hits wrapper. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HitsEnvelope { + pub total: HitsTotal, + #[serde(default)] + pub max_score: Option, + #[serde(default)] + pub hits: Vec, +} + +/// Total hit count. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HitsTotal { + pub value: u64, + #[serde(default)] + pub relation: Option, +} + +/// A single search hit. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Hit { + #[serde(rename = "_index")] + pub index: String, + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_score", default)] + pub score: Option, + #[serde(rename = "_source", default)] + pub source: Option, + #[serde(default)] + pub highlight: Option, + #[serde(default)] + pub sort: Option>, +} + +/// Response from POST `_msearch`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiSearchResponse { + pub responses: Vec, +} + +/// Response from POST `{index}/_count`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CountResponse { + pub count: u64, + #[serde(rename = "_shards", default)] + pub shards: Option, +} + +/// Response from GET `{index}/_search_shards`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShardsResponse { + #[serde(default)] + pub nodes: serde_json::Value, + #[serde(default)] + pub shards: Vec>, +} + +// --------------------------------------------------------------------------- +// Index management +// --------------------------------------------------------------------------- + +/// Acknowledged response from index management operations. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AckResponse { + pub acknowledged: bool, + #[serde(default)] + pub shards_acknowledged: Option, + #[serde(default)] + pub index: Option, +} + +// --------------------------------------------------------------------------- +// Cluster +// --------------------------------------------------------------------------- + +/// Response from GET `_cluster/health`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterHealth { + pub cluster_name: String, + pub status: String, + #[serde(default)] + pub timed_out: bool, + #[serde(default)] + pub number_of_nodes: u32, + #[serde(default)] + pub number_of_data_nodes: u32, + #[serde(default)] + pub active_primary_shards: u32, + #[serde(default)] + pub active_shards: u32, + #[serde(default)] + pub relocating_shards: u32, + #[serde(default)] + pub initializing_shards: u32, + #[serde(default)] + pub unassigned_shards: u32, +} + +// --------------------------------------------------------------------------- +// Cat responses +// --------------------------------------------------------------------------- + +/// A row from `_cat/indices?format=json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CatIndex { + #[serde(default)] + pub health: Option, + #[serde(default)] + pub status: Option, + #[serde(default)] + pub index: Option, + #[serde(default)] + pub uuid: Option, + #[serde(default)] + pub pri: Option, + #[serde(default)] + pub rep: Option, + #[serde(rename = "docs.count", default)] + pub docs_count: Option, + #[serde(rename = "docs.deleted", default)] + pub docs_deleted: Option, + #[serde(rename = "store.size", default)] + pub store_size: Option, + #[serde(rename = "pri.store.size", default)] + pub pri_store_size: Option, +} + +/// A row from `_cat/nodes?format=json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CatNode { + #[serde(default)] + pub ip: Option, + #[serde(default)] + pub name: Option, + #[serde(rename = "heap.percent", default)] + pub heap_percent: Option, + #[serde(rename = "ram.percent", default)] + pub ram_percent: Option, + #[serde(rename = "cpu", default)] + pub cpu: Option, + #[serde(rename = "node.role", default)] + pub node_role: Option, + #[serde(default)] + pub master: Option, +} + +/// A row from `_cat/shards?format=json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CatShard { + #[serde(default)] + pub index: Option, + #[serde(default)] + pub shard: Option, + #[serde(default)] + pub prirep: Option, + #[serde(default)] + pub state: Option, + #[serde(default)] + pub docs: Option, + #[serde(default)] + pub store: Option, + #[serde(default)] + pub node: Option, +} + +/// A row from `_cat/health?format=json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CatHealth { + #[serde(default)] + pub cluster: Option, + #[serde(default)] + pub status: Option, + #[serde(rename = "node.total", default)] + pub node_total: Option, + #[serde(rename = "node.data", default)] + pub node_data: Option, + #[serde(default)] + pub shards: Option, + #[serde(default)] + pub pri: Option, + #[serde(default)] + pub relo: Option, + #[serde(default)] + pub init: Option, + #[serde(default)] + pub unassign: Option, +} + +/// A row from `_cat/allocation?format=json`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CatAllocation { + #[serde(default)] + pub shards: Option, + #[serde(rename = "disk.indices", default)] + pub disk_indices: Option, + #[serde(rename = "disk.used", default)] + pub disk_used: Option, + #[serde(rename = "disk.avail", default)] + pub disk_avail: Option, + #[serde(rename = "disk.total", default)] + pub disk_total: Option, + #[serde(rename = "disk.percent", default)] + pub disk_percent: Option, + #[serde(default)] + pub host: Option, + #[serde(default)] + pub node: Option, +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_index_response() { + let json = r#"{ + "_index": "test", + "_id": "1", + "_version": 1, + "result": "created", + "_shards": {"total": 2, "successful": 1, "failed": 0}, + "_seq_no": 0, + "_primary_term": 1 + }"#; + let resp: IndexResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.index, "test"); + assert_eq!(resp.id, "1"); + assert_eq!(resp.result.as_deref(), Some("created")); + } + + #[test] + fn deserialize_get_response_found() { + let json = r#"{ + "_index": "test", + "_id": "1", + "_version": 1, + "found": true, + "_source": {"title": "Hello"} + }"#; + let resp: GetResponse = serde_json::from_str(json).unwrap(); + assert!(resp.found); + assert!(resp.source.is_some()); + } + + #[test] + fn deserialize_get_response_not_found() { + let json = r#"{ + "_index": "test", + "_id": "999", + "found": false + }"#; + let resp: GetResponse = serde_json::from_str(json).unwrap(); + assert!(!resp.found); + assert!(resp.source.is_none()); + } + + #[test] + fn deserialize_search_response() { + let json = r#"{ + "took": 5, + "timed_out": false, + "_shards": {"total": 5, "successful": 5, "failed": 0}, + "hits": { + "total": {"value": 1, "relation": "eq"}, + "max_score": 1.0, + "hits": [{ + "_index": "test", + "_id": "1", + "_score": 1.0, + "_source": {"title": "Hello"} + }] + } + }"#; + let resp: SearchResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.took, 5); + assert!(!resp.timed_out); + assert_eq!(resp.hits.total.value, 1); + assert_eq!(resp.hits.hits.len(), 1); + } + + #[test] + fn deserialize_bulk_response() { + let json = r#"{ + "took": 30, + "errors": false, + "items": [{"index": {"_index": "test", "_id": "1", "result": "created"}}] + }"#; + let resp: BulkResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.took, 30); + assert!(!resp.errors); + assert_eq!(resp.items.len(), 1); + } + + #[test] + fn deserialize_count_response() { + let json = r#"{ + "count": 42, + "_shards": {"total": 5, "successful": 5, "failed": 0} + }"#; + let resp: CountResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.count, 42); + } + + #[test] + fn deserialize_ack_response() { + let json = r#"{ + "acknowledged": true, + "shards_acknowledged": true, + "index": "my-index" + }"#; + let resp: AckResponse = serde_json::from_str(json).unwrap(); + assert!(resp.acknowledged); + assert_eq!(resp.index.as_deref(), Some("my-index")); + } + + #[test] + fn deserialize_cluster_health() { + let json = r#"{ + "cluster_name": "opensearch-cluster", + "status": "green", + "timed_out": false, + "number_of_nodes": 3, + "number_of_data_nodes": 3, + "active_primary_shards": 10, + "active_shards": 20, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0 + }"#; + let resp: ClusterHealth = serde_json::from_str(json).unwrap(); + assert_eq!(resp.cluster_name, "opensearch-cluster"); + assert_eq!(resp.status, "green"); + assert_eq!(resp.number_of_nodes, 3); + } + + #[test] + fn deserialize_cat_index() { + let json = r#"{ + "health": "green", + "status": "open", + "index": "my-index", + "uuid": "abc123", + "pri": "1", + "rep": "1", + "docs.count": "100", + "store.size": "10mb" + }"#; + let resp: CatIndex = serde_json::from_str(json).unwrap(); + assert_eq!(resp.index.as_deref(), Some("my-index")); + assert_eq!(resp.docs_count.as_deref(), Some("100")); + } + + #[test] + fn deserialize_reindex_response() { + let json = r#"{ + "took": 100, + "timed_out": false, + "total": 50, + "updated": 0, + "created": 50, + "deleted": 0, + "failures": [] + }"#; + let resp: ReindexResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.took, 100); + assert_eq!(resp.created, 50); + } + + #[test] + fn deserialize_delete_by_query_response() { + let json = r#"{ + "took": 10, + "timed_out": false, + "total": 5, + "deleted": 5, + "failures": [] + }"#; + let resp: DeleteByQueryResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.deleted, 5); + } + + #[test] + fn deserialize_multi_get_response() { + let json = r#"{ + "docs": [{ + "_index": "test", + "_id": "1", + "found": true, + "_source": {"title": "Hello"} + }] + }"#; + let resp: MultiGetResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.docs.len(), 1); + assert!(resp.docs[0].found); + } +}