Improve sliding-sync selector and windowing logic.

Bump Ruma; eliminate premature release compat.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-22 07:06:00 +00:00
parent e53968d9eb
commit 4fd60b2605
5 changed files with 104 additions and 81 deletions

21
Cargo.lock generated
View File

@@ -3509,7 +3509,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma" name = "ruma"
version = "0.13.0" version = "0.13.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"assign", "assign",
"js_int", "js_int",
@@ -3528,7 +3528,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-appservice-api" name = "ruma-appservice-api"
version = "0.13.0" version = "0.13.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@@ -3540,7 +3540,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-client-api" name = "ruma-client-api"
version = "0.21.0" version = "0.21.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"assign", "assign",
@@ -3565,7 +3565,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-common" name = "ruma-common"
version = "0.16.0" version = "0.16.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"base64", "base64",
@@ -3598,7 +3598,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-events" name = "ruma-events"
version = "0.31.0" version = "0.31.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"indexmap", "indexmap",
@@ -3612,6 +3612,7 @@ dependencies = [
"ruma-macros", "ruma-macros",
"serde", "serde",
"serde_json", "serde_json",
"smallstr",
"smallvec", "smallvec",
"thiserror 2.0.16", "thiserror 2.0.16",
"tracing", "tracing",
@@ -3624,7 +3625,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-federation-api" name = "ruma-federation-api"
version = "0.12.0" version = "0.12.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"bytes", "bytes",
"headers", "headers",
@@ -3646,7 +3647,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers-validation" name = "ruma-identifiers-validation"
version = "0.11.0" version = "0.11.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"js_int", "js_int",
"thiserror 2.0.16", "thiserror 2.0.16",
@@ -3655,7 +3656,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-macros" name = "ruma-macros"
version = "0.16.0" version = "0.16.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"proc-macro-crate", "proc-macro-crate",
@@ -3670,7 +3671,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-push-gateway-api" name = "ruma-push-gateway-api"
version = "0.12.0" version = "0.12.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@@ -3682,7 +3683,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-signatures" name = "ruma-signatures"
version = "0.18.0" version = "0.18.0"
source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c"
dependencies = [ dependencies = [
"base64", "base64",
"ed25519-dalek", "ed25519-dalek",

View File

@@ -320,7 +320,7 @@ default-features = false
[workspace.dependencies.ruma] [workspace.dependencies.ruma]
git = "https://github.com/matrix-construct/ruma" git = "https://github.com/matrix-construct/ruma"
rev = "3c26be1e4d670c2c71e2bc84b3f09c67c4188612" rev = "b858d3478f2792a381463ef9b63f6c1469a5d81c"
features = [ features = [
"__compat", "__compat",
"appservice-api-c", "appservice-api-c",

View File

@@ -87,7 +87,7 @@ type ListIds = SmallVec<[ListId; 1]>;
user_id = %body.sender_user().localpart(), user_id = %body.sender_user().localpart(),
device_id = %body.sender_device(), device_id = %body.sender_device(),
conn_id = ?body.body.conn_id.clone().unwrap_or_default(), conn_id = ?body.body.conn_id.clone().unwrap_or_default(),
since = ?body.body.pos.clone().or_else(|| body.body.pos_qrs_.clone()).unwrap_or_default(), since = ?body.body.pos.clone().unwrap_or_default(),
) )
)] )]
pub(crate) async fn sync_events_v5_route( pub(crate) async fn sync_events_v5_route(
@@ -99,14 +99,12 @@ pub(crate) async fn sync_events_v5_route(
let since = request let since = request
.pos .pos
.as_ref() .as_ref()
.or(request.pos_qrs_.as_ref())
.and_then(|string| string.parse().ok()) .and_then(|string| string.parse().ok())
.unwrap_or(0); .unwrap_or(0);
let timeout = request let timeout = request
.timeout .timeout
.as_ref() .as_ref()
.or(request.timeout_qrs_.as_ref())
.map(Duration::as_millis) .map(Duration::as_millis)
.map(TryInto::try_into) .map(TryInto::try_into)
.flat_ok() .flat_ok()
@@ -149,10 +147,10 @@ pub(crate) async fn sync_events_v5_route(
); );
// Update parameters regardless of replay or advance // Update parameters regardless of replay or advance
conn.next_batch = services.globals.wait_pending().await?;
conn.globalsince = since;
conn.update_cache(request); conn.update_cache(request);
conn.update_rooms_prologue(advancing); conn.update_rooms_prologue(advancing);
conn.globalsince = since;
conn.next_batch = services.globals.current_count();
let sync_info = SyncInfo { let sync_info = SyncInfo {
services, services,
@@ -177,13 +175,14 @@ pub(crate) async fn sync_events_v5_route(
let window; let window;
(window, response.lists) = selector(&mut conn, sync_info).boxed().await; (window, response.lists) = selector(&mut conn, sync_info).boxed().await;
let watch_rooms = window.keys().map(AsRef::as_ref).stream(); let watch_rooms = window.keys().map(AsRef::as_ref).stream();
let watchers = services let watchers = services
.sync .sync
.watch(sender_user, sender_device, watch_rooms); .watch(sender_user, sender_device, watch_rooms);
let window;
conn.next_batch = services.globals.wait_pending().await?; conn.next_batch = services.globals.wait_pending().await?;
(window, response.lists) = selector(&mut conn, sync_info).boxed().await;
if conn.globalsince < conn.next_batch { if conn.globalsince < conn.next_batch {
let rooms = let rooms =
handle_rooms(sync_info, &conn, &window).map_ok(|rooms| response.rooms = rooms); handle_rooms(sync_info, &conn, &window).map_ok(|rooms| response.rooms = rooms);
@@ -221,11 +220,7 @@ pub(crate) async fn sync_events_v5_route(
} }
fn is_empty_response(response: &Response) -> bool { fn is_empty_response(response: &Response) -> bool {
response.extensions.is_empty() response.extensions.is_empty() && response.rooms.is_empty()
&& response
.rooms
.iter()
.all(|(_, room)| room.timeline.is_empty() && room.invite_state.is_none())
} }
#[tracing::instrument( #[tracing::instrument(

View File

@@ -8,7 +8,7 @@ use ruma::{
JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId, JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId,
api::client::sync::sync_events::{ api::client::sync::sync_events::{
UnreadNotificationsCount, UnreadNotificationsCount,
v5::{DisplayName, response}, v5::{DisplayName, response, response::Heroes},
}, },
events::{ events::{
StateEventType, StateEventType,
@@ -45,7 +45,9 @@ static DEFAULT_BUMP_TYPES: [TimelineEventType; 6] =
pub(super) async fn handle( pub(super) async fn handle(
SyncInfo { services, sender_user, .. }: SyncInfo<'_>, SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
conn: &Connection, conn: &Connection,
WindowRoom { lists, membership, room_id, .. }: &WindowRoom, WindowRoom {
lists, membership, room_id, last_count, ..
}: &WindowRoom,
) -> Result<Option<response::Room>> { ) -> Result<Option<response::Room>> {
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
@@ -54,6 +56,11 @@ pub(super) async fn handle(
.get(room_id) .get(room_id)
.ok_or_else(|| err!("Missing connection state for {room_id}"))?; .ok_or_else(|| err!("Missing connection state for {room_id}"))?;
debug_assert!(
roomsince == 0 || *last_count > roomsince,
"Stale room shouldn't be in the window"
);
let is_invite = *membership == Some(MembershipState::Invite); let is_invite = *membership == Some(MembershipState::Invite);
let default_details = (0_usize, HashSet::new()); let default_details = (0_usize, HashSet::new());
let (timeline_limit, required_state) = lists let (timeline_limit, required_state) = lists
@@ -92,10 +99,6 @@ pub(super) async fn handle(
let (timeline_pdus, limited, _lastcount) = let (timeline_pdus, limited, _lastcount) =
timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default()));
if roomsince != 0 && timeline_pdus.is_empty() && !is_invite {
return Ok(None);
}
let prev_batch = timeline_pdus let prev_batch = timeline_pdus
.first() .first()
.map(at!(0)) .map(at!(0))
@@ -112,9 +115,9 @@ pub(super) async fn handle(
.is_ok() .is_ok()
}) })
.fold(Option::<UInt>::None, |mut bump_stamp, (_, pdu)| { .fold(Option::<UInt>::None, |mut bump_stamp, (_, pdu)| {
let ts = pdu.origin_server_ts().0; let ts = pdu.origin_server_ts();
if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts) { if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts.get()) {
bump_stamp.replace(ts); bump_stamp.replace(ts.get());
} }
bump_stamp bump_stamp
@@ -235,15 +238,19 @@ pub(super) async fn handle(
.is_direct(room_id, sender_user) .is_direct(room_id, sender_user)
.map(|is_dm| is_dm.then_some(is_dm)); .map(|is_dm| is_dm.then_some(is_dm));
let last_read_count = services
.user
.last_notification_read(sender_user, room_id);
let meta = join3(room_name, room_avatar, is_dm); let meta = join3(room_name, room_avatar, is_dm);
let events = join3(timeline, required_state, invite_state); let events = join3(timeline, required_state, invite_state);
let member_counts = join(joined_count, invited_count); let member_counts = join(joined_count, invited_count);
let notification_counts = join(highlight_count, notification_count); let notification_counts = join3(highlight_count, notification_count, last_read_count);
let ( let (
(room_name, room_avatar, is_dm), (room_name, room_avatar, is_dm),
(timeline, required_state, invite_state), (timeline, required_state, invite_state),
(joined_count, invited_count), (joined_count, invited_count),
(highlight_count, notification_count), (highlight_count, notification_count, _last_notification_read),
) = join4(meta, events, member_counts, notification_counts) ) = join4(meta, events, member_counts, notification_counts)
.boxed() .boxed()
.await; .await;
@@ -288,9 +295,9 @@ async fn calculate_heroes(
room_id: &RoomId, room_id: &RoomId,
room_name: Option<&DisplayName>, room_name: Option<&DisplayName>,
room_avatar: Option<&MxcUri>, room_avatar: Option<&MxcUri>,
) -> Result<(Option<Vec<response::Hero>>, Option<DisplayName>, Option<OwnedMxcUri>)> { ) -> Result<(Option<Heroes>, Option<DisplayName>, Option<OwnedMxcUri>)> {
const MAX_HEROES: usize = 5; const MAX_HEROES: usize = 5;
let heroes: Vec<_> = services let heroes: Heroes = services
.state_cache .state_cache
.room_members(room_id) .room_members(room_id)
.ready_filter(|&member| member != sender_user) .ready_filter(|&member| member != sender_user)

View File

@@ -2,7 +2,7 @@ use std::cmp::Ordering;
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3}, future::{OptionFuture, join5},
}; };
use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint}; use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
use tuwunel_core::{ use tuwunel_core::{
@@ -48,6 +48,10 @@ pub(super) async fn selector(
.enumerate() .enumerate()
.for_each(|(i, room)| { .for_each(|(i, room)| {
room.ranked = i; room.ranked = i;
conn.rooms
.entry(room.room_id.clone())
.or_default();
}); });
trace!(?rooms); trace!(?rooms);
@@ -57,12 +61,6 @@ pub(super) async fn selector(
let window = select_window(sync_info, conn, rooms.iter(), &lists).await; let window = select_window(sync_info, conn, rooms.iter(), &lists).await;
trace!(?window); trace!(?window);
for room in &rooms {
conn.rooms
.entry(room.room_id.clone())
.or_default();
}
(window, lists) (window, lists)
} }
@@ -99,19 +97,20 @@ where
rooms rooms
.clone() .clone()
.filter(move |&room| room.lists.contains(&id)) .filter(move |&room| room.lists.contains(&id))
.enumerate() .filter(|&room| {
.skip_while(move |&(i, room)| { conn.rooms
i < start .get(&room.room_id)
|| conn .is_some_and(|conn_room| room.last_count > conn_room.roomsince)
.rooms
.get(&room.room_id)
.is_some_and(|conn_room| conn_room.roomsince >= room.last_count)
}) })
.enumerate()
.skip_while(move |&(i, _)| i < start)
.take(end.saturating_add(1).saturating_sub(start)) .take(end.saturating_add(1).saturating_sub(start))
.map(|(_, room)| (room.room_id.clone(), room.clone())) .map(|(_, room)| (room.room_id.clone(), room.clone()))
}); })
.stream();
conn.subscriptions let subscriptions = conn
.subscriptions
.iter() .iter()
.stream() .stream()
.broad_filter_map(async |(room_id, _)| { .broad_filter_map(async |(room_id, _)| {
@@ -125,10 +124,9 @@ where
last_count: 0, last_count: 0,
}) })
}) })
.map(|room| (room.room_id.clone(), room)) .map(|room| (room.room_id.clone(), room));
.chain(selections.stream())
.collect() subscriptions.chain(selections).collect().await
.await
} }
#[tracing::instrument( #[tracing::instrument(
@@ -145,7 +143,7 @@ async fn match_lists_for_room(
) -> Option<WindowRoom> { ) -> Option<WindowRoom> {
let SyncInfo { services, sender_user, .. } = sync_info; let SyncInfo { services, sender_user, .. } = sync_info;
let lists = conn let (matched, lists) = conn
.lists .lists
.iter() .iter()
.stream() .stream()
@@ -164,56 +162,78 @@ async fn match_lists_for_room(
.then(|| id.clone()) .then(|| id.clone())
}) })
.collect::<ListIds>() .collect::<ListIds>()
.map(|lists| (lists.is_empty().is_false(), lists))
.await; .await;
let last_timeline_count: OptionFuture<_> = lists let last_notification: OptionFuture<_> = matched
.is_empty()
.is_false()
.then(|| { .then(|| {
services services
.timeline .user
.last_timeline_count(None, &room_id, None) .last_notification_read(sender_user, &room_id)
.map_ok(PduCount::into_unsigned) .map(|count| count.min(conn.next_batch))
.ok()
}) })
.into(); .into();
let last_account_count: OptionFuture<_> = lists let last_privateread: OptionFuture<_> = matched
.is_empty()
.is_false()
.then(|| { .then(|| {
services services
.account_data .read_receipt
.last_count(Some(room_id.as_ref()), sender_user, None) .last_privateread_update(sender_user, &room_id)
.ok() .map(|count| count.min(conn.next_batch))
}) })
.into(); .into();
let last_receipt_count: OptionFuture<_> = lists let last_receipt: OptionFuture<_> = matched
.is_empty() .and_is(false) // masked out, maybe unnecessary
.is_false()
.then(|| { .then(|| {
services services
.read_receipt .read_receipt
.last_receipt_count(&room_id, sender_user.into(), None) .last_receipt_count(&room_id, sender_user.into(), None)
.map(Result::ok) .unwrap_or_default()
.map(|count| count.min(conn.next_batch))
}) })
.into(); .into();
let (last_timeline_count, last_account_count, last_receipt_count) = let last_account: OptionFuture<_> = matched
join3(last_timeline_count, last_account_count, last_receipt_count).await; .and_is(false) // masked out, maybe unnecessary
.then(|| {
services
.account_data
.last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
.unwrap_or_default()
})
.into();
let last_timeline: OptionFuture<_> = matched
.then(|| {
services
.timeline
.last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
.map_ok(PduCount::into_unsigned)
.unwrap_or_default()
})
.into();
let (last_timeline, last_notification, last_account, last_receipt, last_privateread) =
join5(last_timeline, last_notification, last_account, last_receipt, last_privateread)
.await;
Some(WindowRoom { Some(WindowRoom {
room_id: room_id.clone(), room_id: room_id.clone(),
membership, membership,
lists, lists,
ranked: 0, ranked: 0,
last_count: [last_timeline_count, last_account_count, last_receipt_count] last_count: [
.into_iter() last_timeline,
.map(Option::flatten) last_notification,
.map(Option::unwrap_or_default) last_account,
.max() last_receipt,
.unwrap_or_default(), last_privateread,
]
.into_iter()
.map(Option::unwrap_or_default)
.max()
.unwrap_or_default(),
}) })
} }