Optimize syncv3 joined room path with additional conditional queries.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -35,11 +35,11 @@ use ruma::{
|
||||
};
|
||||
use tokio::time;
|
||||
use tuwunel_core::{
|
||||
Error, Result, at,
|
||||
Result, at,
|
||||
debug::INFO_SPAN_LEVEL,
|
||||
err,
|
||||
error::{inspect_debug_log, inspect_log},
|
||||
extract_variant, is_equal_to,
|
||||
extract_variant, is_equal_to, is_false, is_true,
|
||||
matrix::{
|
||||
Event,
|
||||
event::Matches,
|
||||
@@ -744,8 +744,10 @@ async fn load_joined_room(
|
||||
.room
|
||||
.timeline
|
||||
.limit
|
||||
.unwrap_or_else(|| uint!(10))
|
||||
.try_into()?;
|
||||
.map(TryInto::try_into)
|
||||
.map_expect("UInt to usize")
|
||||
.unwrap_or(10)
|
||||
.min(100);
|
||||
|
||||
let (timeline_pdus, limited, last_timeline_count) = load_timeline(
|
||||
services,
|
||||
@@ -757,10 +759,20 @@ async fn load_joined_room(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let since_shortstatehash = services
|
||||
.timeline
|
||||
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
|
||||
.ok();
|
||||
let timeline_changed = last_timeline_count.into_unsigned() > since;
|
||||
debug_assert!(
|
||||
timeline_pdus.is_empty() || timeline_changed,
|
||||
"if timeline events, last_timeline_count must be in the since window."
|
||||
);
|
||||
|
||||
let since_shortstatehash: OptionFuture<_> = timeline_changed
|
||||
.then(|| {
|
||||
services
|
||||
.timeline
|
||||
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
|
||||
.ok()
|
||||
})
|
||||
.into();
|
||||
|
||||
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
|
||||
.first()
|
||||
@@ -773,12 +785,20 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let current_shortstatehash = services
|
||||
.timeline
|
||||
.get_shortstatehash(room_id, last_timeline_count)
|
||||
.inspect_err(inspect_debug_log)
|
||||
.or_else(|_| services.state.get_room_shortstatehash(room_id))
|
||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
|
||||
let current_shortstatehash: OptionFuture<_> = timeline_changed
|
||||
.then(|| {
|
||||
services
|
||||
.timeline
|
||||
.get_shortstatehash(room_id, last_timeline_count)
|
||||
.inspect_err(inspect_debug_log)
|
||||
.or_else(|_| services.state.get_room_shortstatehash(room_id))
|
||||
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
|
||||
})
|
||||
.into();
|
||||
|
||||
let encrypted_room: OptionFuture<_> = timeline_changed
|
||||
.then(|| services.state_accessor.is_encrypted_room(room_id))
|
||||
.into();
|
||||
|
||||
let receipt_events = services
|
||||
.read_receipt
|
||||
@@ -792,8 +812,6 @@ async fn load_joined_room(
|
||||
})
|
||||
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
|
||||
|
||||
let encrypted_room = services.state_accessor.is_encrypted_room(room_id);
|
||||
|
||||
let (
|
||||
(since_shortstatehash, horizon_shortstatehash, current_shortstatehash),
|
||||
receipt_events,
|
||||
@@ -803,8 +821,12 @@ async fn load_joined_room(
|
||||
receipt_events,
|
||||
encrypted_room,
|
||||
)
|
||||
.map(|((since, horizon, current), receipt, encrypted_room)| {
|
||||
Ok::<_, Error>(((since, horizon.flat_ok(), current?), receipt, encrypted_room))
|
||||
.map(|((since, horizon, current), receipt, encrypted_room)| -> Result<_> {
|
||||
Ok((
|
||||
(since.flatten(), horizon.flat_ok(), current.transpose()?),
|
||||
receipt,
|
||||
encrypted_room,
|
||||
))
|
||||
})
|
||||
.boxed()
|
||||
.await?;
|
||||
@@ -812,7 +834,7 @@ async fn load_joined_room(
|
||||
let lazy_load_options =
|
||||
[&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
|
||||
|
||||
let lazy_loading_enabled = !encrypted_room
|
||||
let lazy_loading_enabled = encrypted_room.is_some_and(is_false!())
|
||||
&& lazy_load_options
|
||||
.iter()
|
||||
.any(|opts| opts.is_enabled());
|
||||
@@ -847,9 +869,14 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let sender_joined_count = services
|
||||
.state_cache
|
||||
.get_joined_count(room_id, sender_user);
|
||||
let sender_joined_count: OptionFuture<_> = timeline_changed
|
||||
.then(|| {
|
||||
services
|
||||
.state_cache
|
||||
.get_joined_count(room_id, sender_user)
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.into();
|
||||
|
||||
let since_encryption: OptionFuture<_> = since_shortstatehash
|
||||
.map(|shortstatehash| {
|
||||
@@ -859,10 +886,6 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let last_privateread_update = services
|
||||
.read_receipt
|
||||
.last_privateread_update(sender_user, room_id);
|
||||
|
||||
let last_notification_read: OptionFuture<_> = timeline_pdus
|
||||
.is_empty()
|
||||
.then(|| {
|
||||
@@ -873,6 +896,10 @@ async fn load_joined_room(
|
||||
})
|
||||
.into();
|
||||
|
||||
let last_privateread_update = services
|
||||
.read_receipt
|
||||
.last_privateread_update(sender_user, room_id);
|
||||
|
||||
let (
|
||||
(last_privateread_update, last_notification_read),
|
||||
(sender_joined_count, since_encryption),
|
||||
@@ -884,27 +911,36 @@ async fn load_joined_room(
|
||||
)
|
||||
.await;
|
||||
|
||||
let _encrypted_since_last_sync = !initial && encrypted_room && since_encryption.is_none();
|
||||
let _encrypted_since_last_sync =
|
||||
!initial && encrypted_room.is_some_and(is_true!()) && since_encryption.is_none();
|
||||
|
||||
let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > since);
|
||||
let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
|
||||
|
||||
let state_changes: OptionFuture<_> = current_shortstatehash
|
||||
.map(|current_shortstatehash| {
|
||||
calculate_state_changes(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
full_state || initial,
|
||||
since_shortstatehash,
|
||||
horizon_shortstatehash,
|
||||
current_shortstatehash,
|
||||
joined_since_last_sync,
|
||||
witness.as_ref(),
|
||||
)
|
||||
})
|
||||
.into();
|
||||
|
||||
let StateChanges {
|
||||
heroes,
|
||||
joined_member_count,
|
||||
invited_member_count,
|
||||
mut state_events,
|
||||
} = calculate_state_changes(
|
||||
services,
|
||||
sender_user,
|
||||
room_id,
|
||||
full_state || initial,
|
||||
since_shortstatehash,
|
||||
horizon_shortstatehash,
|
||||
current_shortstatehash,
|
||||
joined_since_last_sync,
|
||||
witness.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
} = state_changes
|
||||
.await
|
||||
.transpose()?
|
||||
.unwrap_or_default();
|
||||
|
||||
let is_sender_membership = |event: &PduEvent| {
|
||||
*event.event_type() == StateEventType::RoomMember.into()
|
||||
|
||||
Reference in New Issue
Block a user