Optimize sync watchers and key serializations.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-25 20:23:00 +00:00
parent e09a2c0e0f
commit ce30f83052
2 changed files with 63 additions and 90 deletions

View File

@@ -6,10 +6,11 @@ use std::{
}; };
use futures::pin_mut; use futures::pin_mut;
use serde::Serialize;
use tokio::sync::watch::{Sender, channel}; use tokio::sync::watch::{Sender, channel};
use tuwunel_core::implement; use tuwunel_core::implement;
use crate::keyval::KeyBuf; use crate::keyval::{KeyBuf, serialize_key};
type Watchers = Mutex<BTreeMap<KeyBuf, Sender<()>>>; type Watchers = Mutex<BTreeMap<KeyBuf, Sender<()>>>;
@@ -19,9 +20,18 @@ pub(super) struct Watch {
} }
#[implement(super::Map)] #[implement(super::Map)]
pub fn watch_raw_prefix<K>(&self, prefix: &K) -> impl Future<Output = ()> + Send + use<K> pub fn watch_prefix<K>(&self, prefix: K) -> impl Future<Output = ()> + Send + '_
where where
K: AsRef<[u8]> + ?Sized, K: Serialize,
{
let prefix = serialize_key(prefix).expect("failed to serialize watch prefix key");
self.watch_raw_prefix(&prefix)
}
#[implement(super::Map)]
pub fn watch_raw_prefix<'a, K>(&self, prefix: &'a K) -> impl Future<Output = ()> + Send + use<K>
where
K: AsRef<[u8]> + ?Sized + 'a,
{ {
let rx = match self let rx = match self
.watch .watch

View File

@@ -1,59 +1,61 @@
use futures::{FutureExt, StreamExt, pin_mut, stream::FuturesUnordered}; use futures::{FutureExt, StreamExt, pin_mut, stream::FuturesUnordered};
use ruma::{DeviceId, UserId}; use ruma::{DeviceId, UserId};
use tuwunel_core::{Result, implement, trace}; use tuwunel_core::{Result, implement, trace};
use tuwunel_database::{Interfix, Separator, serialize_key};
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug")]
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result {
let userid_bytes = user_id.as_bytes().to_vec(); let userdeviceid_prefix = (user_id, device_id, Interfix);
let mut userid_prefix = userid_bytes.clone(); let globaluserdata_prefix = (Separator, user_id, Interfix);
userid_prefix.push(0xFF); let userid_prefix =
serialize_key((user_id, Interfix)).expect("failed to serialize watch prefix");
let mut userdeviceid_prefix = userid_prefix.clone(); let watchers = [
userdeviceid_prefix.extend_from_slice(device_id.as_bytes()); // Return when *any* user changed their key
userdeviceid_prefix.push(0xFF); // TODO: only send for user they share a room with
let mut futures = FuturesUnordered::new();
// Return when *any* user changed their key
// TODO: only send for user they share a room with
futures.push(
self.db self.db
.todeviceid_events .todeviceid_events
.watch_raw_prefix(&userdeviceid_prefix) .watch_prefix(&userdeviceid_prefix)
.boxed(), .boxed(),
);
futures.push(
self.db self.db
.userroomid_joined .userroomid_joined
.watch_raw_prefix(&userid_prefix) .watch_raw_prefix(&userid_prefix)
.boxed(), .boxed(),
);
futures.push(
self.db self.db
.userroomid_invitestate .userroomid_invitestate
.watch_raw_prefix(&userid_prefix) .watch_raw_prefix(&userid_prefix)
.boxed(), .boxed(),
);
futures.push(
self.db self.db
.userroomid_leftstate .userroomid_leftstate
.watch_raw_prefix(&userid_prefix) .watch_raw_prefix(&userid_prefix)
.boxed(), .boxed(),
);
futures.push(
self.db self.db
.userroomid_notificationcount .userroomid_notificationcount
.watch_raw_prefix(&userid_prefix) .watch_raw_prefix(&userid_prefix)
.boxed(), .boxed(),
);
futures.push(
self.db self.db
.userroomid_highlightcount .userroomid_highlightcount
.watch_raw_prefix(&userid_prefix) .watch_raw_prefix(&userid_prefix)
.boxed(), .boxed(),
); self.db
.roomusertype_roomuserdataid
.watch_prefix(&globaluserdata_prefix)
.boxed(),
// More key changes (used when user is not joined to any rooms)
self.db
.keychangeid_userid
.watch_raw_prefix(&userid_prefix)
.boxed(),
// One time keys
self.db
.userid_lastonetimekeyupdate
.watch_raw_prefix(&user_id)
.boxed(),
];
let mut futures = FuturesUnordered::new();
futures.extend(watchers.into_iter());
// Events for rooms we are in // Events for rooms we are in
let rooms_joined = self.services.state_cache.rooms_joined(user_id); let rooms_joined = self.services.state_cache.rooms_joined(user_id);
@@ -64,82 +66,43 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result {
continue; continue;
}; };
let roomid_bytes = room_id.as_bytes().to_vec(); let roomid_prefix = (room_id, Interfix);
let mut roomid_prefix = roomid_bytes.clone(); let roomuser_prefix = (room_id, user_id);
roomid_prefix.push(0xFF); let typing_room_id = room_id.to_owned();
let watchers = [
// Key changes // Key changes
futures.push(
self.db self.db
.keychangeid_userid .keychangeid_userid
.watch_raw_prefix(&roomid_prefix) .watch_prefix(&roomid_prefix)
.boxed(), .boxed(),
); // Room account data
// Room account data
let mut roomuser_prefix = roomid_prefix.clone();
roomuser_prefix.extend_from_slice(&userid_prefix);
futures.push(
self.db self.db
.roomusertype_roomuserdataid .roomusertype_roomuserdataid
.watch_raw_prefix(&roomuser_prefix) .watch_prefix(&roomuser_prefix)
.boxed(), .boxed(),
); // PDUs
// PDUs
let short_roomid = short_roomid.to_be_bytes().to_vec();
futures.push(
self.db self.db
.pduid_pdu .pduid_pdu
.watch_raw_prefix(&short_roomid) .watch_prefix(&short_roomid)
.boxed(), .boxed(),
); // EDUs
// EDUs
let typing_room_id = room_id.to_owned();
let typing_wait_for_update = async move {
self.services
.typing
.wait_for_update(&typing_room_id)
.await;
};
futures.push(typing_wait_for_update.boxed());
futures.push(
self.db self.db
.readreceiptid_readreceipt .readreceiptid_readreceipt
.watch_raw_prefix(&roomid_prefix) .watch_prefix(&roomid_prefix)
.boxed(), .boxed(),
); // Typing
async move {
self.services
.typing
.wait_for_update(&typing_room_id)
.await;
}
.boxed(),
];
futures.extend(watchers.into_iter());
} }
let mut globaluserdata_prefix = vec![0xFF];
globaluserdata_prefix.extend_from_slice(&userid_prefix);
futures.push(
self.db
.roomusertype_roomuserdataid
.watch_raw_prefix(&globaluserdata_prefix)
.boxed(),
);
// More key changes (used when user is not joined to any rooms)
futures.push(
self.db
.keychangeid_userid
.watch_raw_prefix(&userid_prefix)
.boxed(),
);
// One time keys
futures.push(
self.db
.userid_lastonetimekeyupdate
.watch_raw_prefix(&userid_bytes)
.boxed(),
);
// Server shutdown // Server shutdown
futures.push(self.services.server.until_shutdown().boxed()); futures.push(self.services.server.until_shutdown().boxed());