From 7666bb63d8bca53f69b8e27c03fd7c7a60531f17 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 26 Nov 2025 22:09:24 +0000 Subject: [PATCH] Optimize syncv3 joined room path with additional conditional queries. Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 116 +++++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 7e2b8827..cae639a5 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -35,11 +35,11 @@ use ruma::{ }; use tokio::time; use tuwunel_core::{ - Error, Result, at, + Result, at, debug::INFO_SPAN_LEVEL, err, error::{inspect_debug_log, inspect_log}, - extract_variant, is_equal_to, + extract_variant, is_equal_to, is_false, is_true, matrix::{ Event, event::Matches, @@ -744,8 +744,10 @@ async fn load_joined_room( .room .timeline .limit - .unwrap_or_else(|| uint!(10)) - .try_into()?; + .map(TryInto::try_into) + .map_expect("UInt to usize") + .unwrap_or(10) + .min(100); let (timeline_pdus, limited, last_timeline_count) = load_timeline( services, @@ -757,10 +759,20 @@ async fn load_joined_room( ) .await?; - let since_shortstatehash = services - .timeline - .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) - .ok(); + let timeline_changed = last_timeline_count.into_unsigned() > since; + debug_assert!( + timeline_pdus.is_empty() || timeline_changed, + "if timeline events, last_timeline_count must be in the since window." + ); + + let since_shortstatehash: OptionFuture<_> = timeline_changed + .then(|| { + services + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) + .ok() + }) + .into(); let horizon_shortstatehash: OptionFuture<_> = timeline_pdus .first() @@ -773,12 +785,20 @@ async fn load_joined_room( }) .into(); - let current_shortstatehash = services - .timeline - .get_shortstatehash(room_id, last_timeline_count) - .inspect_err(inspect_debug_log) - .or_else(|_| services.state.get_room_shortstatehash(room_id)) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); + let current_shortstatehash: OptionFuture<_> = timeline_changed + .then(|| { + services + .timeline + .get_shortstatehash(room_id, last_timeline_count) + .inspect_err(inspect_debug_log) + .or_else(|_| services.state.get_room_shortstatehash(room_id)) + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))) + }) + .into(); + + let encrypted_room: OptionFuture<_> = timeline_changed + .then(|| services.state_accessor.is_encrypted_room(room_id)) + .into(); let receipt_events = services .read_receipt @@ -792,8 +812,6 @@ async fn load_joined_room( }) .collect::>>(); - let encrypted_room = services.state_accessor.is_encrypted_room(room_id); - let ( (since_shortstatehash, horizon_shortstatehash, current_shortstatehash), receipt_events, @@ -803,8 +821,12 @@ async fn load_joined_room( receipt_events, encrypted_room, ) - .map(|((since, horizon, current), receipt, encrypted_room)| { - Ok::<_, Error>(((since, horizon.flat_ok(), current?), receipt, encrypted_room)) + .map(|((since, horizon, current), receipt, encrypted_room)| -> Result<_> { + Ok(( + (since.flatten(), horizon.flat_ok(), current.transpose()?), + receipt, + encrypted_room, + )) }) .boxed() .await?; @@ -812,7 +834,7 @@ async fn load_joined_room( let lazy_load_options = [&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options]; - let lazy_loading_enabled = !encrypted_room + let lazy_loading_enabled = encrypted_room.is_some_and(is_false!()) && lazy_load_options .iter() .any(|opts| opts.is_enabled()); @@ -847,9 +869,14 @@ async fn load_joined_room( }) .into(); - let sender_joined_count = services - .state_cache - .get_joined_count(room_id, sender_user); + let sender_joined_count: OptionFuture<_> = timeline_changed + .then(|| { + services + .state_cache + .get_joined_count(room_id, sender_user) + .unwrap_or(0) + }) + .into(); let since_encryption: OptionFuture<_> = since_shortstatehash .map(|shortstatehash| { @@ -859,10 +886,6 @@ async fn load_joined_room( }) .into(); - let last_privateread_update = services - .read_receipt - .last_privateread_update(sender_user, room_id); - let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() .then(|| { @@ -873,6 +896,10 @@ async fn load_joined_room( }) .into(); + let last_privateread_update = services + .read_receipt + .last_privateread_update(sender_user, room_id); + let ( (last_privateread_update, last_notification_read), (sender_joined_count, since_encryption), @@ -884,27 +911,36 @@ async fn load_joined_room( ) .await; - let _encrypted_since_last_sync = !initial && encrypted_room && since_encryption.is_none(); + let _encrypted_since_last_sync = + !initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none(); - let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > since); + let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since; + + let state_changes: OptionFuture<_> = current_shortstatehash + .map(|current_shortstatehash| { + calculate_state_changes( + services, + sender_user, + room_id, + full_state || initial, + since_shortstatehash, + horizon_shortstatehash, + current_shortstatehash, + joined_since_last_sync, + witness.as_ref(), + ) + }) + .into(); let StateChanges { heroes, joined_member_count, invited_member_count, mut state_events, - } = calculate_state_changes( - services, - sender_user, - room_id, - full_state || initial, - since_shortstatehash, - horizon_shortstatehash, - current_shortstatehash, - joined_since_last_sync, - witness.as_ref(), - ) - .await?; + } = state_changes + .await + .transpose()? + .unwrap_or_default(); let is_sender_membership = |event: &PduEvent| { *event.event_type() == StateEventType::RoomMember.into()