Refactor admin query raw

This commit is contained in:
dasha_uwu
2026-01-22 01:58:48 +05:00
committed by Jason Volk
parent 3caab50e0d
commit 0dbe79df8e

View File

@@ -1,17 +1,20 @@
use std::{borrow::Cow, collections::BTreeMap, ops::Deref, sync::Arc}; use std::{collections::BTreeMap, fmt::Write, sync::Arc};
use base64::prelude::*; use base64::prelude::*;
use clap::Subcommand; use clap::Subcommand;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use futures::{FutureExt, StreamExt, TryStreamExt};
use tokio::time::Instant; use tokio::time::Instant;
use tuwunel_core::{ use tuwunel_core::{
Err, Result, apply, at, is_zero, Err, Result, apply, at, err, is_zero,
itertools::Itertools,
utils::{ utils::{
TryReadyExt,
math::Expected,
stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt}, stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt},
string::EMPTY, string::EMPTY,
}, },
}; };
use tuwunel_database::Map; use tuwunel_database::{KeyVal, Map};
use tuwunel_service::Services; use tuwunel_service::Services;
use crate::{admin_command, admin_command_dispatch}; use crate::{admin_command, admin_command_dispatch};
@@ -43,6 +46,39 @@ pub(crate) enum RawCommand {
/// Key prefix /// Key prefix
prefix: Option<String>, prefix: Option<String>,
/// Limit
#[arg(short, long)]
limit: Option<usize>,
/// Lower bound
#[arg(short, long)]
from: Option<String>,
/// Reverse iteration order
#[arg(short, long, default_value("false"))]
backwards: bool,
},
/// - Raw database items iteration
Iter {
/// Map name
map: String,
/// Key prefix
prefix: Option<String>,
/// Limit
#[arg(short, long)]
limit: Option<usize>,
/// Lower bound
#[arg(short, long)]
from: Option<String>,
/// Reverse iteration order
#[arg(short, long, default_value("false"))]
backwards: bool,
}, },
/// - Raw database key size breakdown /// - Raw database key size breakdown
@@ -81,41 +117,6 @@ pub(crate) enum RawCommand {
prefix: Option<String>, prefix: Option<String>,
}, },
/// - Raw database items iteration
Iter {
/// Map name
map: String,
/// Key prefix
prefix: Option<String>,
},
/// - Raw database keys iteration
KeysFrom {
/// Map name
map: String,
/// Lower-bound
start: String,
/// Limit
#[arg(short, long)]
limit: Option<usize>,
},
/// - Raw database items iteration
IterFrom {
/// Map name
map: String,
/// Lower-bound
start: String,
/// Limit
#[arg(short, long)]
limit: Option<usize>,
},
/// - Raw database record count /// - Raw database record count
Count { Count {
/// Map name /// Map name
@@ -147,7 +148,7 @@ pub(crate) enum RawCommand {
/// - Compact database DANGER!!! /// - Compact database DANGER!!!
Compact { Compact {
#[arg(short, long, alias("column"))] #[arg(short, long, alias("column"))]
map: Option<Vec<String>>, maps: Option<Vec<String>>,
#[arg(long)] #[arg(long)]
start: Option<String>, start: Option<String>,
@@ -176,7 +177,7 @@ pub(crate) enum RawCommand {
#[admin_command] #[admin_command]
pub(super) async fn raw_compact( pub(super) async fn raw_compact(
&self, &self,
map: Option<Vec<String>>, maps: Option<Vec<String>>,
start: Option<String>, start: Option<String>,
stop: Option<String>, stop: Option<String>,
from: Option<usize>, from: Option<usize>,
@@ -186,26 +187,7 @@ pub(super) async fn raw_compact(
) -> Result { ) -> Result {
use tuwunel_database::compact::Options; use tuwunel_database::compact::Options;
let default_all_maps: Option<_> = map.is_none().then(|| { let maps = with_maps_or(maps.as_deref(), self.services)?;
self.services
.db
.keys()
.map(Deref::deref)
.map(ToOwned::to_owned)
});
let maps: Vec<_> = map
.unwrap_or_default()
.into_iter()
.chain(default_all_maps.into_iter().flatten())
.map(|map| self.services.db.get(&map))
.filter_map(Result::ok)
.cloned()
.collect();
if maps.is_empty() {
return Err!("--map argument invalid. not found in database");
}
let range = ( let range = (
start start
@@ -247,7 +229,9 @@ pub(super) async fn raw_count(&self, map: Option<String>, prefix: Option<String>
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let timer = Instant::now(); let timer = Instant::now();
let count = with_maps_or(map.as_deref(), self.services) let count = with_map_or(map.as_deref(), self.services)?
.iter()
.stream()
.then(|map| map.raw_count_prefix(&prefix)) .then(|map| map.raw_count_prefix(&prefix))
.ready_fold(0_usize, usize::saturating_add) .ready_fold(0_usize, usize::saturating_add)
.await; .await;
@@ -258,16 +242,45 @@ pub(super) async fn raw_count(&self, map: Option<String>, prefix: Option<String>
} }
#[admin_command] #[admin_command]
pub(super) async fn raw_keys(&self, map: String, prefix: Option<String>) -> Result { pub(super) async fn raw_keys(
&self,
map: String,
prefix: Option<String>,
limit: Option<usize>,
from: Option<String>,
backwards: bool,
) -> Result {
writeln!(self, "```").boxed().await?; writeln!(self, "```").boxed().await?;
let map = self.services.db.get(map.as_str())?; let map = self.services.db.get(map.as_str())?;
let timer = Instant::now(); let timer = Instant::now();
prefix
.as_deref() let stream = match from.as_ref().or(prefix.as_ref()) {
.map_or_else(|| map.raw_keys().boxed(), |prefix| map.raw_keys_prefix(prefix).boxed()) | Some(from) =>
.map_ok(String::from_utf8_lossy) if !backwards {
.try_for_each(|str| writeln!(self, "{str:?}")) map.raw_keys_from(from).boxed()
} else {
map.rev_raw_keys_from(from).boxed()
},
| None =>
if !backwards {
map.raw_keys().boxed()
} else {
map.rev_raw_keys().boxed()
},
};
let prefix = prefix.as_ref().map(String::as_bytes);
stream
.ready_try_take_while(|k| {
Ok(prefix
.map(|prefix| k.starts_with(prefix))
.unwrap_or(true))
})
.take(limit.unwrap_or(usize::MAX))
.map_ok(encode)
.try_for_each(|str| writeln!(self, "{str}"))
.boxed() .boxed()
.await?; .await?;
@@ -281,7 +294,9 @@ pub(super) async fn raw_keys_sizes(&self, map: Option<String>, prefix: Option<St
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let timer = Instant::now(); let timer = Instant::now();
let result = with_maps_or(map.as_deref(), self.services) let result = with_map_or(map.as_deref(), self.services)?
.iter()
.stream()
.map(|map| map.raw_keys_prefix(&prefix)) .map(|map| map.raw_keys_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@@ -303,7 +318,9 @@ pub(super) async fn raw_keys_total(&self, map: Option<String>, prefix: Option<St
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let timer = Instant::now(); let timer = Instant::now();
let result = with_maps_or(map.as_deref(), self.services) let result = with_map_or(map.as_deref(), self.services)?
.iter()
.stream()
.map(|map| map.raw_keys_prefix(&prefix)) .map(|map| map.raw_keys_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@@ -321,7 +338,9 @@ pub(super) async fn raw_vals_sizes(&self, map: Option<String>, prefix: Option<St
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let timer = Instant::now(); let timer = Instant::now();
let result = with_maps_or(map.as_deref(), self.services) let result = with_map_or(map.as_deref(), self.services)?
.iter()
.stream()
.map(|map| map.raw_stream_prefix(&prefix)) .map(|map| map.raw_stream_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@@ -344,7 +363,9 @@ pub(super) async fn raw_vals_total(&self, map: Option<String>, prefix: Option<St
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let timer = Instant::now(); let timer = Instant::now();
let result = with_maps_or(map.as_deref(), self.services) let result = with_map_or(map.as_deref(), self.services)?
.iter()
.stream()
.map(|map| map.raw_stream_prefix(&prefix)) .map(|map| map.raw_stream_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@@ -359,40 +380,44 @@ pub(super) async fn raw_vals_total(&self, map: Option<String>, prefix: Option<St
} }
#[admin_command] #[admin_command]
pub(super) async fn raw_iter(&self, map: String, prefix: Option<String>) -> Result { pub(super) async fn raw_iter(
writeln!(self, "```").await?;
let map = self.services.db.get(&map)?;
let timer = Instant::now();
prefix
.as_deref()
.map_or_else(|| map.raw_stream().boxed(), |prefix| map.raw_stream_prefix(prefix).boxed())
.map_ok(apply!(2, String::from_utf8_lossy))
.map_ok(apply!(2, Cow::into_owned))
.try_for_each(|keyval| writeln!(self, "{keyval:?}"))
.boxed()
.await?;
let query_time = timer.elapsed();
self.write_str(&format!("\n```\n\nQuery completed in {query_time:?}"))
.await
}
#[admin_command]
pub(super) async fn raw_keys_from(
&self, &self,
map: String, map: String,
start: String, prefix: Option<String>,
limit: Option<usize>, limit: Option<usize>,
from: Option<String>,
backwards: bool,
) -> Result { ) -> Result {
writeln!(self, "```").await?; writeln!(self, "```").await?;
let map = self.services.db.get(&map)?; let map = self.services.db.get(&map)?;
let timer = Instant::now(); let timer = Instant::now();
map.raw_keys_from(&start) let stream = match from.as_ref().or(prefix.as_ref()) {
.map_ok(String::from_utf8_lossy) | Some(from) =>
if !backwards {
map.raw_stream_from(from).boxed()
} else {
map.rev_raw_stream_from(from).boxed()
},
| None =>
if !backwards {
map.raw_stream().boxed()
} else {
map.rev_raw_stream().boxed()
},
};
let prefix = prefix.as_ref().map(String::as_bytes);
stream
.ready_try_take_while(|(k, _): &KeyVal<'_>| {
Ok(prefix
.map(|prefix| k.starts_with(prefix))
.unwrap_or(true))
})
.take(limit.unwrap_or(usize::MAX)) .take(limit.unwrap_or(usize::MAX))
.try_for_each(|str| writeln!(self, "{str:?}")) .map_ok(apply!(2, encode))
.try_for_each(|(key, val)| writeln!(self, "{{{key} => {val}}}"))
.boxed() .boxed()
.await?; .await?;
@@ -401,28 +426,6 @@ pub(super) async fn raw_keys_from(
.await .await
} }
#[admin_command]
pub(super) async fn raw_iter_from(
&self,
map: String,
start: String,
limit: Option<usize>,
) -> Result {
let map = self.services.db.get(&map)?;
let timer = Instant::now();
let result = map
.raw_stream_from(&start)
.map_ok(apply!(2, String::from_utf8_lossy))
.map_ok(apply!(2, Cow::into_owned))
.take(limit.unwrap_or(usize::MAX))
.try_collect::<Vec<(String, String)>>()
.await?;
let query_time = timer.elapsed();
self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:#?}\n```"))
.await
}
#[admin_command] #[admin_command]
pub(super) async fn raw_del(&self, map: String, key: String) -> Result { pub(super) async fn raw_del(&self, map: String, key: String) -> Result {
let map = self.services.db.get(&map)?; let map = self.services.db.get(&map)?;
@@ -467,7 +470,7 @@ pub(super) async fn raw_get(&self, map: String, key: String, base64: bool) -> Re
let result = if base64 { let result = if base64 {
BASE64_STANDARD.encode(&handle) BASE64_STANDARD.encode(&handle)
} else { } else {
String::from_utf8_lossy(&handle).to_string() encode(&handle)
}; };
self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```")) self.write_str(&format!("Query completed in {query_time:?}:\n\n```rs\n{result:?}\n```"))
@@ -487,19 +490,43 @@ pub(super) async fn raw_maps(&self) -> Result {
self.write_str(&format!("{list:#?}")).await self.write_str(&format!("{list:#?}")).await
} }
fn with_maps_or<'a>( fn with_map_or(map: Option<&str>, services: &Services) -> Result<Vec<Arc<Map>>> {
map: Option<&'a str>, with_maps_or(
services: &'a Services, map.map(|map| [map])
) -> impl Stream<Item = &'a Arc<Map>> + Send + 'a { .as_ref()
let default_all_maps = map .map(<[&str; 1]>::as_slice),
.is_none() services,
.then(|| services.db.keys().map(Deref::deref)) )
.into_iter() }
.flatten();
fn with_maps_or<S: AsRef<str>>(maps: Option<&[S]>, services: &Services) -> Result<Vec<Arc<Map>>> {
map.into_iter() Ok(if let Some(maps) = maps {
.chain(default_all_maps) maps.iter()
.map(|map| services.db.get(map)) .map(|map| {
.filter_map(Result::ok) let map = map.as_ref();
.stream() services
.db
.get(map)
.cloned()
.map_err(|_| err!("map {map} not found"))
})
.try_collect()?
} else {
services.db.iter().map(|x| x.1.clone()).collect()
})
}
#[expect(clippy::as_conversions)]
fn encode(data: &[u8]) -> String {
let mut res = String::with_capacity(data.len().expected_mul(4));
for byte in data {
if *byte < 0x20 || *byte > 0x7E {
let _ = write!(res, "\\x{byte:02x}");
} else {
res.push(*byte as char);
}
}
res
} }