From 39c84fabb446fc701f62f7299cfbdccb979e08e0 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 6 Nov 2025 06:25:17 +0000 Subject: [PATCH] Optimize filter fetch in syncv3 prologue. Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 65 ++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index a1b700c8..c09d9740 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -128,16 +128,33 @@ pub(crate) async fn sync_events_route( ) -> Result { let (sender_user, sender_device) = body.sender(); + let filter: OptionFuture<_> = body + .body + .filter + .as_ref() + .map(async |filter| match filter { + | Filter::FilterDefinition(filter) => filter.clone(), + | Filter::FilterId(filter_id) => services + .users + .get_filter(sender_user, filter_id) + .await + .unwrap_or_default(), + }) + .into(); + + let filter = filter.map(Option::unwrap_or_default); + let full_state = body.body.full_state; + let set_presence = &body.body.set_presence; let ping_presence = services .presence - .maybe_ping_presence(sender_user, Some(sender_device), &body.body.set_presence) + .maybe_ping_presence(sender_user, body.sender_device.as_deref(), set_presence) .inspect_err(inspect_log) .ok(); // Record user as actively syncing for push suppression heuristic. let note_sync = services.presence.note_sync(sender_user); - join(ping_presence, note_sync).await; + let (filter, ..) = join3(filter, ping_presence, note_sync).await; let mut since = body .body @@ -175,22 +192,31 @@ pub(crate) async fn sync_events_route( let next_batch = services.globals.wait_pending().await?; debug_assert!(since <= next_batch, "next_batch is monotonic"); - if since < next_batch || body.body.full_state { - let response = build_sync_events(&services, &body, since, next_batch).await?; + if since < next_batch || full_state { + let response = build_sync_events( + &services, + body.sender(), + since, + next_batch, + full_state, + &filter, + ) + .await?; + let empty = response.rooms.is_empty() && response.presence.is_empty() && response.account_data.is_empty() && response.device_lists.is_empty() && response.to_device.is_empty(); - if !empty || body.body.full_state { + if !empty || full_state { return Ok(response); } } // Wait for activity if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() { - let response = build_empty_response(&services, &body, next_batch).await; + let response = build_empty_response(&services, body.sender(), next_batch).await; trace!(since, next_batch, "empty response"); return Ok(response); } @@ -209,13 +235,13 @@ pub(crate) async fn sync_events_route( async fn build_empty_response( services: &Services, - body: &Ruma, + (sender_user, sender_device): (&UserId, &DeviceId), next_batch: u64, ) -> sync_events::v3::Response { sync_events::v3::Response { device_one_time_keys_count: services .users - .count_one_time_keys(body.sender_user(), body.sender_device()) + .count_one_time_keys(sender_user, sender_device) .await, ..sync_events::v3::Response::new(to_small_string(next_batch)) @@ -235,23 +261,12 @@ async fn build_empty_response( )] async fn build_sync_events( services: &Services, - body: &Ruma, + (sender_user, sender_device): (&UserId, &DeviceId), since: u64, next_batch: u64, + full_state: bool, + filter: &FilterDefinition, ) -> Result { - let (sender_user, sender_device) = body.sender(); - - let full_state = body.body.full_state; - let filter = match body.body.filter.as_ref() { - | None => FilterDefinition::default(), - | Some(Filter::FilterDefinition(filter)) => filter.clone(), - | Some(Filter::FilterId(filter_id)) => services - .users - .get_filter(sender_user, filter_id) - .await - .unwrap_or_default(), - }; - let joined_rooms = services .state_cache .rooms_joined(sender_user) @@ -266,7 +281,7 @@ async fn build_sync_events( since, next_batch, full_state, - &filter, + filter, ) .map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu)) .ok() @@ -297,7 +312,7 @@ async fn build_sync_events( sender_user, next_batch, full_state, - &filter, + filter, ) .map_ok(move |left_room| (room_id, left_room)) .ok() @@ -356,7 +371,7 @@ async fn build_sync_events( let presence_updates: OptionFuture<_> = services .config .allow_local_presence - .then(|| process_presence_updates(services, since, next_batch, sender_user, &filter)) + .then(|| process_presence_updates(services, since, next_batch, sender_user, filter)) .into(); let account_data = services