diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 7a3aa777..c2814a06 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -1,7 +1,7 @@ mod v3; mod v5; -use futures::{StreamExt, pin_mut}; +use futures::{FutureExt, StreamExt, pin_mut}; use ruma::{RoomId, UserId}; use tuwunel_core::{ Error, PduCount, Result, @@ -42,10 +42,12 @@ async fn load_timeline( .by_ref() .take(limit) .collect() + .map(|mut pdus: Vec<_>| { + pdus.reverse(); + pdus + }) .await; - let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect(); - // They /sync response doesn't always return all messages, so we say the output // is limited unless there are events in non_timeline_pdus let limited = non_timeline_pdus.next().await.is_some(); diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/room.rs index 5cb0f0a2..bba9f7a7 100644 --- a/src/api/client/sync/v5/room.rs +++ b/src/api/client/sync/v5/room.rs @@ -5,7 +5,7 @@ use futures::{ future::{OptionFuture, join, join3, join4}, }; use ruma::{ - JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId, + JsOption, MxcUri, OwnedMxcUri, RoomId, UserId, api::client::sync::sync_events::{ UnreadNotificationsCount, v5::{DisplayName, response, response::Heroes}, @@ -19,7 +19,7 @@ use ruma::{ }, }; use tuwunel_core::{ - Result, at, debug_error, err, is_equal_to, + Result, at, err, is_equal_to, matrix::{Event, StateKey, pdu::PduCount}, ref_at, utils::{ @@ -49,7 +49,10 @@ pub(super) async fn handle( lists, membership, room_id, last_count, .. }: &WindowRoom, ) -> Result> { - debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); + debug_assert!( + DEFAULT_BUMP_TYPES.is_sorted(), + "DEFAULT_BUMP_TYPES must be sorted for binary search" + ); let &Room { roomsince, .. } = conn .rooms @@ -57,7 +60,7 @@ pub(super) async fn handle( .ok_or_else(|| err!("Missing connection state for {room_id}"))?; debug_assert!( - roomsince == 0 || *last_count > roomsince, + *last_count > roomsince || *last_count == 0 || roomsince == 0, "Stale room shouldn't be in the window" ); @@ -91,13 +94,10 @@ pub(super) async fn handle( }) .into(); - let Ok(timeline) = timeline.await.transpose() else { - debug_error!(?room_id, "Missing timeline."); - return Ok(None); - }; - - let (timeline_pdus, limited, _lastcount) = - timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); + let (timeline_pdus, limited, _lastcount) = timeline + .await + .flat_ok() + .unwrap_or_else(|| (Vec::new(), true, PduCount::default())); let prev_batch = timeline_pdus .first() @@ -114,14 +114,25 @@ pub(super) async fn handle( .binary_search(pdu.event_type()) .is_ok() }) - .fold(Option::::None, |mut bump_stamp, (_, pdu)| { - let ts = pdu.origin_server_ts(); - if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts.get()) { - bump_stamp.replace(ts.get()); - } + .map(at!(0)) + .filter(|count| matches!(count, PduCount::Normal(_))) + .map(PduCount::into_unsigned) + .max() + .map(TryInto::try_into) + .flat_ok(); - bump_stamp - }); + let num_live: OptionFuture<_> = roomsince + .ne(&0) + .and_is(limited || timeline_pdus.len() >= timeline_limit) + .then(|| { + services + .timeline + .pdus(None, room_id, Some(roomsince.into())) + .count() + .map(TryInto::try_into) + .map(Result::ok) + }) + .into(); let lazy = required_state .iter() @@ -141,6 +152,14 @@ pub(super) async fn handle( .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))) .stream(); + let timeline = timeline_pdus + .iter() + .stream() + .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) + .map(at!(1)) + .map(Event::into_format) + .collect(); + let wildcard_state = required_state .iter() .filter(|(_, state_key)| state_key == "*") @@ -186,14 +205,6 @@ pub(super) async fn handle( }) .into(); - let timeline = timeline_pdus - .iter() - .stream() - .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) - .map(at!(1)) - .map(Event::into_format) - .collect(); - let room_name = services .state_accessor .get_name(room_id) @@ -243,12 +254,12 @@ pub(super) async fn handle( .last_notification_read(sender_user, room_id); let meta = join3(room_name, room_avatar, is_dm); - let events = join3(timeline, required_state, invite_state); + let events = join4(timeline, num_live, required_state, invite_state); let member_counts = join(joined_count, invited_count); let notification_counts = join3(highlight_count, notification_count, last_read_count); let ( (room_name, room_avatar, is_dm), - (timeline, required_state, invite_state), + (timeline, num_live, required_state, invite_state), (joined_count, invited_count), (highlight_count, notification_count, _last_notification_read), ) = join4(meta, events, member_counts, notification_counts) @@ -264,23 +275,21 @@ pub(super) async fn handle( ) .await?; - let num_live = None; // Count events in timeline greater than global sync counter - Ok(Some(response::Room { - initial: Some(roomsince == 0), + initial: roomsince.eq(&0).then_some(true), lists: lists.clone(), membership: membership.clone(), name: room_name.or(hero_name), avatar: JsOption::from_option(room_avatar.or(heroes_avatar)), is_dm, + heroes, required_state, invite_state: invite_state.flatten(), prev_batch: prev_batch.as_deref().map(Into::into), + num_live: num_live.flatten(), limited, timeline, bump_stamp, - heroes, - num_live, joined_count, invited_count, unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },