diff --git a/src/admin/query/room_timeline.rs b/src/admin/query/room_timeline.rs index c90b1cb8..eea2455e 100644 --- a/src/admin/query/room_timeline.rs +++ b/src/admin/query/room_timeline.rs @@ -36,7 +36,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result { .services .rooms .timeline - .last_timeline_count(None, &room_id) + .last_timeline_count(None, &room_id, None) .await?; self.write_str(&format!("{result:#?}")).await diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 25e0b666..4be9cb9d 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -11,7 +11,7 @@ use ruma::{ use tuwunel_core::{ Error, PduCount, Result, matrix::pdu::PduEvent, - utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, + utils::stream::{BroadbandExt, ReadyExt}, }; use tuwunel_service::Services; @@ -27,22 +27,22 @@ async fn load_timeline( roomsincecount: PduCount, next_batch: Option, limit: usize, -) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { +) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount), Error> { let last_timeline_count = services .rooms .timeline - .last_timeline_count(Some(sender_user), room_id) + .last_timeline_count(Some(sender_user), room_id, next_batch) .await?; if last_timeline_count <= roomsincecount { - return Ok((Vec::new(), false)); + return Ok((Vec::new(), false, last_timeline_count)); } let non_timeline_pdus = services .rooms .timeline .pdus_rev(Some(sender_user), room_id, None) - .ignore_err() + .ready_filter_map(Result::ok) .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) .ready_take_while(|&(pducount, _)| pducount > roomsincecount); @@ -60,7 +60,7 @@ async fn load_timeline( // is limited unless there are events in non_timeline_pdus let limited = non_timeline_pdus.next().await.is_some(); - Ok((timeline_pdus, limited)) + Ok((timeline_pdus, limited, last_timeline_count)) } async fn share_encrypted_room( diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 137fdf9d..3cce1369 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -6,7 +6,7 @@ use std::{ use axum::extract::State; use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, join4, join5, try_join4}, + future::{OptionFuture, join, join3, join4, join5, try_join3}, pin_mut, }; use ruma::{ @@ -658,15 +658,6 @@ async fn load_joined_room( full_state: bool, filter: &FilterDefinition, ) -> Result<(JoinedRoom, HashSet, HashSet)> { - let sincecount = PduCount::Normal(since); - let next_batchcount = PduCount::Normal(next_batch); - - let current_shortstatehash = services - .rooms - .state - .get_room_shortstatehash(room_id) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); - let since_shortstatehash = services .rooms .user @@ -685,8 +676,8 @@ async fn load_joined_room( services, sender_user, room_id, - sincecount, - Some(next_batchcount), + PduCount::Normal(since), + Some(PduCount::Normal(next_batch)), timeline_limit, ); @@ -704,24 +695,37 @@ async fn load_joined_room( .collect::>>() .map(Ok); - let (current_shortstatehash, since_shortstatehash, (timeline_pdus, limited), receipt_events) = - try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events) + let (since_shortstatehash, (timeline_pdus, limited, last_timeline_count), receipt_events) = + try_join3(since_shortstatehash, timeline, receipt_events) .boxed() .await?; - // State was changed after the cutoff for this sync; similar to other handlers. - if current_shortstatehash > next_batch { - // Transfer the since_shortstatehash not the current over to the next sync. - if let Some(since_shortstatehash) = since_shortstatehash { + let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + .iter() + .map(at!(0)) + .map(PduCount::into_unsigned) + .map(|shorteventid| { services .rooms - .user - .associate_token_shortstatehash(room_id, next_batch, since_shortstatehash) - .await; - } + .state_accessor + .get_shortstatehash(shorteventid) + }) + .next() + .into(); - return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new())); - } + let current_shortstatehash = services + .rooms + .state_accessor + .get_shortstatehash(last_timeline_count.into_unsigned()) + .or_else(|_| services.state.get_room_shortstatehash(room_id)); + + let (horizon_shortstatehash, current_shortstatehash) = + join(horizon_shortstatehash, current_shortstatehash) + .boxed() + .await; + + let current_shortstatehash = current_shortstatehash + .map_err(|_| err!(Database(error!("Room {room_id} has no state"))))?; let associate_token = services .rooms @@ -773,19 +777,6 @@ async fn load_joined_room( }) .into(); - let horizon_shortstatehash: OptionFuture<_> = timeline_pdus - .iter() - .map(at!(0)) - .map(PduCount::into_unsigned) - .map(|shorteventid| { - services - .rooms - .state_accessor - .get_shortstatehash(shorteventid) - }) - .next() - .into(); - let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() .then(|| { @@ -809,8 +800,7 @@ async fn load_joined_room( let encrypted_room = services .rooms .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); + .is_encrypted_room(room_id); let last_privateread_update = services .rooms @@ -818,13 +808,13 @@ async fn load_joined_room( .last_privateread_update(sender_user, room_id); let ( - witness, - (encrypted_room, last_privateread_update, last_notification_read), - (since_sender_member, horizon_shortstatehash, ()), + (witness, since_sender_member), + (encrypted_room, ()), + (last_privateread_update, last_notification_read), ) = join3( - witness, - join3(encrypted_room, last_privateread_update, last_notification_read), - join3(since_sender_member, horizon_shortstatehash, associate_token), + join(witness, since_sender_member), + join(encrypted_room, associate_token), + join(last_privateread_update, last_notification_read), ) .boxed() .await; diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 9422495a..2c45be5d 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -422,7 +422,7 @@ where (timeline_pdus, limited) = (Vec::new(), true); } else { - (timeline_pdus, limited) = match load_timeline( + (timeline_pdus, limited, _) = match load_timeline( services, sender_user, room_id, diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index c5da5fa4..3bd3586b 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -222,11 +222,14 @@ pub async fn last_timeline_count( &self, sender_user: Option<&UserId>, room_id: &RoomId, + upper_bound: Option, ) -> Result { + let upper_bound = upper_bound.unwrap_or_else(PduCount::max); let pdus_rev = self.pdus_rev(sender_user, room_id, None); pin_mut!(pdus_rev); let last_count = pdus_rev + .ready_try_skip_while(|&(pducount, _)| Ok(pducount > upper_bound)) .try_next() .await? .map(at!(0))