rename db to engine

This commit is contained in:
dasha_uwu
2025-09-09 18:12:38 +05:00
committed by Jason Volk
parent e0169e3dca
commit b882e7efdb
25 changed files with 98 additions and 87 deletions

View File

@@ -918,7 +918,7 @@ pub(super) async fn database_files(&self, map: Option<String>, level: Option<i32
let mut files: Vec<_> = self
.services
.db
.db
.engine
.file_list()
.collect::<Result<_>>()?;
@@ -1023,7 +1023,7 @@ pub(super) async fn resync_database(&self) -> Result {
self.services
.db
.db
.engine
.update()
.map_err(|e| err!("Failed to update from primary: {e:?}"))
}

View File

@@ -67,7 +67,7 @@ pub(super) async fn list_features(&self, available: bool, enabled: bool, comma:
#[admin_command]
pub(super) async fn memory_usage(&self) -> Result {
let services_usage = self.services.memory_usage().await?;
let database_usage = self.services.db.db.memory_usage()?;
let database_usage = self.services.db.engine.memory_usage()?;
let allocator_usage = tuwunel_core::alloc::memory_usage()
.map_or(String::new(), |s| format!("\nAllocator:\n{s}"));
@@ -88,7 +88,7 @@ pub(super) async fn clear_caches(&self) -> Result {
pub(super) async fn list_backups(&self) -> Result {
self.services
.db
.db
.engine
.backup_list()?
.try_stream()
.try_for_each(|result| write!(self, "{result}"))
@@ -102,13 +102,13 @@ pub(super) async fn backup_database(&self) -> Result {
.services
.server
.runtime()
.spawn_blocking(move || match db.db.backup() {
.spawn_blocking(move || match db.engine.backup() {
| Ok(()) => "Done".to_owned(),
| Err(e) => format!("Failed: {e}"),
})
.await?;
let count = self.services.db.db.backup_count()?;
let count = self.services.db.engine.backup_count()?;
self.write_str(&format!("{result}. Currently have {count} backups."))
.await
}

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use crate::{Database, Engine};
pub struct Cork {
db: Arc<Engine>,
engine: Arc<Engine>,
flush: bool,
sync: bool,
}
@@ -11,33 +11,33 @@ pub struct Cork {
impl Database {
#[inline]
#[must_use]
pub fn cork(&self) -> Cork { Cork::new(&self.db, false, false) }
pub fn cork(&self) -> Cork { Cork::new(&self.engine, false, false) }
#[inline]
#[must_use]
pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db, true, false) }
pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.engine, true, false) }
#[inline]
#[must_use]
pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.db, true, true) }
pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.engine, true, true) }
}
impl Cork {
#[inline]
pub(super) fn new(db: &Arc<Engine>, flush: bool, sync: bool) -> Self {
db.cork();
Self { db: db.clone(), flush, sync }
pub(super) fn new(engine: &Arc<Engine>, flush: bool, sync: bool) -> Self {
engine.cork();
Self { engine: engine.clone(), flush, sync }
}
}
impl Drop for Cork {
fn drop(&mut self) {
self.db.uncork();
self.engine.uncork();
if self.flush {
self.db.flush().ok();
self.engine.flush().ok();
}
if self.sync {
self.db.sync().ok();
self.engine.sync().ok();
}
}
}

View File

@@ -48,38 +48,40 @@ pub struct Map {
name: &'static str,
watch: Watch,
cf: Arc<ColumnFamily>,
db: Arc<Engine>,
engine: Arc<Engine>,
read_options: ReadOptions,
cache_read_options: ReadOptions,
write_options: WriteOptions,
}
impl Map {
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
pub(crate) fn open(engine: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
name,
watch: Watch::default(),
cf: open::open(db, name),
db: db.clone(),
read_options: read_options_default(db),
cache_read_options: cache_read_options_default(db),
write_options: write_options_default(db),
cf: open::open(engine, name),
engine: engine.clone(),
read_options: read_options_default(engine),
cache_read_options: cache_read_options_default(engine),
write_options: write_options_default(engine),
}))
}
#[inline]
pub fn property_integer(&self, name: &CStr) -> Result<u64> {
self.db.property_integer(&self.cf(), name)
self.engine.property_integer(&self.cf(), name)
}
#[inline]
pub fn property(&self, name: &str) -> Result<String> { self.db.property(&self.cf(), name) }
pub fn property(&self, name: &str) -> Result<String> {
self.engine.property(&self.cf(), name)
}
#[inline]
pub fn name(&self) -> &str { self.name }
#[inline]
pub(crate) fn db(&self) -> &Arc<Engine> { &self.db }
pub(crate) fn engine(&self) -> &Arc<Engine> { &self.engine }
#[inline]
pub(crate) fn cf(&self) -> impl AsColumnFamilyRef + '_ { &*self.cf }

