diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index bb6108b7..eaa72352 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -918,7 +918,7 @@ pub(super) async fn database_files(&self, map: Option, level: Option = self .services .db - .db + .engine .file_list() .collect::>()?; @@ -1023,7 +1023,7 @@ pub(super) async fn resync_database(&self) -> Result { self.services .db - .db + .engine .update() .map_err(|e| err!("Failed to update from primary: {e:?}")) } diff --git a/src/admin/server/commands.rs b/src/admin/server/commands.rs index f07731d7..b8e7b280 100644 --- a/src/admin/server/commands.rs +++ b/src/admin/server/commands.rs @@ -67,7 +67,7 @@ pub(super) async fn list_features(&self, available: bool, enabled: bool, comma: #[admin_command] pub(super) async fn memory_usage(&self) -> Result { let services_usage = self.services.memory_usage().await?; - let database_usage = self.services.db.db.memory_usage()?; + let database_usage = self.services.db.engine.memory_usage()?; let allocator_usage = tuwunel_core::alloc::memory_usage() .map_or(String::new(), |s| format!("\nAllocator:\n{s}")); @@ -88,7 +88,7 @@ pub(super) async fn clear_caches(&self) -> Result { pub(super) async fn list_backups(&self) -> Result { self.services .db - .db + .engine .backup_list()? .try_stream() .try_for_each(|result| write!(self, "{result}")) @@ -102,13 +102,13 @@ pub(super) async fn backup_database(&self) -> Result { .services .server .runtime() - .spawn_blocking(move || match db.db.backup() { + .spawn_blocking(move || match db.engine.backup() { | Ok(()) => "Done".to_owned(), | Err(e) => format!("Failed: {e}"), }) .await?; - let count = self.services.db.db.backup_count()?; + let count = self.services.db.engine.backup_count()?; self.write_str(&format!("{result}. Currently have {count} backups.")) .await } diff --git a/src/database/cork.rs b/src/database/cork.rs index 11b6efd7..5274547a 100644 --- a/src/database/cork.rs +++ b/src/database/cork.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::{Database, Engine}; pub struct Cork { - db: Arc, + engine: Arc, flush: bool, sync: bool, } @@ -11,33 +11,33 @@ pub struct Cork { impl Database { #[inline] #[must_use] - pub fn cork(&self) -> Cork { Cork::new(&self.db, false, false) } + pub fn cork(&self) -> Cork { Cork::new(&self.engine, false, false) } #[inline] #[must_use] - pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.db, true, false) } + pub fn cork_and_flush(&self) -> Cork { Cork::new(&self.engine, true, false) } #[inline] #[must_use] - pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.db, true, true) } + pub fn cork_and_sync(&self) -> Cork { Cork::new(&self.engine, true, true) } } impl Cork { #[inline] - pub(super) fn new(db: &Arc, flush: bool, sync: bool) -> Self { - db.cork(); - Self { db: db.clone(), flush, sync } + pub(super) fn new(engine: &Arc, flush: bool, sync: bool) -> Self { + engine.cork(); + Self { engine: engine.clone(), flush, sync } } } impl Drop for Cork { fn drop(&mut self) { - self.db.uncork(); + self.engine.uncork(); if self.flush { - self.db.flush().ok(); + self.engine.flush().ok(); } if self.sync { - self.db.sync().ok(); + self.engine.sync().ok(); } } } diff --git a/src/database/map.rs b/src/database/map.rs index 578c0bcb..ede09bf8 100644 --- a/src/database/map.rs +++ b/src/database/map.rs @@ -48,38 +48,40 @@ pub struct Map { name: &'static str, watch: Watch, cf: Arc, - db: Arc, + engine: Arc, read_options: ReadOptions, cache_read_options: ReadOptions, write_options: WriteOptions, } impl Map { - pub(crate) fn open(db: &Arc, name: &'static str) -> Result> { + pub(crate) fn open(engine: &Arc, name: &'static str) -> Result> { Ok(Arc::new(Self { name, watch: Watch::default(), - cf: open::open(db, name), - db: db.clone(), - read_options: read_options_default(db), - cache_read_options: cache_read_options_default(db), - write_options: write_options_default(db), + cf: open::open(engine, name), + engine: engine.clone(), + read_options: read_options_default(engine), + cache_read_options: cache_read_options_default(engine), + write_options: write_options_default(engine), })) } #[inline] pub fn property_integer(&self, name: &CStr) -> Result { - self.db.property_integer(&self.cf(), name) + self.engine.property_integer(&self.cf(), name) } #[inline] - pub fn property(&self, name: &str) -> Result { self.db.property(&self.cf(), name) } + pub fn property(&self, name: &str) -> Result { + self.engine.property(&self.cf(), name) + } #[inline] pub fn name(&self) -> &str { self.name } #[inline] - pub(crate) fn db(&self) -> &Arc { &self.db } + pub(crate) fn engine(&self) -> &Arc { &self.engine } #[inline] pub(crate) fn cf(&self) -> impl AsColumnFamilyRef + '_ { &*self.cf } diff --git a/src/database/map/compact.rs b/src/database/map/compact.rs index 96aa5c96..ddd267bc 100644 --- a/src/database/map/compact.rs +++ b/src/database/map/compact.rs @@ -54,7 +54,7 @@ pub fn compact_blocking(&self, opts: Options) -> Result { | (Some(_), Some(_)) => return Err!("compacting between specific levels not supported"), } - self.db + self.engine .db .compact_range_cf_opt(&self.cf(), opts.range.0, opts.range.1, &co); diff --git a/src/database/map/contains.rs b/src/database/map/contains.rs index 17f4a664..d8d631f8 100644 --- a/src/database/map/contains.rs +++ b/src/database/map/contains.rs @@ -97,7 +97,7 @@ pub(crate) fn maybe_exists(&self, key: &K) -> bool where K: AsRef<[u8]> + ?Sized, { - self.db + self.engine .db .key_may_exist_cf_opt(&self.cf(), key, &self.cache_read_options) } diff --git a/src/database/map/get.rs b/src/database/map/get.rs index 186c7e83..3c355644 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -37,7 +37,7 @@ where res: None, }; - self.db + self.engine .pool .execute_get(cmd) .and_then(|mut res| ready(res.remove(0))) @@ -77,7 +77,7 @@ fn get_blocking_opts( where K: AsRef<[u8]> + ?Sized, { - self.db + self.engine .db .get_pinned_cf_opt(&self.cf(), key, read_options) } diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs index 836f58e3..dfe7614d 100644 --- a/src/database/map/get_batch.rs +++ b/src/database/map/get_batch.rs @@ -48,7 +48,7 @@ where keys.ready_chunks(automatic_amplification()) .widen_then(automatic_width(), |chunk| { - self.db.pool.execute_get(Get { + self.engine.pool.execute_get(Get { map: self.clone(), key: chunk .iter() @@ -104,7 +104,7 @@ where // comparator**. const SORTED: bool = false; - self.db + self.engine .db .batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options) .into_iter() diff --git a/src/database/map/insert.rs b/src/database/map/insert.rs index cc40e836..694543e9 100644 --- a/src/database/map/insert.rs +++ b/src/database/map/insert.rs @@ -22,14 +22,14 @@ where V: AsRef<[u8]>, { let write_options = &self.write_options; - self.db + self.engine .db .put_cf_opt(&self.cf(), key, val, write_options) .or_else(or_else) .expect("database insert error"); - if !self.db.corked() { - self.db.flush().expect("database flush error"); + if !self.engine.corked() { + self.engine.flush().expect("database flush error"); } self.notify(key.as_ref()); @@ -49,13 +49,13 @@ where } let write_options = &self.write_options; - self.db + self.engine .db .write_opt(batch, write_options) .or_else(or_else) .expect("database insert batch error"); - if !self.db.corked() { - self.db.flush().expect("database flush error"); + if !self.engine.corked() { + self.engine.flush().expect("database flush error"); } } diff --git a/src/database/map/keys.rs b/src/database/map/keys.rs index 87c86188..a1b3b5ee 100644 --- a/src/database/map/keys.rs +++ b/src/database/map/keys.rs @@ -23,7 +23,7 @@ where pub fn raw_keys(self: &Arc) -> impl Stream>> + Send { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_fwd(None); @@ -42,7 +42,7 @@ pub fn raw_keys(self: &Arc) -> impl Stream>> + Send res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() diff --git a/src/database/map/keys_from.rs b/src/database/map/keys_from.rs index f6d3eee3..2efa3588 100644 --- a/src/database/map/keys_from.rs +++ b/src/database/map/keys_from.rs @@ -61,7 +61,7 @@ where { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed(); @@ -75,7 +75,7 @@ where res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() diff --git a/src/database/map/open.rs b/src/database/map/open.rs index 07f7a0c6..00023e08 100644 --- a/src/database/map/open.rs +++ b/src/database/map/open.rs @@ -4,8 +4,8 @@ use rocksdb::ColumnFamily; use crate::Engine; -pub(super) fn open(db: &Arc, name: &str) -> Arc { - let bounded_arc = db.cf(name); +pub(super) fn open(engine: &Arc, name: &str) -> Arc { + let bounded_arc = engine.cf(name); let bounded_ptr = Arc::into_raw(bounded_arc); let cf_ptr = bounded_ptr.cast::(); diff --git a/src/database/map/options.rs b/src/database/map/options.rs index 9e2ad898..0e12ccfc 100644 --- a/src/database/map/options.rs +++ b/src/database/map/options.rs @@ -5,34 +5,34 @@ use rocksdb::{ReadOptions, ReadTier, WriteOptions}; use crate::Engine; #[inline] -pub(crate) fn cache_iter_options_default(db: &Arc) -> ReadOptions { - let mut options = iter_options_default(db); +pub(crate) fn cache_iter_options_default(engine: &Arc) -> ReadOptions { + let mut options = iter_options_default(engine); options.set_read_tier(ReadTier::BlockCache); options.fill_cache(false); options } #[inline] -pub(crate) fn iter_options_default(db: &Arc) -> ReadOptions { - let mut options = read_options_default(db); +pub(crate) fn iter_options_default(engine: &Arc) -> ReadOptions { + let mut options = read_options_default(engine); options.set_background_purge_on_iterator_cleanup(true); options } #[inline] -pub(crate) fn cache_read_options_default(db: &Arc) -> ReadOptions { - let mut options = read_options_default(db); +pub(crate) fn cache_read_options_default(engine: &Arc) -> ReadOptions { + let mut options = read_options_default(engine); options.set_read_tier(ReadTier::BlockCache); options.fill_cache(false); options } #[inline] -pub(crate) fn read_options_default(db: &Arc) -> ReadOptions { +pub(crate) fn read_options_default(engine: &Arc) -> ReadOptions { let mut options = ReadOptions::default(); options.set_total_order_seek(true); - if !db.checksums { + if !engine.checksums { options.set_verify_checksums(false); } @@ -40,4 +40,6 @@ pub(crate) fn read_options_default(db: &Arc) -> ReadOptions { } #[inline] -pub(crate) fn write_options_default(_db: &Arc) -> WriteOptions { WriteOptions::default() } +pub(crate) fn write_options_default(_engine: &Arc) -> WriteOptions { + WriteOptions::default() +} diff --git a/src/database/map/qry_batch.rs b/src/database/map/qry_batch.rs index 0cb5294c..8e4c6768 100644 --- a/src/database/map/qry_batch.rs +++ b/src/database/map/qry_batch.rs @@ -52,7 +52,7 @@ where .map(|result| result.expect("failed to serialize query key")) .collect(); - self.db + self.engine .pool .execute_get(Get { map: self.clone(), key: keys, res: None }) }) diff --git a/src/database/map/remove.rs b/src/database/map/remove.rs index 1e67908a..b75b5df5 100644 --- a/src/database/map/remove.rs +++ b/src/database/map/remove.rs @@ -11,13 +11,13 @@ where K: AsRef<[u8]> + ?Sized + Debug, { let write_options = &self.write_options; - self.db + self.engine .db .delete_cf_opt(&self.cf(), key, write_options) .or_else(or_else) .expect("database remove error"); - if !self.db.corked() { - self.db.flush().expect("database flush error"); + if !self.engine.corked() { + self.engine.flush().expect("database flush error"); } } diff --git a/src/database/map/rev_keys.rs b/src/database/map/rev_keys.rs index 3bf1a531..3dc1e9b9 100644 --- a/src/database/map/rev_keys.rs +++ b/src/database/map/rev_keys.rs @@ -23,7 +23,7 @@ where pub fn rev_raw_keys(self: &Arc) -> impl Stream>> + Send { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_rev(None); @@ -42,7 +42,7 @@ pub fn rev_raw_keys(self: &Arc) -> impl Stream>> + S res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() diff --git a/src/database/map/rev_keys_from.rs b/src/database/map/rev_keys_from.rs index 18e34309..9fd9a562 100644 --- a/src/database/map/rev_keys_from.rs +++ b/src/database/map/rev_keys_from.rs @@ -61,7 +61,7 @@ where { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed(); @@ -75,7 +75,7 @@ where res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() diff --git a/src/database/map/rev_stream.rs b/src/database/map/rev_stream.rs index 96bec397..6ab2f4a2 100644 --- a/src/database/map/rev_stream.rs +++ b/src/database/map/rev_stream.rs @@ -31,7 +31,7 @@ where pub fn rev_raw_stream(self: &Arc) -> impl Stream>> + Send { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_rev(None); @@ -50,7 +50,7 @@ pub fn rev_raw_stream(self: &Arc) -> impl Stream> res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() @@ -66,7 +66,7 @@ pub fn rev_raw_stream(self: &Arc) -> impl Stream> fields(%map), )] pub(super) fn is_cached(map: &Arc) -> bool { - let opts = super::cache_iter_options_default(&map.db); + let opts = super::cache_iter_options_default(&map.engine); let state = stream::State::new(map, opts).init_rev(None); !state.is_incomplete() diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index 1489b392..6bbc1c40 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -80,7 +80,7 @@ where { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { let state = state.init_rev(from.as_ref().into()); @@ -99,7 +99,7 @@ where res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() @@ -118,7 +118,7 @@ pub(super) fn is_cached

(map: &Arc, from: &P) -> bool where P: AsRef<[u8]> + ?Sized, { - let cache_opts = super::cache_iter_options_default(&map.db); + let cache_opts = super::cache_iter_options_default(&map.engine); let cache_status = stream::State::new(map, cache_opts) .init_rev(from.as_ref().into()) .status(); diff --git a/src/database/map/stream.rs b/src/database/map/stream.rs index 0fbfd09b..ec88defc 100644 --- a/src/database/map/stream.rs +++ b/src/database/map/stream.rs @@ -31,7 +31,7 @@ where pub fn raw_stream(self: &Arc) -> impl Stream>> + Send { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_fwd(None); @@ -50,7 +50,7 @@ pub fn raw_stream(self: &Arc) -> impl Stream>> + res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() @@ -66,7 +66,7 @@ pub fn raw_stream(self: &Arc) -> impl Stream>> + fields(%map), )] pub(super) fn is_cached(map: &Arc) -> bool { - let opts = super::cache_iter_options_default(&map.db); + let opts = super::cache_iter_options_default(&map.engine); let state = stream::State::new(map, opts).init_fwd(None); !state.is_incomplete() diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index bef59331..e8bb2ba7 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -79,7 +79,7 @@ where { use crate::pool::Seek; - let opts = super::iter_options_default(&self.db); + let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { let state = state.init_fwd(from.as_ref().into()); @@ -98,7 +98,7 @@ where res: None, }; - self.db + self.engine .pool .execute_iter(seek) .ok_into::>() @@ -117,7 +117,7 @@ pub(super) fn is_cached

(map: &Arc, from: &P) -> bool where P: AsRef<[u8]> + ?Sized, { - let opts = super::cache_iter_options_default(&map.db); + let opts = super::cache_iter_options_default(&map.engine); let state = stream::State::new(map, opts).init_fwd(from.as_ref().into()); !state.is_incomplete() diff --git a/src/database/maps.rs b/src/database/maps.rs index 4d14d8e4..0177cf23 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -11,12 +11,12 @@ pub(super) type Maps = BTreeMap; pub(super) type MapsKey = &'static str; pub(super) type MapsVal = Arc; -pub(super) fn open(db: &Arc) -> Result { open_list(db, MAPS) } +pub(super) fn open(engine: &Arc) -> Result { open_list(engine, MAPS) } #[tracing::instrument(name = "maps", level = "debug", skip_all)] -pub(super) fn open_list(db: &Arc, maps: &[Descriptor]) -> Result { +pub(super) fn open_list(engine: &Arc, maps: &[Descriptor]) -> Result { maps.iter() - .map(|desc| Ok((desc.name, Map::open(db, desc.name)?))) + .map(|desc| Ok((desc.name, Map::open(engine, desc.name)?))) .collect() } @@ -438,4 +438,8 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "userroomid_notificationcount", ..descriptor::RANDOM }, + Descriptor { + name: "roomid_maxremotepowerlevel", + ..descriptor::RANDOM_SMALL + }, ]; diff --git a/src/database/mod.rs b/src/database/mod.rs index a5b50651..d70038fb 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -43,7 +43,7 @@ use crate::maps::{Maps, MapsKey, MapsVal}; pub struct Database { maps: Maps, - pub db: Arc, + pub engine: Arc, pub(crate) _ctx: Arc, } @@ -51,10 +51,10 @@ impl Database { /// Load an existing database or create a new one. pub async fn open(server: &Arc) -> Result> { let ctx = Context::new(server)?; - let db = Engine::open(ctx.clone(), maps::MAPS).await?; + let engine = Engine::open(ctx.clone(), maps::MAPS).await?; Ok(Arc::new(Self { - maps: maps::open(&db)?, - db: db.clone(), + maps: maps::open(&engine)?, + engine: engine.clone(), _ctx: ctx, })) } @@ -76,11 +76,11 @@ impl Database { #[inline] #[must_use] - pub fn is_read_only(&self) -> bool { self.db.is_read_only() } + pub fn is_read_only(&self) -> bool { self.engine.is_read_only() } #[inline] #[must_use] - pub fn is_secondary(&self) -> bool { self.db.is_secondary() } + pub fn is_secondary(&self) -> bool { self.engine.is_secondary() } } impl Index<&str> for Database { diff --git a/src/database/stream.rs b/src/database/stream.rs index 72a45824..7160d1f2 100644 --- a/src/database/stream.rs +++ b/src/database/stream.rs @@ -50,7 +50,10 @@ impl<'a> State<'a> { #[inline] pub(super) fn new(map: &'a Arc, opts: ReadOptions) -> Self { Self { - inner: map.db().db.raw_iterator_cf_opt(&map.cf(), opts), + inner: map + .engine() + .db + .raw_iterator_cf_opt(&map.cf(), opts), init: true, seek: false, } diff --git a/src/service/migrations.rs b/src/service/migrations.rs index cbd5ba57..b218e3fb 100644 --- a/src/service/migrations.rs +++ b/src/service/migrations.rs @@ -390,7 +390,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result }) .await; - db.db.sort()?; + db.engine.sort()?; db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); info!("Finished fixing"); @@ -475,7 +475,7 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) .await; } - db.db.sort()?; + db.engine.sort()?; db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); info!("Finished fixing"); @@ -521,7 +521,7 @@ async fn fix_referencedevents_missing_sep(services: &Services) -> Result { info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'."); db["global"].insert(b"fix_referencedevents_missing_sep", []); - db.db.sort() + db.engine.sort() } async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result { @@ -573,5 +573,5 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt."); db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []); - db.db.sort() + db.engine.sort() }