diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 9133aa08..7063b94d 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -5,8 +5,8 @@ use std::{ use axum::extract::State; use futures::{ - FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, join4, join5, try_join3}, + FutureExt, StreamExt, TryFutureExt, + future::{OptionFuture, join, join3, join4, join5}, pin_mut, }; use ruma::{ @@ -36,7 +36,11 @@ use ruma::{ }; use tokio::time; use tuwunel_core::{ - Result, at, err, error, extract_variant, is_equal_to, + Error, Result, at, + debug::INFO_SPAN_LEVEL, + err, error, + error::inspect_debug_log, + extract_variant, is_equal_to, matrix::{ Event, event::Matches, @@ -212,7 +216,7 @@ async fn build_empty_response( #[tracing::instrument( name = "build", - level = "debug", + level = INFO_SPAN_LEVEL, ret(level = "trace"), skip_all, fields( @@ -562,20 +566,22 @@ async fn handle_left_room( let mut left_state_events = Vec::new(); let since_shortstatehash = services - .user - .get_token_shortstatehash(room_id, since); + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) + .await + .ok(); let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash - .map_ok(|since_shortstatehash| { + .map(|since_shortstatehash| { services .state_accessor .state_full_ids(since_shortstatehash) - .map(Ok) }) - .try_flatten_stream() - .try_collect() - .await - .unwrap_or_default(); + .into_iter() + .stream() + .flatten() + .collect() + .await; let Ok(left_event_id): Result = services .state_accessor @@ -666,12 +672,7 @@ async fn load_joined_room( full_state: bool, filter: &FilterDefinition, ) -> Result<(JoinedRoom, HashSet, HashSet)> { - let since_shortstatehash = services - .user - .get_token_shortstatehash(room_id, since) - .ok() - .map(Ok); - + let initial = since == 0; let timeline_limit: usize = filter .room .timeline @@ -679,14 +680,38 @@ async fn load_joined_room( .unwrap_or_else(|| uint!(10)) .try_into()?; - let timeline = load_timeline( + let (timeline_pdus, limited, last_timeline_count) = load_timeline( services, sender_user, room_id, PduCount::Normal(since), Some(PduCount::Normal(next_batch)), timeline_limit, - ); + ) + .await?; + + let since_shortstatehash = services + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) + .ok(); + + let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + .first() + .map(at!(0)) + .map(|count| { + services + .timeline + .get_shortstatehash(room_id, count) + .inspect_err(inspect_debug_log) + }) + .into(); + + let current_shortstatehash = services + .timeline + .get_shortstatehash(room_id, last_timeline_count) + .inspect_err(inspect_debug_log) + .or_else(|_| services.state.get_room_shortstatehash(room_id)) + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); let receipt_events = services .read_receipt @@ -698,46 +723,32 @@ async fn load_joined_room( .await .or_some((read_user.to_owned(), edu)) }) - .collect::>>() - .map(Ok); + .collect::>>(); - let (since_shortstatehash, (timeline_pdus, limited, last_timeline_count), receipt_events) = - try_join3(since_shortstatehash, timeline, receipt_events) - .boxed() - .await?; + let encrypted_room = services.state_accessor.is_encrypted_room(room_id); - let horizon_shortstatehash: OptionFuture<_> = timeline_pdus - .iter() - .map(at!(0)) - .map(PduCount::into_unsigned) - .map(|shorteventid| services.state.get_shortstatehash(shorteventid)) - .next() - .into(); + let ( + (since_shortstatehash, horizon_shortstatehash, current_shortstatehash), + receipt_events, + encrypted_room, + ) = join3( + join3(since_shortstatehash, horizon_shortstatehash, current_shortstatehash), + receipt_events, + encrypted_room, + ) + .map(|((since, horizon, current), receipt, encrypted_room)| { + Ok::<_, Error>(((since, horizon.flat_ok(), current?), receipt, encrypted_room)) + }) + .boxed() + .await?; - let current_shortstatehash = services - .state - .get_shortstatehash(last_timeline_count.into_unsigned()) - .or_else(|_| services.state.get_room_shortstatehash(room_id)); + let lazy_load_options = + [&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options]; - let (horizon_shortstatehash, current_shortstatehash) = - join(horizon_shortstatehash, current_shortstatehash) - .boxed() - .await; - - let current_shortstatehash = current_shortstatehash - .map_err(|_| err!(Database(error!("Room {room_id} has no state"))))?; - - let associate_token = - services - .user - .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash); - - let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() - || filter - .room - .timeline - .lazy_load_options - .is_enabled(); + let lazy_loading_enabled = !encrypted_room + && lazy_load_options + .iter() + .any(|opts| opts.is_enabled()); let lazy_loading_context = &lazy_loading::Context { user_id: sender_user, @@ -747,8 +758,6 @@ async fn load_joined_room( options: Some(&filter.room.state.lazy_load_options), }; - let initial = since == 0 || since_shortstatehash.is_none(); - // Reset lazy loading because this is an initial sync let lazy_load_reset: OptionFuture<_> = initial .then(|| services.lazy_loading.reset(lazy_loading_context)) @@ -771,6 +780,22 @@ async fn load_joined_room( }) .into(); + let sender_joined_count = services + .state_cache + .get_joined_count(room_id, sender_user); + + let since_encryption: OptionFuture<_> = since_shortstatehash + .map(|shortstatehash| { + services + .state_accessor + .state_get(shortstatehash, &StateEventType::RoomEncryption, "") + }) + .into(); + + let last_privateread_update = services + .read_receipt + .last_privateread_update(sender_user, room_id); + let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() .then(|| { @@ -780,38 +805,20 @@ async fn load_joined_room( }) .into(); - let since_sender_member: OptionFuture<_> = since_shortstatehash - .map(|short| { - services - .state_accessor - .state_get_content(short, &StateEventType::RoomMember, sender_user.as_str()) - .ok() - }) - .into(); - - let encrypted_room = services.state_accessor.is_encrypted_room(room_id); - - let last_privateread_update = services - .read_receipt - .last_privateread_update(sender_user, room_id); - let ( - (witness, since_sender_member), - (encrypted_room, ()), (last_privateread_update, last_notification_read), + (sender_joined_count, since_encryption), + witness, ) = join3( - join(witness, since_sender_member), - join(encrypted_room, associate_token), join(last_privateread_update, last_notification_read), + join(sender_joined_count, since_encryption), + witness, ) .await; - let joined_since_last_sync = - since_sender_member - .flatten() - .is_none_or(|content: RoomMemberEventContent| { - content.membership != MembershipState::Join - }); + let _encrypted_since_last_sync = !initial && encrypted_room && since_encryption.is_none(); + + let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > since); let StateChanges { heroes, @@ -822,19 +829,15 @@ async fn load_joined_room( services, sender_user, room_id, - full_state, - encrypted_room, + full_state || initial, since_shortstatehash, - horizon_shortstatehash.flat_ok(), + horizon_shortstatehash, current_shortstatehash, joined_since_last_sync, witness.as_ref(), ) .await?; - let send_notification_counts = - last_notification_read.is_none_or(|last_count| last_count.gt(&since)); - let is_sender_membership = |event: &PduEvent| { *event.event_type() == StateEventType::RoomMember.into() && event @@ -842,14 +845,25 @@ async fn load_joined_room( .is_some_and(is_equal_to!(sender_user.as_str())) }; - let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty()) - .then(|| { - state_events - .iter() - .position(is_sender_membership) - .map(|pos| state_events.swap_remove(pos)) - }) - .flatten(); + let joined_sender_member: Option<_> = + (joined_since_last_sync && timeline_pdus.is_empty() && !initial) + .then(|| { + state_events + .iter() + .position(is_sender_membership) + .map(|pos| state_events.swap_remove(pos)) + }) + .flatten(); + + let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| { + joined_sender_member + .is_some() + .then_some(since) + .map(Into::into) + }); + + let send_notification_counts = + last_notification_read.is_none_or(|last_count| last_count.gt(&since)); let notification_count: OptionFuture<_> = send_notification_counts .then(|| { @@ -895,19 +909,26 @@ async fn load_joined_room( }) .unwrap_or(Vec::new()); + let keys_changed = services + .users + .room_keys_changed(room_id, since, Some(next_batch)) + .map(|(user_id, _)| user_id) + .map(ToOwned::to_owned) + .collect::>(); + let extract_membership = |event: &PduEvent| { let content: RoomMemberEventContent = event.get_content().ok()?; let user_id: OwnedUserId = event.state_key()?.parse().ok()?; - Some((content, user_id)) + Some((content.membership, user_id)) }; - let timeline_membership_changes: Vec<_> = timeline_pdus + let timeline_membership_changes = timeline_pdus .iter() - .map(ref_at!(1)) .filter(|_| !initial) + .map(ref_at!(1)) .filter_map(extract_membership) - .collect(); + .collect::>(); let device_list_updates = state_events .iter() @@ -915,31 +936,27 @@ async fn load_joined_room( .ready_filter(|_| !initial) .ready_filter(|state_event| *state_event.event_type() == RoomMember) .ready_filter_map(extract_membership) - .chain(timeline_membership_changes.into_iter().stream()) - .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| { + .chain(timeline_membership_changes.stream()) + .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| { use MembershipState::*; - let shares_encrypted_room = async |user_id| { - share_encrypted_room(services, sender_user, user_id, Some(room_id)).await + let requires_update = async |user_id| { + !share_encrypted_room(services, sender_user, user_id, Some(room_id)).await }; - match content.membership { + match membership { + | Join if requires_update(&user_id).await => dlu.insert(user_id), | Leave => leu.insert(user_id), - | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await => - dlu.insert(user_id), | _ => false, }; + (dlu, leu) + }) + .then(async |(mut dlu, leu)| { + dlu.extend(keys_changed.await); (dlu, leu) }); - let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| { - joined_sender_member - .is_some() - .then_some(since) - .map(Into::into) - }); - let include_in_timeline = |event: &PduEvent| { let filter = &filter.room.timeline; filter.matches(event) @@ -954,33 +971,26 @@ async fn load_joined_room( .ready_filter(include_in_timeline) .collect::>(); - let device_updates = services - .users - .room_keys_changed(room_id, since, Some(next_batch)) - .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) - .collect::>(); - let account_data_events = services .account_data - .changes_since(Some(room_id), sender_user, since, Some(next_batch)) + .changes_since(Some(room_id), sender_user, since, None) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect(); let ( + (room_events, account_data_events), + (typing_events, private_read_event), (notification_count, highlight_count), - ((mut device_list_updates, left_encrypted_users), device_updates), - (room_events, account_data_events, typing_events, private_read_event), - ) = join3( + (device_list_updates, left_encrypted_users), + ) = join4( + join(room_events, account_data_events), + join(typing_events, private_read_event), join(notification_count, highlight_count), - join(device_list_updates, device_updates), - join4(room_events, account_data_events, typing_events, private_read_event), + device_list_updates, ) .boxed() .await; - device_list_updates.extend(device_updates); - let is_in_timeline = |event: &PduEvent| { room_events .iter() @@ -1042,8 +1052,9 @@ async fn load_joined_room( skip_all, fields( full = %full_state, - cs = %current_shortstatehash, ss = ?since_shortstatehash, + hs = ?horizon_shortstatehash, + cs = %current_shortstatehash, ) )] #[allow(clippy::too_many_arguments)] @@ -1052,16 +1063,13 @@ async fn calculate_state_changes<'a>( sender_user: &UserId, room_id: &RoomId, full_state: bool, - encrypted_room: bool, since_shortstatehash: Option, horizon_shortstatehash: Option, current_shortstatehash: ShortStateHash, joined_since_last_sync: bool, witness: Option<&'a Witness>, ) -> Result { - let initial = full_state || since_shortstatehash.is_none() || joined_since_last_sync; - - let incremental = !initial && since_shortstatehash != Some(current_shortstatehash); + let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some(); let horizon_shortstatehash = horizon_shortstatehash.unwrap_or(current_shortstatehash); @@ -1079,36 +1087,33 @@ async fn calculate_state_changes<'a>( }; let lazy_state_ids: OptionFuture<_> = witness - .filter(|_| !encrypted_room) .map(|witness| { - StreamExt::into_future( - witness - .iter() - .stream() - .broad_filter_map(|user_id| state_get_shorteventid(user_id)), - ) + witness + .iter() + .stream() + .ready_filter(|&user_id| user_id != sender_user) + .broad_filter_map(|user_id| state_get_shorteventid(user_id)) + .into_future() }) .into(); let state_diff_ids: OptionFuture<_> = incremental .then(|| { - StreamExt::into_future( - services - .state_accessor - .state_added((since_shortstatehash, horizon_shortstatehash)) - .boxed(), - ) + services + .state_accessor + .state_added((since_shortstatehash, horizon_shortstatehash)) + .boxed() + .into_future() }) .into(); - let current_state_ids: OptionFuture<_> = initial + let current_state_ids: OptionFuture<_> = (!incremental) .then(|| { - StreamExt::into_future( - services - .state_accessor - .state_full_shortids(horizon_shortstatehash) - .expect_ok(), - ) + services + .state_accessor + .state_full_shortids(horizon_shortstatehash) + .expect_ok() + .into_future() }) .into(); @@ -1116,28 +1121,24 @@ async fn calculate_state_changes<'a>( .stream() .chain(state_diff_ids.stream()) .broad_filter_map(async |(shortstatekey, shorteventid)| { - if witness.is_none() || encrypted_room { - return Some(shorteventid); - } - - lazy_filter(services, sender_user, shortstatekey, shorteventid).await + lazy_filter(services, sender_user, witness, shortstatekey, shorteventid).await }) .chain(lazy_state_ids.stream()) .broad_filter_map(|shorteventid| { services .short .get_eventid_from_short(shorteventid) + .and_then(async |event_id: OwnedEventId| { + services.timeline.get_pdu(&event_id).await + }) .ok() }) - .broad_filter_map(async |event_id: OwnedEventId| { - services.timeline.get_pdu(&event_id).ok().await - }) .collect::>() .await; let send_member_counts = state_events .iter() - .any(|event| event.kind == RoomMember); + .any(|event| *event.kind() == RoomMember); let member_counts: OptionFuture<_> = send_member_counts .then(|| calculate_counts(services, room_id, sender_user)) @@ -1157,9 +1158,14 @@ async fn calculate_state_changes<'a>( async fn lazy_filter( services: &Services, sender_user: &UserId, + witness: Option<&Witness>, shortstatekey: ShortStateKey, shorteventid: ShortEventId, ) -> Option { + if witness.is_none() { + return Some(shorteventid); + } + let (event_type, state_key) = services .short .get_statekey_from_short(shortstatekey) diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 26204878..4751fd84 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1100,8 +1100,8 @@ where }; let since_shortstatehash = services - .user - .get_token_shortstatehash(room_id, globalsince) + .timeline + .next_shortstatehash(room_id, PduCount::Normal(globalsince)) .await .ok(); @@ -1117,26 +1117,44 @@ where continue; } - let since_encryption = services - .state_accessor - .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") - .await; + let synced_shortstatehash = services + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(globalsince).saturating_add(1)) + .await + .ok(); - let since_sender_member: Option = services - .state_accessor - .state_get_content( - since_shortstatehash, - &StateEventType::RoomMember, - sender_user.as_str(), - ) - .ok() - .await; + let synced_sender_member: OptionFuture<_> = synced_shortstatehash + .map(|shortstatehash| { + services + .state_accessor + .state_get_content( + shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .map_ok(|content: RoomMemberEventContent| content) + }) + .into(); - let joined_since_last_sync = since_sender_member + let joined_since_last_sync = synced_sender_member + .await + .and_then(Result::ok) .as_ref() .is_none_or(|member| member.membership != MembershipState::Join); - let new_encrypted_room = encrypted_room && since_encryption.is_err(); + let synced_encryption: OptionFuture<_> = synced_shortstatehash + .map(|shortstatehash| { + services.state_accessor.state_get( + shortstatehash, + &StateEventType::RoomEncryption, + "", + ) + }) + .into(); + + let synced_encryption = synced_encryption.await.and_then(Result::ok); + + let new_encrypted_room = encrypted_room && synced_encryption.is_none(); if encrypted_room { let current_state_ids: HashMap<_, OwnedEventId> = services diff --git a/src/database/maps.rs b/src/database/maps.rs index 0177cf23..2347326e 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -183,15 +183,6 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "roomserverids", ..descriptor::RANDOM_SMALL }, - Descriptor { - name: "roomsynctoken_shortstatehash", - file_shape: 3, - val_size_hint: Some(8), - block_size: 512, - compression_level: 3, - bottommost_level: Some(6), - ..descriptor::SEQUENTIAL - }, Descriptor { name: "roomuserdataid_accountdata", ..descriptor::RANDOM_SMALL diff --git a/src/service/rooms/delete/mod.rs b/src/service/rooms/delete/mod.rs index 26f897da..86803bc1 100644 --- a/src/service/rooms/delete/mod.rs +++ b/src/service/rooms/delete/mod.rs @@ -167,14 +167,6 @@ impl Service { debug!("Final stages of deleting the room"); - debug!("Deleting room sync tokens from our database"); - self.services - .user - .delete_room_synctokens(room_id) - .await - .log_err() - .ok(); - debug!("Deleting room state hash from our database"); self.services .state diff --git a/src/service/rooms/user/mod.rs b/src/service/rooms/user/mod.rs index 341135dc..68b29ca2 100644 --- a/src/service/rooms/user/mod.rs +++ b/src/service/rooms/user/mod.rs @@ -5,9 +5,7 @@ use tuwunel_core::{ Result, implement, trace, utils::stream::{ReadyExt, TryIgnore}, }; -use tuwunel_database::{Database, Deserialized, Interfix, Map}; - -use crate::rooms::short::ShortStateHash; +use tuwunel_database::{Deserialized, Interfix, Map}; pub struct Service { db: Data, @@ -15,22 +13,18 @@ pub struct Service { } struct Data { - db: Arc, userroomid_notificationcount: Arc, userroomid_highlightcount: Arc, roomuserid_lastnotificationread: Arc, - roomsynctoken_shortstatehash: Arc, } impl crate::Service for Service { fn build(args: &crate::Args<'_>) -> Result> { Ok(Arc::new(Self { db: Data { - db: args.db.clone(), userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(), - roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(), }, services: args.services.clone(), })) @@ -107,66 +101,3 @@ pub async fn delete_room_notification_read(&self, room_id: &RoomId) -> Result { Ok(()) } - -#[implement(Service)] -#[tracing::instrument(level = "trace", skip(self))] -pub async fn associate_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, - shortstatehash: ShortStateHash, -) { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await - .expect("room exists"); - - let _cork = self.db.db.cork(); - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .put(key, shortstatehash); -} - -#[implement(Service)] -pub async fn get_token_shortstatehash( - &self, - room_id: &RoomId, - token: u64, -) -> Result { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await?; - - let key: &[u64] = &[shortroomid, token]; - self.db - .roomsynctoken_shortstatehash - .qry(key) - .await - .deserialized() -} - -#[implement(Service)] -pub async fn delete_room_synctokens(&self, room_id: &RoomId) -> Result { - let shortroomid = self - .services - .short - .get_shortroomid(room_id) - .await?; - - self.db - .roomsynctoken_shortstatehash - .keys_prefix_raw(&shortroomid) - .ignore_err() - .ready_for_each(|key| { - trace!("Removing key: {key:?}"); - self.db.roomsynctoken_shortstatehash.remove(key); - }) - .await; - - Ok(()) -}