From 0397bb8237deedc2e355bd74ca715720cb5e8736 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 22 Oct 2025 20:53:04 +0000 Subject: [PATCH] Implement specified non-blocking semantic for sliding-sync. Simplify sliding-sync watch loop and bounds. Signed-off-by: Jason Volk --- docker/Dockerfile.matrix-rust-sdk | 3 +- src/api/client/sync/v5.rs | 44 ++++++++++++++++++------------ src/api/client/sync/v5/room.rs | 22 +++++++++------ src/api/client/sync/v5/selector.rs | 6 +--- src/service/sync/mod.rs | 2 +- 5 files changed, 42 insertions(+), 35 deletions(-) diff --git a/docker/Dockerfile.matrix-rust-sdk b/docker/Dockerfile.matrix-rust-sdk index c301c000..3f6ee31c 100644 --- a/docker/Dockerfile.matrix-rust-sdk +++ b/docker/Dockerfile.matrix-rust-sdk @@ -13,8 +13,7 @@ ARG mrsdk_profile="release" ARG mrsdk_target_share="${MRSDK_TARGET_DIR}/${sys_name}/${sys_version}/${rust_target}/${rust_toolchain}/${mrsdk_profile}/_shared_cache" ARG mrsdk_test_args="" ARG mrsdk_test_opts="" -#TODO!!! -ARG mrsdk_skip_list="--skip test_history_share_on_invite_pin_violation --skip test_room_notification_count" +ARG mrsdk_skip_list="" WORKDIR / COPY --link --from=input . . 
diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index dbd0b3af..4a305c8a 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -107,13 +107,12 @@ pub(crate) async fn sync_events_v5_route( .map(Duration::as_millis) .map(TryInto::try_into) .flat_ok() - .unwrap_or(services.config.client_sync_timeout_default) - .max(services.config.client_sync_timeout_min) - .min(services.config.client_sync_timeout_max); - - let stop_at = Instant::now() - .checked_add(Duration::from_millis(timeout)) - .expect("configuration must limit maximum timeout"); + .map(|timeout: u64| { + timeout + .max(services.config.client_sync_timeout_min) + .min(services.config.client_sync_timeout_max) + }) + .unwrap_or(0); let conn_key = into_connection_key(sender_user, sender_device, request.conn_id.as_deref()); let conn_val = since @@ -147,7 +146,7 @@ pub(crate) async fn sync_events_v5_route( // Update parameters regardless of replay or advance conn.next_batch = services.globals.wait_pending().await?; - conn.globalsince = since; + conn.globalsince = since.min(conn.next_batch); conn.update_cache(request); conn.update_rooms_prologue(advancing); @@ -159,6 +158,10 @@ pub(crate) async fn sync_events_v5_route( extensions: Default::default(), }; + let stop_at = Instant::now() + .checked_add(Duration::from_millis(timeout)) + .expect("configuration must limit maximum timeout"); + let sync_info = SyncInfo { services, sender_user, sender_device }; loop { debug_assert!( @@ -167,21 +170,20 @@ pub(crate) async fn sync_events_v5_route( ); let window; - (window, response.lists) = selector(&mut conn, sync_info).boxed().await; - let watch_rooms = window.keys().map(AsRef::as_ref).stream(); - let watchers = services - .sync - .watch(sender_user, sender_device, watch_rooms); + let watchers = services.sync.watch( + sender_user, + sender_device, + services.state_cache.rooms_joined(sender_user), + ); - let window; conn.next_batch = services.globals.wait_pending().await?; (window, 
response.lists) = selector(&mut conn, sync_info).boxed().await; if conn.globalsince < conn.next_batch { - let rooms = - handle_rooms(sync_info, &conn, &window).map_ok(|rooms| response.rooms = rooms); + let rooms = handle_rooms(sync_info, &conn, &window) + .map_ok(|response_rooms| response.rooms = response_rooms); let extensions = handle_extensions(sync_info, &conn, &window) - .map_ok(|extensions| response.extensions = extensions); + .map_ok(|response_extensions| response.extensions = response_extensions); try_join(rooms, extensions).boxed().await?; @@ -194,7 +196,13 @@ pub(crate) async fn sync_events_v5_route( } } - if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() { + if timeout == 0 + || services.server.is_stopping() + || timeout_at(stop_at, watchers) + .boxed() + .await + .is_err() + { response.pos = conn.next_batch.to_string().into(); trace!(conn.globalsince, conn.next_batch, "timeout; empty response {response:?}"); return Ok(response); diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/room.rs index bba9f7a7..ced0a64c 100644 --- a/src/api/client/sync/v5/room.rs +++ b/src/api/client/sync/v5/room.rs @@ -99,11 +99,15 @@ pub(super) async fn handle( .flat_ok() .unwrap_or_else(|| (Vec::new(), true, PduCount::default())); + let required_state = required_state + .into_iter() + .filter(|_| !timeline_pdus.is_empty()) + .collect::<Vec<_>>(); + let prev_batch = timeline_pdus .first() .map(at!(0)) .map(PduCount::into_unsigned) - .or_else(|| roomsince.ne(&0).then_some(roomsince)) .as_ref() .map(ToString::to_string); @@ -152,14 +156,6 @@ pub(super) async fn handle( .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))) .stream(); - let timeline = timeline_pdus - .iter() - .stream() - .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) - .map(at!(1)) - .map(Event::into_format) - .collect(); - let wildcard_state = required_state - .iter() + .iter() .filter(|(_, state_key)| state_key == "*") @@ -253,6 
+249,14 @@ pub(super) async fn handle( .user .last_notification_read(sender_user, room_id); + let timeline = timeline_pdus + .iter() + .stream() + .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) + .map(at!(1)) + .map(Event::into_format) + .collect(); + let meta = join3(room_name, room_avatar, is_dm); let events = join4(timeline, num_live, required_state, invite_state); let member_counts = join(joined_count, invited_count); diff --git a/src/api/client/sync/v5/selector.rs b/src/api/client/sync/v5/selector.rs index ecc132cb..ce584a6c 100644 --- a/src/api/client/sync/v5/selector.rs +++ b/src/api/client/sync/v5/selector.rs @@ -174,7 +174,6 @@ async fn match_lists_for_room( services .user .last_notification_read(sender_user, &room_id) - .map(|count| count.min(conn.next_batch)) }) .into(); @@ -183,23 +182,19 @@ async fn match_lists_for_room( services .read_receipt .last_privateread_update(sender_user, &room_id) - .map(|count| count.min(conn.next_batch)) }) .into(); let last_receipt: OptionFuture<_> = matched - .and_is(false) // masked out, maybe unnecessary .then(|| { services .read_receipt .last_receipt_count(&room_id, sender_user.into(), None) .unwrap_or_default() - .map(|count| count.min(conn.next_batch)) }) .into(); let last_account: OptionFuture<_> = matched - .and_is(false) // masked out, maybe unnecessary .then(|| { services .account_data @@ -236,6 +231,7 @@ async fn match_lists_for_room( ] .into_iter() .map(Option::unwrap_or_default) + .filter(|count| conn.next_batch.ge(count)) .max() .unwrap_or_default(), }) diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index a7b3eaac..9c9ea503 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -50,7 +50,7 @@ pub struct Connection { pub next_batch: u64, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct Room { pub roomsince: u64, pub last_batch: u64,