View File

@@ -54,7 +54,7 @@ pub fn compact_blocking(&self, opts: Options) -> Result {
| (Some(_), Some(_)) => return Err!("compacting between specific levels not supported"),
}
self.db
self.engine
.db
.compact_range_cf_opt(&self.cf(), opts.range.0, opts.range.1, &co);

View File

@@ -97,7 +97,7 @@ pub(crate) fn maybe_exists<K>(&self, key: &K) -> bool
where
K: AsRef<[u8]> + ?Sized,
{
self.db
self.engine
.db
.key_may_exist_cf_opt(&self.cf(), key, &self.cache_read_options)
}

View File

@@ -37,7 +37,7 @@ where
res: None,
};
self.db
self.engine
.pool
.execute_get(cmd)
.and_then(|mut res| ready(res.remove(0)))
@@ -77,7 +77,7 @@ fn get_blocking_opts<K>(
where
K: AsRef<[u8]> + ?Sized,
{
self.db
self.engine
.db
.get_pinned_cf_opt(&self.cf(), key, read_options)
}

View File

@@ -48,7 +48,7 @@ where
keys.ready_chunks(automatic_amplification())
.widen_then(automatic_width(), |chunk| {
self.db.pool.execute_get(Get {
self.engine.pool.execute_get(Get {
map: self.clone(),
key: chunk
.iter()
@@ -104,7 +104,7 @@ where
// comparator**.
const SORTED: bool = false;
self.db
self.engine
.db
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
.into_iter()

View File

@@ -22,14 +22,14 @@ where
V: AsRef<[u8]>,
{
let write_options = &self.write_options;
self.db
self.engine
.db
.put_cf_opt(&self.cf(), key, val, write_options)
.or_else(or_else)
.expect("database insert error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
if !self.engine.corked() {
self.engine.flush().expect("database flush error");
}
self.notify(key.as_ref());
@@ -49,13 +49,13 @@ where
}
let write_options = &self.write_options;
self.db
self.engine
.db
.write_opt(batch, write_options)
.or_else(or_else)
.expect("database insert batch error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
if !self.engine.corked() {
self.engine.flush().expect("database flush error");
}
}

View File

@@ -23,7 +23,7 @@ where
pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
@@ -42,7 +42,7 @@ pub fn raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()

View File

@@ -61,7 +61,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed();
@@ -75,7 +75,7 @@ where
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Keys<'_>>()

View File

@@ -4,8 +4,8 @@ use rocksdb::ColumnFamily;
use crate::Engine;
pub(super) fn open(db: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
let bounded_arc = db.cf(name);
pub(super) fn open(engine: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
let bounded_arc = engine.cf(name);
let bounded_ptr = Arc::into_raw(bounded_arc);
let cf_ptr = bounded_ptr.cast::<ColumnFamily>();

View File

@@ -5,34 +5,34 @@ use rocksdb::{ReadOptions, ReadTier, WriteOptions};
use crate::Engine;
#[inline]
pub(crate) fn cache_iter_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = iter_options_default(db);
pub(crate) fn cache_iter_options_default(engine: &Arc<Engine>) -> ReadOptions {
let mut options = iter_options_default(engine);
options.set_read_tier(ReadTier::BlockCache);
options.fill_cache(false);
options
}
#[inline]
pub(crate) fn iter_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(db);
pub(crate) fn iter_options_default(engine: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(engine);
options.set_background_purge_on_iterator_cleanup(true);
options
}
#[inline]
pub(crate) fn cache_read_options_default(db: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(db);
pub(crate) fn cache_read_options_default(engine: &Arc<Engine>) -> ReadOptions {
let mut options = read_options_default(engine);
options.set_read_tier(ReadTier::BlockCache);
options.fill_cache(false);
options
}
#[inline]
pub(crate) fn read_options_default(db: &Arc<Engine>) -> ReadOptions {
pub(crate) fn read_options_default(engine: &Arc<Engine>) -> ReadOptions {
let mut options = ReadOptions::default();
options.set_total_order_seek(true);
if !db.checksums {
if !engine.checksums {
options.set_verify_checksums(false);
}
@@ -40,4 +40,6 @@ pub(crate) fn read_options_default(db: &Arc<Engine>) -> ReadOptions {
}
#[inline]
pub(crate) fn write_options_default(_db: &Arc<Engine>) -> WriteOptions { WriteOptions::default() }
pub(crate) fn write_options_default(_engine: &Arc<Engine>) -> WriteOptions {
WriteOptions::default()
}

View File

@@ -52,7 +52,7 @@ where
.map(|result| result.expect("failed to serialize query key"))
.collect();
self.db
self.engine
.pool
.execute_get(Get { map: self.clone(), key: keys, res: None })
})

View File

@@ -11,13 +11,13 @@ where
K: AsRef<[u8]> + ?Sized + Debug,
{
let write_options = &self.write_options;
self.db
self.engine
.db
.delete_cf_opt(&self.cf(), key, write_options)
.or_else(or_else)
.expect("database remove error");
if !self.db.corked() {
self.db.flush().expect("database flush error");
if !self.engine.corked() {
self.engine.flush().expect("database flush error");
}
}

View File

@@ -23,7 +23,7 @@ where
pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
@@ -42,7 +42,7 @@ pub fn rev_raw_keys(self: &Arc<Self>) -> impl Stream<Item = Result<Key<'_>>> + S
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()

View File

@@ -61,7 +61,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed();
@@ -75,7 +75,7 @@ where
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::KeysRev<'_>>()

View File

@@ -31,7 +31,7 @@ where
pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_rev(None);
@@ -50,7 +50,7 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
@@ -66,7 +66,7 @@ pub fn rev_raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>
fields(%map),
)]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_iter_options_default(&map.db);
let opts = super::cache_iter_options_default(&map.engine);
let state = stream::State::new(map, opts).init_rev(None);
!state.is_incomplete()

View File

@@ -80,7 +80,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_rev(from.as_ref().into());
@@ -99,7 +99,7 @@ where
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::ItemsRev<'_>>()
@@ -118,7 +118,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let cache_opts = super::cache_iter_options_default(&map.db);
let cache_opts = super::cache_iter_options_default(&map.engine);
let cache_status = stream::State::new(map, cache_opts)
.init_rev(from.as_ref().into())
.status();

View File

@@ -31,7 +31,7 @@ where
pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> + Send {
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self) {
let state = state.init_fwd(None);
@@ -50,7 +50,7 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
@@ -66,7 +66,7 @@ pub fn raw_stream(self: &Arc<Self>) -> impl Stream<Item = Result<KeyVal<'_>>> +
fields(%map),
)]
pub(super) fn is_cached(map: &Arc<super::Map>) -> bool {
let opts = super::cache_iter_options_default(&map.db);
let opts = super::cache_iter_options_default(&map.engine);
let state = stream::State::new(map, opts).init_fwd(None);
!state.is_incomplete()

View File

@@ -79,7 +79,7 @@ where
{
use crate::pool::Seek;
let opts = super::iter_options_default(&self.db);
let opts = super::iter_options_default(&self.engine);
let state = stream::State::new(self, opts);
if is_cached(self, from) {
let state = state.init_fwd(from.as_ref().into());
@@ -98,7 +98,7 @@ where
res: None,
};
self.db
self.engine
.pool
.execute_iter(seek)
.ok_into::<stream::Items<'_>>()
@@ -117,7 +117,7 @@ pub(super) fn is_cached<P>(map: &Arc<super::Map>, from: &P) -> bool
where
P: AsRef<[u8]> + ?Sized,
{
let opts = super::cache_iter_options_default(&map.db);
let opts = super::cache_iter_options_default(&map.engine);
let state = stream::State::new(map, opts).init_fwd(from.as_ref().into());
!state.is_incomplete()

View File

@@ -11,12 +11,12 @@ pub(super) type Maps = BTreeMap<MapsKey, MapsVal>;
pub(super) type MapsKey = &'static str;
pub(super) type MapsVal = Arc<Map>;
pub(super) fn open(db: &Arc<Engine>) -> Result<Maps> { open_list(db, MAPS) }
pub(super) fn open(engine: &Arc<Engine>) -> Result<Maps> { open_list(engine, MAPS) }
#[tracing::instrument(name = "maps", level = "debug", skip_all)]
pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
pub(super) fn open_list(engine: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
maps.iter()
.map(|desc| Ok((desc.name, Map::open(db, desc.name)?)))
.map(|desc| Ok((desc.name, Map::open(engine, desc.name)?)))
.collect()
}
@@ -438,4 +438,8 @@ pub(super) static MAPS: &[Descriptor] = &[
name: "userroomid_notificationcount",
..descriptor::RANDOM
},
Descriptor {
name: "roomid_maxremotepowerlevel",
..descriptor::RANDOM_SMALL
},
];

View File

@@ -43,7 +43,7 @@ use crate::maps::{Maps, MapsKey, MapsVal};
pub struct Database {
maps: Maps,
pub db: Arc<Engine>,
pub engine: Arc<Engine>,
pub(crate) _ctx: Arc<Context>,
}
@@ -51,10 +51,10 @@ impl Database {
/// Load an existing database or create a new one.
pub async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
let ctx = Context::new(server)?;
let db = Engine::open(ctx.clone(), maps::MAPS).await?;
let engine = Engine::open(ctx.clone(), maps::MAPS).await?;
Ok(Arc::new(Self {
maps: maps::open(&db)?,
db: db.clone(),
maps: maps::open(&engine)?,
engine: engine.clone(),
_ctx: ctx,
}))
}
@@ -76,11 +76,11 @@ impl Database {
#[inline]
#[must_use]
pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
pub fn is_read_only(&self) -> bool { self.engine.is_read_only() }
#[inline]
#[must_use]
pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
pub fn is_secondary(&self) -> bool { self.engine.is_secondary() }
}
impl Index<&str> for Database {

View File

@@ -50,7 +50,10 @@ impl<'a> State<'a> {
#[inline]
pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self {
Self {
inner: map.db().db.raw_iterator_cf_opt(&map.cf(), opts),
inner: map
.engine()
.db
.raw_iterator_cf_opt(&map.cf(), opts),
init: true,
seek: false,
}

View File

@@ -390,7 +390,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result
})
.await;
db.db.sort()?;
db.engine.sort()?;
db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
info!("Finished fixing");
@@ -475,7 +475,7 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
.await;
}
db.db.sort()?;
db.engine.sort()?;
db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
info!("Finished fixing");
@@ -521,7 +521,7 @@ async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'.");
db["global"].insert(b"fix_referencedevents_missing_sep", []);
db.db.sort()
db.engine.sort()
}
async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result {
@@ -573,5 +573,5 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt.");
db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
db.db.sort()
db.engine.sort()
}