diff --git a/src/database/map/watch.rs b/src/database/map/watch.rs index 7cb7ef64..19776872 100644 --- a/src/database/map/watch.rs +++ b/src/database/map/watch.rs @@ -6,10 +6,11 @@ use std::{ }; use futures::pin_mut; +use serde::Serialize; use tokio::sync::watch::{Sender, channel}; use tuwunel_core::implement; -use crate::keyval::KeyBuf; +use crate::keyval::{KeyBuf, serialize_key}; type Watchers = Mutex>>; @@ -19,9 +20,18 @@ pub(super) struct Watch { } #[implement(super::Map)] -pub fn watch_raw_prefix(&self, prefix: &K) -> impl Future + Send + use +pub fn watch_prefix(&self, prefix: K) -> impl Future + Send + '_ where - K: AsRef<[u8]> + ?Sized, + K: Serialize, +{ + let prefix = serialize_key(prefix).expect("failed to serialize watch prefix key"); + self.watch_raw_prefix(&prefix) +} + +#[implement(super::Map)] +pub fn watch_raw_prefix<'a, K>(&self, prefix: &'a K) -> impl Future + Send + use +where + K: AsRef<[u8]> + ?Sized + 'a, { let rx = match self .watch diff --git a/src/service/sync/watch.rs b/src/service/sync/watch.rs index de8aeca8..ae40694d 100644 --- a/src/service/sync/watch.rs +++ b/src/service/sync/watch.rs @@ -1,59 +1,61 @@ use futures::{FutureExt, StreamExt, pin_mut, stream::FuturesUnordered}; use ruma::{DeviceId, UserId}; use tuwunel_core::{Result, implement, trace}; +use tuwunel_database::{Interfix, Separator, serialize_key}; #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { - let userid_bytes = user_id.as_bytes().to_vec(); - let mut userid_prefix = userid_bytes.clone(); - userid_prefix.push(0xFF); + let userdeviceid_prefix = (user_id, device_id, Interfix); + let globaluserdata_prefix = (Separator, user_id, Interfix); + let userid_prefix = + serialize_key((user_id, Interfix)).expect("failed to serialize watch prefix"); - let mut userdeviceid_prefix = userid_prefix.clone(); - userdeviceid_prefix.extend_from_slice(device_id.as_bytes()); - userdeviceid_prefix.push(0xFF); - - let mut futures = FuturesUnordered::new(); - - // Return when *any* user changed their key - // TODO: only send for user they share a room with - futures.push( + let watchers = [ + // Return when *any* user changed their key + // TODO: only send for user they share a room with self.db .todeviceid_events - .watch_raw_prefix(&userdeviceid_prefix) + .watch_prefix(&userdeviceid_prefix) .boxed(), - ); - - futures.push( self.db .userroomid_joined .watch_raw_prefix(&userid_prefix) .boxed(), - ); - futures.push( self.db .userroomid_invitestate .watch_raw_prefix(&userid_prefix) .boxed(), - ); - futures.push( self.db .userroomid_leftstate .watch_raw_prefix(&userid_prefix) .boxed(), - ); - futures.push( self.db .userroomid_notificationcount .watch_raw_prefix(&userid_prefix) .boxed(), - ); - futures.push( self.db .userroomid_highlightcount .watch_raw_prefix(&userid_prefix) .boxed(), - ); + self.db + .roomusertype_roomuserdataid + .watch_prefix(&globaluserdata_prefix) + .boxed(), + // More key changes (used when user is not joined to any rooms) + self.db + .keychangeid_userid + .watch_raw_prefix(&userid_prefix) + .boxed(), + // One time keys + self.db + .userid_lastonetimekeyupdate + .watch_raw_prefix(&user_id) + .boxed(), + ]; + + let mut futures = FuturesUnordered::new(); + futures.extend(watchers.into_iter()); // Events for rooms we are in let rooms_joined = self.services.state_cache.rooms_joined(user_id); @@ -64,82 +66,43 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { continue; }; - let roomid_bytes = room_id.as_bytes().to_vec(); - let mut roomid_prefix = roomid_bytes.clone(); - roomid_prefix.push(0xFF); - - // Key changes - futures.push( + let roomid_prefix = (room_id, Interfix); + let roomuser_prefix = (room_id, user_id); + let typing_room_id = room_id.to_owned(); + let watchers = [ + // Key changes self.db .keychangeid_userid - .watch_raw_prefix(&roomid_prefix) + .watch_prefix(&roomid_prefix) .boxed(), - ); - - // Room account data - let mut roomuser_prefix = roomid_prefix.clone(); - roomuser_prefix.extend_from_slice(&userid_prefix); - - futures.push( + // Room account data self.db .roomusertype_roomuserdataid - .watch_raw_prefix(&roomuser_prefix) + .watch_prefix(&roomuser_prefix) .boxed(), - ); - - // PDUs - let short_roomid = short_roomid.to_be_bytes().to_vec(); - futures.push( + // PDUs self.db .pduid_pdu - .watch_raw_prefix(&short_roomid) + .watch_prefix(&short_roomid) .boxed(), - ); - - // EDUs - let typing_room_id = room_id.to_owned(); - let typing_wait_for_update = async move { - self.services - .typing - .wait_for_update(&typing_room_id) - .await; - }; - - futures.push(typing_wait_for_update.boxed()); - futures.push( + // EDUs self.db .readreceiptid_readreceipt - .watch_raw_prefix(&roomid_prefix) + .watch_prefix(&roomid_prefix) .boxed(), - ); + // Typing + async move { + self.services + .typing + .wait_for_update(&typing_room_id) + .await; + } + .boxed(), + ]; + + futures.extend(watchers.into_iter()); } - let mut globaluserdata_prefix = vec![0xFF]; - globaluserdata_prefix.extend_from_slice(&userid_prefix); - - futures.push( - self.db - .roomusertype_roomuserdataid - .watch_raw_prefix(&globaluserdata_prefix) - .boxed(), - ); - - // More key changes (used when user is not joined to any rooms) - futures.push( - self.db - .keychangeid_userid - .watch_raw_prefix(&userid_prefix) - .boxed(), - ); - - // One time keys - futures.push( - self.db - .userid_lastonetimekeyupdate - .watch_raw_prefix(&userid_bytes) - .boxed(), - ); - // Server shutdown futures.push(self.services.server.until_shutdown().boxed());