Refactor legacy database watcher system.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-25 09:17:54 +00:00
parent 0d439188e8
commit e58ef326d4
6 changed files with 105 additions and 95 deletions

View File

@@ -24,14 +24,12 @@ mod rev_stream_prefix;
mod stream;
mod stream_from;
mod stream_prefix;
mod watch;
use std::{
convert::AsRef,
ffi::CStr,
fmt,
fmt::{Debug, Display},
future::Future,
pin::Pin,
sync::Arc,
};
@@ -42,12 +40,13 @@ pub(crate) use self::options::{
cache_iter_options_default, cache_read_options_default, iter_options_default,
read_options_default, write_options_default,
};
use self::watch::Watch;
pub use self::{get_batch::Get, qry_batch::Qry};
use crate::{Engine, watchers::Watchers};
use crate::Engine;
pub struct Map {
name: &'static str,
watchers: Watchers,
watch: Watch,
cf: Arc<ColumnFamily>,
db: Arc<Engine>,
read_options: ReadOptions,
@@ -59,7 +58,7 @@ impl Map {
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
name,
watchers: Watchers::default(),
watch: Watch::default(),
cf: open::open(db, name),
db: db.clone(),
read_options: read_options_default(db),
@@ -68,17 +67,6 @@ impl Map {
}))
}
#[inline]
pub fn watch_prefix<'a, K>(
&'a self,
prefix: &K,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
where
K: AsRef<[u8]> + ?Sized + Debug,
{
self.watchers.watch(prefix.as_ref())
}
#[inline]
pub fn property_integer(&self, name: &CStr) -> Result<u64> {
self.db.property_integer(&self.cf(), name)

View File

@@ -32,7 +32,7 @@ where
self.db.flush().expect("database flush error");
}
self.watchers.wake(key.as_ref());
self.notify(key.as_ref());
}
#[implement(super::Map)]

69
src/database/map/watch.rs Normal file
View File

@@ -0,0 +1,69 @@
use std::{
collections::{BTreeMap, btree_map::Entry},
future::Future,
ops::RangeToInclusive,
sync::Mutex,
};
use futures::pin_mut;
use tokio::sync::watch::{Sender, channel};
use tuwunel_core::implement;
use crate::keyval::KeyBuf;
type Watchers = Mutex<BTreeMap<KeyBuf, Sender<()>>>;
#[derive(Default)]
pub(super) struct Watch {
watchers: Watchers,
}
#[implement(super::Map)]
pub fn watch_raw_prefix<K>(&self, prefix: &K) -> impl Future<Output = ()> + Send + use<K>
where
K: AsRef<[u8]> + ?Sized,
{
let rx = match self
.watch
.watchers
.lock()
.expect("locked")
.entry(prefix.as_ref().into())
{
| Entry::Occupied(node) => node.get().subscribe(),
| Entry::Vacant(node) => {
let (tx, rx) = channel(());
node.insert(tx);
rx
},
};
async move {
pin_mut!(rx);
rx.changed()
.await
.expect("watcher sender dropped");
}
}
#[implement(super::Map)]
pub(crate) fn notify<K>(&self, key: &K)
where
K: AsRef<[u8]> + Ord + ?Sized,
{
let range = RangeToInclusive::<KeyBuf> { end: key.as_ref().into() };
let mut watchers = self.watch.watchers.lock().expect("locked");
watchers
.range(range)
.rev()
.take_while(|(k, _)| key.as_ref().starts_with(k))
.filter_map(|(k, tx)| tx.send(()).is_err().then_some(k))
.cloned()
.collect::<Vec<_>>()
.into_iter()
.for_each(|k| {
watchers.remove(&k);
});
}

View File

@@ -22,7 +22,6 @@ mod stream;
#[cfg(test)]
mod tests;
pub(crate) mod util;
mod watchers;
use std::{ops::Index, sync::Arc};

View File

@@ -1,63 +0,0 @@
use std::{
collections::{HashMap, hash_map},
future::Future,
pin::Pin,
sync::RwLock,
};
use tokio::sync::watch;
type Watcher = RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>;
#[derive(Default)]
pub(crate) struct Watchers {
watchers: Watcher,
}
impl Watchers {
pub(crate) fn watch<'a>(
&'a self,
prefix: &[u8],
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
let mut rx = match self
.watchers
.write()
.unwrap()
.entry(prefix.to_vec())
{
| hash_map::Entry::Occupied(o) => o.get().1.clone(),
| hash_map::Entry::Vacant(v) => {
let (tx, rx) = watch::channel(());
v.insert((tx, rx.clone()));
rx
},
};
Box::pin(async move {
// Tx is never destroyed
rx.changed().await.unwrap();
})
}
pub(crate) fn wake(&self, key: &[u8]) {
let watchers = self.watchers.read().unwrap();
let mut triggered = Vec::new();
for length in 0..=key.len() {
if watchers.contains_key(&key[..length]) {
triggered.push(&key[..length]);
}
}
drop(watchers);
if !triggered.is_empty() {
let mut watchers = self.watchers.write().unwrap();
for prefix in triggered {
if let Some(tx) = watchers.remove(prefix) {
tx.0.send(())
.expect("channel should still be open");
}
}
}
}
}