From 0dbe79df8e78d795e68a0aaebb467bb0bacbfcea Mon Sep 17 00:00:00 2001 From: dasha_uwu Date: Thu, 22 Jan 2026 01:58:48 +0500 Subject: [PATCH] Refactor `admin query raw` --- src/admin/query/raw.rs | 297 ++++++++++++++++++++++------------------- 1 file changed, 162 insertions(+), 135 deletions(-) diff --git a/src/admin/query/raw.rs b/src/admin/query/raw.rs index 38a6bbbe..6113fc03 100644 --- a/src/admin/query/raw.rs +++ b/src/admin/query/raw.rs @@ -1,17 +1,20 @@ -use std::{borrow::Cow, collections::BTreeMap, ops::Deref, sync::Arc}; +use std::{collections::BTreeMap, fmt::Write, sync::Arc}; use base64::prelude::*; use clap::Subcommand; -use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use tokio::time::Instant; use tuwunel_core::{ - Err, Result, apply, at, is_zero, + Err, Result, apply, at, err, is_zero, + itertools::Itertools, utils::{ + TryReadyExt, + math::Expected, stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt}, string::EMPTY, }, }; -use tuwunel_database::Map; +use tuwunel_database::{KeyVal, Map}; use tuwunel_service::Services; use crate::{admin_command, admin_command_dispatch}; @@ -43,6 +46,39 @@ pub(crate) enum RawCommand { /// Key prefix prefix: Option, + + /// Limit + #[arg(short, long)] + limit: Option, + + /// Lower bound + #[arg(short, long)] + from: Option, + + /// Reverse iteration order + #[arg(short, long, default_value("false"))] + backwards: bool, + }, + + /// - Raw database items iteration + Iter { + /// Map name + map: String, + + /// Key prefix + prefix: Option, + + /// Limit + #[arg(short, long)] + limit: Option, + + /// Lower bound + #[arg(short, long)] + from: Option, + + /// Reverse iteration order + #[arg(short, long, default_value("false"))] + backwards: bool, }, /// - Raw database key size breakdown @@ -81,41 +117,6 @@ pub(crate) enum RawCommand { prefix: Option, }, - /// - Raw database items iteration - Iter { - /// Map name - map: String, - - /// Key prefix - prefix: Option, - }, - - /// - Raw database keys iteration - KeysFrom { - /// Map name - map: String, - - /// Lower-bound - start: String, - - /// Limit - #[arg(short, long)] - limit: Option, - }, - - /// - Raw database items iteration - IterFrom { - /// Map name - map: String, - - /// Lower-bound - start: String, - - /// Limit - #[arg(short, long)] - limit: Option, - }, - /// - Raw database record count Count { /// Map name @@ -147,7 +148,7 @@ pub(crate) enum RawCommand { /// - Compact database DANGER!!! Compact { #[arg(short, long, alias("column"))] - map: Option>, + maps: Option>, #[arg(long)] start: Option, @@ -176,7 +177,7 @@ pub(crate) enum RawCommand { #[admin_command] pub(super) async fn raw_compact( &self, - map: Option>, + maps: Option>, start: Option, stop: Option, from: Option, @@ -186,26 +187,7 @@ pub(super) async fn raw_compact( ) -> Result { use tuwunel_database::compact::Options; - let default_all_maps: Option<_> = map.is_none().then(|| { - self.services - .db - .keys() - .map(Deref::deref) - .map(ToOwned::to_owned) - }); - - let maps: Vec<_> = map - .unwrap_or_default() - .into_iter() - .chain(default_all_maps.into_iter().flatten()) - .map(|map| self.services.db.get(&map)) - .filter_map(Result::ok) - .cloned() - .collect(); - - if maps.is_empty() { - return Err!("--map argument invalid. not found in database"); - } + let maps = with_maps_or(maps.as_deref(), self.services)?; let range = ( start @@ -247,7 +229,9 @@ pub(super) async fn raw_count(&self, map: Option, prefix: Option let prefix = prefix.as_deref().unwrap_or(EMPTY); let timer = Instant::now(); - let count = with_maps_or(map.as_deref(), self.services) + let count = with_map_or(map.as_deref(), self.services)? + .iter() + .stream() .then(|map| map.raw_count_prefix(&prefix)) .ready_fold(0_usize, usize::saturating_add) .await; @@ -258,16 +242,45 @@ pub(super) async fn raw_count(&self, map: Option, prefix: Option } #[admin_command] -pub(super) async fn raw_keys(&self, map: String, prefix: Option) -> Result { +pub(super) async fn raw_keys( + &self, + map: String, + prefix: Option, + limit: Option, + from: Option, + backwards: bool, +) -> Result { writeln!(self, "```").boxed().await?; let map = self.services.db.get(map.as_str())?; let timer = Instant::now(); - prefix - .as_deref() - .map_or_else(|| map.raw_keys().boxed(), |prefix| map.raw_keys_prefix(prefix).boxed()) - .map_ok(String::from_utf8_lossy) - .try_for_each(|str| writeln!(self, "{str:?}")) + + let stream = match from.as_ref().or(prefix.as_ref()) { + | Some(from) => + if !backwards { + map.raw_keys_from(from).boxed() + } else { + map.rev_raw_keys_from(from).boxed() + }, + | None => + if !backwards { + map.raw_keys().boxed() + } else { + map.rev_raw_keys().boxed() + }, + }; + + let prefix = prefix.as_ref().map(String::as_bytes); + + stream + .ready_try_take_while(|k| { + Ok(prefix + .map(|prefix| k.starts_with(prefix)) + .unwrap_or(true)) + }) + .take(limit.unwrap_or(usize::MAX)) + .map_ok(encode) + .try_for_each(|str| writeln!(self, "{str}")) .boxed() .await?; @@ -281,7 +294,9 @@ pub(super) async fn raw_keys_sizes(&self, map: Option, prefix: Option, prefix: Option, prefix: Option, prefix: Option, prefix: Option) -> Result { - writeln!(self, "```").await?; - - let map = self.services.db.get(&map)?; - let timer = Instant::now(); - prefix - .as_deref() - .map_or_else(|| map.raw_stream().boxed(), |prefix| map.raw_stream_prefix(prefix).boxed()) - .map_ok(apply!(2, String::from_utf8_lossy)) - .map_ok(apply!(2, Cow::into_owned)) - .try_for_each(|keyval| writeln!(self, "{keyval:?}")) - .boxed() - .await?; - - let query_time = timer.elapsed(); - self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}")) - .await -} - -#[admin_command] -pub(super) async fn raw_keys_from( +pub(super) async fn raw_iter( &self, map: String, - start: String, + prefix: Option, limit: Option, + from: Option, + backwards: bool, ) -> Result { writeln!(self, "```").await?; let map = self.services.db.get(&map)?; let timer = Instant::now(); - map.raw_keys_from(&start) - .map_ok(String::from_utf8_lossy) + let stream = match from.as_ref().or(prefix.as_ref()) { + | Some(from) => + if !backwards { + map.raw_stream_from(from).boxed() + } else { + map.rev_raw_stream_from(from).boxed() + }, + | None => + if !backwards { + map.raw_stream().boxed() + } else { + map.rev_raw_stream().boxed() + }, + }; + + let prefix = prefix.as_ref().map(String::as_bytes); + + stream + .ready_try_take_while(|(k, _): &KeyVal<'_>| { + Ok(prefix + .map(|prefix| k.starts_with(prefix)) + .unwrap_or(true)) + }) .take(limit.unwrap_or(usize::MAX)) - .try_for_each(|str| writeln!(self, "{str:?}")) + .map_ok(apply!(2, encode)) + .try_for_each(|(key, val)| writeln!(self, "{{{key} => {val}}}")) .boxed() .await?; @@ -401,28 +426,6 @@ pub(super) async fn raw_keys_from( .await } -#[admin_command] -pub(super) async fn raw_iter_from( - &self, - map: String, - start: String, - limit: Option, -) -> Result { - let map = self.services.db.get(&map)?; - let timer = Instant::now(); - let result = map - .raw_stream_from(&start) - .map_ok(apply!(2, String::from_utf8_lossy)) - .map_ok(apply!(2, Cow::into_owned)) - .take(limit.unwrap_or(usize::MAX)) - .try_collect::>() - .await?; - - let query_time = timer.elapsed(); - self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```")) - .await -} - #[admin_command] pub(super) async fn raw_del(&self, map: String, key: String) -> Result { let map = self.services.db.get(&map)?; @@ -467,7 +470,7 @@ pub(super) async fn raw_get(&self, map: String, key: String, base64: bool) -> Re let result = if base64 { BASE64_STANDARD.encode(&handle) } else { - String::from_utf8_lossy(&handle).to_string() + encode(&handle) }; self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```")) @@ -487,19 +490,43 @@ pub(super) async fn raw_maps(&self) -> Result { self.write_str(&format!("{list:#?}")).await } -fn with_maps_or<'a>( - map: Option<&'a str>, - services: &'a Services, -) -> impl Stream> + Send + 'a { - let default_all_maps = map - .is_none() - .then(|| services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - map.into_iter() - .chain(default_all_maps) - .map(|map| services.db.get(map)) - .filter_map(Result::ok) - .stream() +fn with_map_or(map: Option<&str>, services: &Services) -> Result>> { + with_maps_or( + map.map(|map| [map]) + .as_ref() + .map(<[&str; 1]>::as_slice), + services, + ) +} + +fn with_maps_or>(maps: Option<&[S]>, services: &Services) -> Result>> { + Ok(if let Some(maps) = maps { + maps.iter() + .map(|map| { + let map = map.as_ref(); + services + .db + .get(map) + .cloned() + .map_err(|_| err!("map {map} not found")) + }) + .try_collect()? + } else { + services.db.iter().map(|x| x.1.clone()).collect() + }) +} + +#[expect(clippy::as_conversions)] +fn encode(data: &[u8]) -> String { + let mut res = String::with_capacity(data.len().expected_mul(4)); + + for byte in data { + if *byte < 0x20 || *byte > 0x7E { + let _ = write!(res, "\\x{byte:02x}"); + } else { + res.push(*byte as char); + } + } + + res }