From 4fd60b2605fce74d5d4589eccaf38a1cbbcc0ea4 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 22 Oct 2025 07:06:00 +0000 Subject: [PATCH] Improve sliding-sync selector and windowing logic. Bump Ruma; eliminate premature release compat. Signed-off-by: Jason Volk --- Cargo.lock | 21 +++--- Cargo.toml | 2 +- src/api/client/sync/v5.rs | 17 ++--- src/api/client/sync/v5/room.rs | 33 +++++---- src/api/client/sync/v5/selector.rs | 112 +++++++++++++++++------------ 5 files changed, 104 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6228d0ec..9597924f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3509,7 +3509,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.13.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "assign", "js_int", @@ -3528,7 +3528,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.13.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "js_int", "ruma-common", @@ -3540,7 +3540,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.21.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "as_variant", "assign", @@ -3565,7 +3565,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.16.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "as_variant", "base64", @@ -3598,7 +3598,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.31.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "as_variant", "indexmap", @@ -3612,6 +3612,7 @@ dependencies = [ "ruma-macros", "serde", "serde_json", + "smallstr", "smallvec", "thiserror 2.0.16", "tracing", @@ -3624,7 +3625,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.12.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "bytes", "headers", @@ -3646,7 +3647,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.11.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "js_int", "thiserror 2.0.16", @@ -3655,7 +3656,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.16.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "cfg-if", "proc-macro-crate", @@ -3670,7 +3671,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.12.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "js_int", "ruma-common", @@ -3682,7 +3683,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.18.0" -source = "git+https://github.com/matrix-construct/ruma?rev=3c26be1e4d670c2c71e2bc84b3f09c67c4188612#3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +source = "git+https://github.com/matrix-construct/ruma?rev=b858d3478f2792a381463ef9b63f6c1469a5d81c#b858d3478f2792a381463ef9b63f6c1469a5d81c" dependencies = [ "base64", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index fd6b6b7c..bbbedee4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -320,7 +320,7 @@ default-features = false [workspace.dependencies.ruma] git = "https://github.com/matrix-construct/ruma" -rev = "3c26be1e4d670c2c71e2bc84b3f09c67c4188612" +rev = "b858d3478f2792a381463ef9b63f6c1469a5d81c" features = [ "__compat", "appservice-api-c", diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 45eb23d5..2619b6c4 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -87,7 +87,7 @@ type ListIds = SmallVec<[ListId; 1]>; user_id = %body.sender_user().localpart(), device_id = %body.sender_device(), conn_id = ?body.body.conn_id.clone().unwrap_or_default(), - since = ?body.body.pos.clone().or_else(|| body.body.pos_qrs_.clone()).unwrap_or_default(), + since = ?body.body.pos.clone().unwrap_or_default(), ) )] pub(crate) async fn sync_events_v5_route( @@ -99,14 +99,12 @@ pub(crate) async fn sync_events_v5_route( let since = request .pos .as_ref() - .or(request.pos_qrs_.as_ref()) .and_then(|string| string.parse().ok()) .unwrap_or(0); let timeout = request .timeout .as_ref() - .or(request.timeout_qrs_.as_ref()) .map(Duration::as_millis) .map(TryInto::try_into) .flat_ok() @@ -149,10 +147,10 @@ pub(crate) async fn sync_events_v5_route( ); // Update parameters regardless of replay or advance + conn.next_batch = services.globals.wait_pending().await?; + conn.globalsince = since; conn.update_cache(request); conn.update_rooms_prologue(advancing); - conn.globalsince = since; - conn.next_batch = services.globals.current_count(); let sync_info = SyncInfo { services, @@ -177,13 +175,14 @@ pub(crate) async fn sync_events_v5_route( let window; (window, response.lists) = selector(&mut conn, sync_info).boxed().await; - let watch_rooms = window.keys().map(AsRef::as_ref).stream(); let watchers = services .sync .watch(sender_user, sender_device, watch_rooms); + let window; conn.next_batch = services.globals.wait_pending().await?; + (window, response.lists) = selector(&mut conn, sync_info).boxed().await; if conn.globalsince < conn.next_batch { let rooms = handle_rooms(sync_info, &conn, &window).map_ok(|rooms| response.rooms = rooms); @@ -221,11 +220,7 @@ pub(crate) async fn sync_events_v5_route( } fn is_empty_response(response: &Response) -> bool { - response.extensions.is_empty() - && response - .rooms - .iter() - .all(|(_, room)| room.timeline.is_empty() && room.invite_state.is_none()) + response.extensions.is_empty() && response.rooms.is_empty() } #[tracing::instrument( diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/room.rs index 01415808..5cb0f0a2 100644 --- a/src/api/client/sync/v5/room.rs +++ b/src/api/client/sync/v5/room.rs @@ -8,7 +8,7 @@ use ruma::{ JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId, api::client::sync::sync_events::{ UnreadNotificationsCount, - v5::{DisplayName, response}, + v5::{DisplayName, response, response::Heroes}, }, events::{ StateEventType, @@ -45,7 +45,9 @@ static DEFAULT_BUMP_TYPES: [TimelineEventType; 6] = pub(super) async fn handle( SyncInfo { services, sender_user, .. }: SyncInfo<'_>, conn: &Connection, - WindowRoom { lists, membership, room_id, .. }: &WindowRoom, + WindowRoom { + lists, membership, room_id, last_count, .. + }: &WindowRoom, ) -> Result> { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); @@ -54,6 +56,11 @@ pub(super) async fn handle( .get(room_id) .ok_or_else(|| err!("Missing connection state for {room_id}"))?; + debug_assert!( + roomsince == 0 || *last_count > roomsince, + "Stale room shouldn't be in the window" + ); + let is_invite = *membership == Some(MembershipState::Invite); let default_details = (0_usize, HashSet::new()); let (timeline_limit, required_state) = lists @@ -92,10 +99,6 @@ pub(super) async fn handle( let (timeline_pdus, limited, _lastcount) = timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); - if roomsince != 0 && timeline_pdus.is_empty() && !is_invite { - return Ok(None); - } - let prev_batch = timeline_pdus .first() .map(at!(0)) @@ -112,9 +115,9 @@ pub(super) async fn handle( .is_ok() }) .fold(Option::::None, |mut bump_stamp, (_, pdu)| { - let ts = pdu.origin_server_ts().0; - if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts) { - bump_stamp.replace(ts); + let ts = pdu.origin_server_ts(); + if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts.get()) { + bump_stamp.replace(ts.get()); } bump_stamp @@ -235,15 +238,19 @@ pub(super) async fn handle( .is_direct(room_id, sender_user) .map(|is_dm| is_dm.then_some(is_dm)); + let last_read_count = services + .user + .last_notification_read(sender_user, room_id); + let meta = join3(room_name, room_avatar, is_dm); let events = join3(timeline, required_state, invite_state); let member_counts = join(joined_count, invited_count); - let notification_counts = join(highlight_count, notification_count); + let notification_counts = join3(highlight_count, notification_count, last_read_count); let ( (room_name, room_avatar, is_dm), (timeline, required_state, invite_state), (joined_count, invited_count), - (highlight_count, notification_count), + (highlight_count, notification_count, _last_notification_read), ) = join4(meta, events, member_counts, notification_counts) .boxed() .await; @@ -288,9 +295,9 @@ async fn calculate_heroes( room_id: &RoomId, room_name: Option<&DisplayName>, room_avatar: Option<&MxcUri>, -) -> Result<(Option>, Option, Option)> { +) -> Result<(Option, Option, Option)> { const MAX_HEROES: usize = 5; - let heroes: Vec<_> = services + let heroes: Heroes = services .state_cache .room_members(room_id) .ready_filter(|&member| member != sender_user) diff --git a/src/api/client/sync/v5/selector.rs b/src/api/client/sync/v5/selector.rs index f2f1bc33..7e43bc43 100644 --- a/src/api/client/sync/v5/selector.rs +++ b/src/api/client/sync/v5/selector.rs @@ -2,7 +2,7 @@ use std::cmp::Ordering; use futures::{ FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join3}, + future::{OptionFuture, join5}, }; use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint}; use tuwunel_core::{ @@ -48,6 +48,10 @@ pub(super) async fn selector( .enumerate() .for_each(|(i, room)| { room.ranked = i; + + conn.rooms + .entry(room.room_id.clone()) + .or_default(); }); trace!(?rooms); @@ -57,12 +61,6 @@ pub(super) async fn selector( let window = select_window(sync_info, conn, rooms.iter(), &lists).await; trace!(?window); - for room in &rooms { - conn.rooms - .entry(room.room_id.clone()) - .or_default(); - } - (window, lists) } @@ -99,19 +97,20 @@ where rooms .clone() .filter(move |&room| room.lists.contains(&id)) - .enumerate() - .skip_while(move |&(i, room)| { - i < start - || conn - .rooms - .get(&room.room_id) - .is_some_and(|conn_room| conn_room.roomsince >= room.last_count) + .filter(|&room| { + conn.rooms + .get(&room.room_id) + .is_some_and(|conn_room| room.last_count > conn_room.roomsince) }) + .enumerate() + .skip_while(move |&(i, _)| i < start) .take(end.saturating_add(1).saturating_sub(start)) .map(|(_, room)| (room.room_id.clone(), room.clone())) - }); + }) + .stream(); - conn.subscriptions + let subscriptions = conn + .subscriptions .iter() .stream() .broad_filter_map(async |(room_id, _)| { @@ -125,10 +124,9 @@ where last_count: 0, }) }) - .map(|room| (room.room_id.clone(), room)) - .chain(selections.stream()) - .collect() - .await + .map(|room| (room.room_id.clone(), room)); + + subscriptions.chain(selections).collect().await } #[tracing::instrument( @@ -145,7 +143,7 @@ async fn match_lists_for_room( ) -> Option { let SyncInfo { services, sender_user, .. } = sync_info; - let lists = conn + let (matched, lists) = conn .lists .iter() .stream() @@ -164,56 +162,78 @@ async fn match_lists_for_room( .then(|| id.clone()) }) .collect::() + .map(|lists| (lists.is_empty().is_false(), lists)) .await; - let last_timeline_count: OptionFuture<_> = lists - .is_empty() - .is_false() + let last_notification: OptionFuture<_> = matched .then(|| { services - .timeline - .last_timeline_count(None, &room_id, None) - .map_ok(PduCount::into_unsigned) - .ok() + .user + .last_notification_read(sender_user, &room_id) + .map(|count| count.min(conn.next_batch)) }) .into(); - let last_account_count: OptionFuture<_> = lists - .is_empty() - .is_false() + let last_privateread: OptionFuture<_> = matched .then(|| { services - .account_data - .last_count(Some(room_id.as_ref()), sender_user, None) - .ok() + .read_receipt + .last_privateread_update(sender_user, &room_id) + .map(|count| count.min(conn.next_batch)) }) .into(); - let last_receipt_count: OptionFuture<_> = lists - .is_empty() - .is_false() + let last_receipt: OptionFuture<_> = matched + .and_is(false) // masked out, maybe unnecessary .then(|| { services .read_receipt .last_receipt_count(&room_id, sender_user.into(), None) - .map(Result::ok) + .unwrap_or_default() + .map(|count| count.min(conn.next_batch)) }) .into(); - let (last_timeline_count, last_account_count, last_receipt_count) = - join3(last_timeline_count, last_account_count, last_receipt_count).await; + let last_account: OptionFuture<_> = matched + .and_is(false) // masked out, maybe unnecessary + .then(|| { + services + .account_data + .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch)) + .unwrap_or_default() + }) + .into(); + + let last_timeline: OptionFuture<_> = matched + .then(|| { + services + .timeline + .last_timeline_count(None, &room_id, Some(conn.next_batch.into())) + .map_ok(PduCount::into_unsigned) + .unwrap_or_default() + }) + .into(); + + let (last_timeline, last_notification, last_account, last_receipt, last_privateread) = + join5(last_timeline, last_notification, last_account, last_receipt, last_privateread) + .await; Some(WindowRoom { room_id: room_id.clone(), membership, lists, ranked: 0, - last_count: [last_timeline_count, last_account_count, last_receipt_count] - .into_iter() - .map(Option::flatten) - .map(Option::unwrap_or_default) - .max() - .unwrap_or_default(), + last_count: [ + last_timeline, + last_notification, + last_account, + last_receipt, + last_privateread, + ] + .into_iter() + .map(Option::unwrap_or_default) + .max() + .unwrap_or_default(), }) }