Implement num_live for sliding-sync room response.

Avoid using origin_server_ts for recency stamp.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-22 08:14:18 +00:00
parent 4fd60b2605
commit 024e8eae62
2 changed files with 47 additions and 36 deletions

View File

@@ -1,7 +1,7 @@
mod v3;
mod v5;
use futures::{StreamExt, pin_mut};
use futures::{FutureExt, StreamExt, pin_mut};
use ruma::{RoomId, UserId};
use tuwunel_core::{
Error, PduCount, Result,
@@ -42,10 +42,12 @@ async fn load_timeline(
.by_ref()
.take(limit)
.collect()
.map(|mut pdus: Vec<_>| {
pdus.reverse();
pdus
})
.await;
let timeline_pdus: Vec<_> = timeline_pdus.into_iter().rev().collect();
// They /sync response doesn't always return all messages, so we say the output
// is limited unless there are events in non_timeline_pdus
let limited = non_timeline_pdus.next().await.is_some();

View File

@@ -5,7 +5,7 @@ use futures::{
future::{OptionFuture, join, join3, join4},
};
use ruma::{
JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId,
JsOption, MxcUri, OwnedMxcUri, RoomId, UserId,
api::client::sync::sync_events::{
UnreadNotificationsCount,
v5::{DisplayName, response, response::Heroes},
@@ -19,7 +19,7 @@ use ruma::{
},
};
use tuwunel_core::{
Result, at, debug_error, err, is_equal_to,
Result, at, err, is_equal_to,
matrix::{Event, StateKey, pdu::PduCount},
ref_at,
utils::{
@@ -49,7 +49,10 @@ pub(super) async fn handle(
lists, membership, room_id, last_count, ..
}: &WindowRoom,
) -> Result<Option<response::Room>> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
debug_assert!(
DEFAULT_BUMP_TYPES.is_sorted(),
"DEFAULT_BUMP_TYPES must be sorted for binary search"
);
let &Room { roomsince, .. } = conn
.rooms
@@ -57,7 +60,7 @@ pub(super) async fn handle(
.ok_or_else(|| err!("Missing connection state for {room_id}"))?;
debug_assert!(
roomsince == 0 || *last_count > roomsince,
*last_count > roomsince || *last_count == 0 || roomsince == 0,
"Stale room shouldn't be in the window"
);
@@ -91,13 +94,10 @@ pub(super) async fn handle(
})
.into();
let Ok(timeline) = timeline.await.transpose() else {
debug_error!(?room_id, "Missing timeline.");
return Ok(None);
};
let (timeline_pdus, limited, _lastcount) =
timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
let (timeline_pdus, limited, _lastcount) = timeline
.await
.flat_ok()
.unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
let prev_batch = timeline_pdus
.first()
@@ -114,14 +114,25 @@ pub(super) async fn handle(
.binary_search(pdu.event_type())
.is_ok()
})
.fold(Option::<UInt>::None, |mut bump_stamp, (_, pdu)| {
let ts = pdu.origin_server_ts();
if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts.get()) {
bump_stamp.replace(ts.get());
}
.map(at!(0))
.filter(|count| matches!(count, PduCount::Normal(_)))
.map(PduCount::into_unsigned)
.max()
.map(TryInto::try_into)
.flat_ok();
bump_stamp
});
let num_live: OptionFuture<_> = roomsince
.ne(&0)
.and_is(limited || timeline_pdus.len() >= timeline_limit)
.then(|| {
services
.timeline
.pdus(None, room_id, Some(roomsince.into()))
.count()
.map(TryInto::try_into)
.map(Result::ok)
})
.into();
let lazy = required_state
.iter()
@@ -141,6 +152,14 @@ pub(super) async fn handle(
.map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str())))
.stream();
let timeline = timeline_pdus
.iter()
.stream()
.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
.map(at!(1))
.map(Event::into_format)
.collect();
let wildcard_state = required_state
.iter()
.filter(|(_, state_key)| state_key == "*")
@@ -186,14 +205,6 @@ pub(super) async fn handle(
})
.into();
let timeline = timeline_pdus
.iter()
.stream()
.filter_map(|item| ignored_filter(services, item.clone(), sender_user))
.map(at!(1))
.map(Event::into_format)
.collect();
let room_name = services
.state_accessor
.get_name(room_id)
@@ -243,12 +254,12 @@ pub(super) async fn handle(
.last_notification_read(sender_user, room_id);
let meta = join3(room_name, room_avatar, is_dm);
let events = join3(timeline, required_state, invite_state);
let events = join4(timeline, num_live, required_state, invite_state);
let member_counts = join(joined_count, invited_count);
let notification_counts = join3(highlight_count, notification_count, last_read_count);
let (
(room_name, room_avatar, is_dm),
(timeline, required_state, invite_state),
(timeline, num_live, required_state, invite_state),
(joined_count, invited_count),
(highlight_count, notification_count, _last_notification_read),
) = join4(meta, events, member_counts, notification_counts)
@@ -264,23 +275,21 @@ pub(super) async fn handle(
)
.await?;
let num_live = None; // Count events in timeline greater than global sync counter
Ok(Some(response::Room {
initial: Some(roomsince == 0),
initial: roomsince.eq(&0).then_some(true),
lists: lists.clone(),
membership: membership.clone(),
name: room_name.or(hero_name),
avatar: JsOption::from_option(room_avatar.or(heroes_avatar)),
is_dm,
heroes,
required_state,
invite_state: invite_state.flatten(),
prev_batch: prev_batch.as_deref().map(Into::into),
num_live: num_live.flatten(),
limited,
timeline,
bump_stamp,
heroes,
num_live,
joined_count,
invited_count,
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },