From e58ef326d4c0907574891db8809f4ef683064ebf Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Fri, 25 Jul 2025 09:17:54 +0000 Subject: [PATCH] Refactor legacy database watcher system. Signed-off-by: Jason Volk --- src/database/map.rs | 22 +++--------- src/database/map/insert.rs | 2 +- src/database/map/watch.rs | 69 ++++++++++++++++++++++++++++++++++++++ src/database/mod.rs | 1 - src/database/watchers.rs | 63 ---------------------------------- src/service/sync/watch.rs | 43 +++++++++++++++++------- 6 files changed, 105 insertions(+), 95 deletions(-) create mode 100644 src/database/map/watch.rs delete mode 100644 src/database/watchers.rs diff --git a/src/database/map.rs b/src/database/map.rs index 5e496cb5..578c0bcb 100644 --- a/src/database/map.rs +++ b/src/database/map.rs @@ -24,14 +24,12 @@ mod rev_stream_prefix; mod stream; mod stream_from; mod stream_prefix; +mod watch; use std::{ - convert::AsRef, ffi::CStr, fmt, fmt::{Debug, Display}, - future::Future, - pin::Pin, sync::Arc, }; @@ -42,12 +40,13 @@ pub(crate) use self::options::{ cache_iter_options_default, cache_read_options_default, iter_options_default, read_options_default, write_options_default, }; +use self::watch::Watch; pub use self::{get_batch::Get, qry_batch::Qry}; -use crate::{Engine, watchers::Watchers}; +use crate::Engine; pub struct Map { name: &'static str, - watchers: Watchers, + watch: Watch, cf: Arc, db: Arc, read_options: ReadOptions, @@ -59,7 +58,7 @@ impl Map { pub(crate) fn open(db: &Arc, name: &'static str) -> Result> { Ok(Arc::new(Self { name, - watchers: Watchers::default(), + watch: Watch::default(), cf: open::open(db, name), db: db.clone(), read_options: read_options_default(db), @@ -68,17 +67,6 @@ impl Map { })) } - #[inline] - pub fn watch_prefix<'a, K>( - &'a self, - prefix: &K, - ) -> Pin + Send + 'a>> - where - K: AsRef<[u8]> + ?Sized + Debug, - { - self.watchers.watch(prefix.as_ref()) - } - #[inline] pub fn property_integer(&self, name: &CStr) -> Result { self.db.property_integer(&self.cf(), name) diff --git a/src/database/map/insert.rs b/src/database/map/insert.rs index f3663b3e..cc40e836 100644 --- a/src/database/map/insert.rs +++ b/src/database/map/insert.rs @@ -32,7 +32,7 @@ where self.db.flush().expect("database flush error"); } - self.watchers.wake(key.as_ref()); + self.notify(key.as_ref()); } #[implement(super::Map)] diff --git a/src/database/map/watch.rs b/src/database/map/watch.rs new file mode 100644 index 00000000..7cb7ef64 --- /dev/null +++ b/src/database/map/watch.rs @@ -0,0 +1,69 @@ +use std::{ + collections::{BTreeMap, btree_map::Entry}, + future::Future, + ops::RangeToInclusive, + sync::Mutex, +}; + +use futures::pin_mut; +use tokio::sync::watch::{Sender, channel}; +use tuwunel_core::implement; + +use crate::keyval::KeyBuf; + +type Watchers = Mutex>>; + +#[derive(Default)] +pub(super) struct Watch { + watchers: Watchers, +} + +#[implement(super::Map)] +pub fn watch_raw_prefix(&self, prefix: &K) -> impl Future + Send + use +where + K: AsRef<[u8]> + ?Sized, +{ + let rx = match self + .watch + .watchers + .lock() + .expect("locked") + .entry(prefix.as_ref().into()) + { + | Entry::Occupied(node) => node.get().subscribe(), + | Entry::Vacant(node) => { + let (tx, rx) = channel(()); + node.insert(tx); + rx + }, + }; + + async move { + pin_mut!(rx); + rx.changed() + .await + .expect("watcher sender dropped"); + } +} + +#[implement(super::Map)] +pub(crate) fn notify(&self, key: &K) +where + K: AsRef<[u8]> + Ord + ?Sized, +{ + let range = RangeToInclusive:: { end: key.as_ref().into() }; + + let mut watchers = self.watch.watchers.lock().expect("locked"); + + watchers + .range(range) + .rev() + .take_while(|(k, _)| key.as_ref().starts_with(k)) + .filter_map(|(k, tx)| tx.send(()).is_err().then_some(k)) + .cloned() + .collect::>() + .into_iter() + .for_each(|k| { + watchers.remove(&k); + }); +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 1dde8adc..a5b50651 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -22,7 +22,6 @@ mod stream; #[cfg(test)] mod tests; pub(crate) mod util; -mod watchers; use std::{ops::Index, sync::Arc}; diff --git a/src/database/watchers.rs b/src/database/watchers.rs deleted file mode 100644 index d40d673c..00000000 --- a/src/database/watchers.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{ - collections::{HashMap, hash_map}, - future::Future, - pin::Pin, - sync::RwLock, -}; - -use tokio::sync::watch; - -type Watcher = RwLock, (watch::Sender<()>, watch::Receiver<()>)>>; - -#[derive(Default)] -pub(crate) struct Watchers { - watchers: Watcher, -} - -impl Watchers { - pub(crate) fn watch<'a>( - &'a self, - prefix: &[u8], - ) -> Pin + Send + 'a>> { - let mut rx = match self - .watchers - .write() - .unwrap() - .entry(prefix.to_vec()) - { - | hash_map::Entry::Occupied(o) => o.get().1.clone(), - | hash_map::Entry::Vacant(v) => { - let (tx, rx) = watch::channel(()); - v.insert((tx, rx.clone())); - rx - }, - }; - - Box::pin(async move { - // Tx is never destroyed - rx.changed().await.unwrap(); - }) - } - - pub(crate) fn wake(&self, key: &[u8]) { - let watchers = self.watchers.read().unwrap(); - let mut triggered = Vec::new(); - for length in 0..=key.len() { - if watchers.contains_key(&key[..length]) { - triggered.push(&key[..length]); - } - } - - drop(watchers); - - if !triggered.is_empty() { - let mut watchers = self.watchers.write().unwrap(); - for prefix in triggered { - if let Some(tx) = watchers.remove(prefix) { - tx.0.send(()) - .expect("channel should still be open"); - } - } - } - } -} diff --git a/src/service/sync/watch.rs b/src/service/sync/watch.rs index 66330f6c..de8aeca8 100644 --- a/src/service/sync/watch.rs +++ b/src/service/sync/watch.rs @@ -20,33 +20,39 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { futures.push( self.db .todeviceid_events - .watch_prefix(&userdeviceid_prefix), + .watch_raw_prefix(&userdeviceid_prefix) + .boxed(), ); futures.push( self.db .userroomid_joined - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); futures.push( self.db .userroomid_invitestate - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); futures.push( self.db .userroomid_leftstate - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); futures.push( self.db .userroomid_notificationcount - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); futures.push( self.db .userroomid_highlightcount - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); // Events for rooms we are in @@ -66,7 +72,8 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { futures.push( self.db .keychangeid_userid - .watch_prefix(&roomid_prefix), + .watch_raw_prefix(&roomid_prefix) + .boxed(), ); // Room account data @@ -76,12 +83,18 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { futures.push( self.db .roomusertype_roomuserdataid - .watch_prefix(&roomuser_prefix), + .watch_raw_prefix(&roomuser_prefix) + .boxed(), ); // PDUs let short_roomid = short_roomid.to_be_bytes().to_vec(); - futures.push(self.db.pduid_pdu.watch_prefix(&short_roomid)); + futures.push( + self.db + .pduid_pdu + .watch_raw_prefix(&short_roomid) + .boxed(), + ); // EDUs let typing_room_id = room_id.to_owned(); @@ -96,7 +109,8 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { futures.push( self.db .readreceiptid_readreceipt - .watch_prefix(&roomid_prefix), + .watch_raw_prefix(&roomid_prefix) + .boxed(), ); } @@ -106,21 +120,24 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { futures.push( self.db .roomusertype_roomuserdataid - .watch_prefix(&globaluserdata_prefix), + .watch_raw_prefix(&globaluserdata_prefix) + .boxed(), ); // More key changes (used when user is not joined to any rooms) futures.push( self.db .keychangeid_userid - .watch_prefix(&userid_prefix), + .watch_raw_prefix(&userid_prefix) + .boxed(), ); // One time keys futures.push( self.db .userid_lastonetimekeyupdate - .watch_prefix(&userid_bytes), + .watch_raw_prefix(&userid_bytes) + .boxed(), ); // Server shutdown