diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index c00a6ffd..38e1a92a 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -163,10 +163,14 @@ pub(crate) async fn sync_events_route( .expect("configuration must limit maximum timeout"); loop { - let watchers = services.sync.watch(sender_user, sender_device); - let next_batch = services.globals.wait_pending().await?; + let watch_rooms = services.state_cache.rooms_joined(sender_user); + let watchers = services + .sync + .watch(sender_user, sender_device, watch_rooms); + let next_batch = services.globals.wait_pending().await?; debug_assert!(since <= next_batch, "next_batch is monotonic"); + if since < next_batch || body.body.full_state { let response = build_sync_events(&services, &body, since, next_batch).await?; let empty = response.rooms.is_empty() diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 14e131e6..22d9a69b 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -160,10 +160,14 @@ pub(crate) async fn sync_events_v5_route( }; loop { - let watchers = services.sync.watch(sender_user, sender_device); - let next_batch = services.globals.wait_pending().await?; + let watch_rooms = todo_rooms.keys().map(AsRef::as_ref).stream(); + let watchers = services + .sync + .watch(sender_user, sender_device, watch_rooms); + let next_batch = services.globals.wait_pending().await?; debug_assert!(globalsince <= next_batch, "next_batch is monotonic"); + if globalsince < next_batch { let rooms = handle_rooms(services, sync_info, next_batch, &todo_rooms) .map_ok(|rooms| response.rooms = rooms); diff --git a/src/service/sync/watch.rs b/src/service/sync/watch.rs index 28c31c93..699f9382 100644 --- a/src/service/sync/watch.rs +++ b/src/service/sync/watch.rs @@ -1,11 +1,19 @@ -use futures::{FutureExt, StreamExt, pin_mut, stream::FuturesUnordered}; +use futures::{FutureExt, Stream, StreamExt, pin_mut, stream::FuturesUnordered}; use ruma::{DeviceId, RoomId, UserId}; use tuwunel_core::{Result, implement, trace}; use tuwunel_database::{Interfix, Separator, serialize_key}; #[implement(super::Service)] -#[tracing::instrument(skip(self), level = "debug")] -pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { +#[tracing::instrument(skip(self, rooms), level = "debug")] +pub async fn watch<'a, Rooms>( + &self, + user_id: &UserId, + device_id: &DeviceId, + rooms: Rooms, +) -> Result +where + Rooms: Stream + Send + 'a, +{ let userdeviceid_prefix = (user_id, device_id, Interfix); let globaluserdata_prefix = (Separator, user_id, Interfix); let roomuserdataid_prefix = (Option::<&RoomId>::None, user_id, Interfix); @@ -67,11 +75,8 @@ pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result { let mut futures = FuturesUnordered::new(); futures.extend(watchers.into_iter()); - // Events for rooms we are in - let rooms_joined = self.services.state_cache.rooms_joined(user_id); - - pin_mut!(rooms_joined); - while let Some(room_id) = rooms_joined.next().await { + pin_mut!(rooms); + while let Some(room_id) = rooms.next().await { let Ok(short_roomid) = self.services.short.get_shortroomid(room_id).await else { continue; };