From c337ea1864251320b2a65f41cf50aafadf2ac17b Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 3 Aug 2025 22:06:05 +0000 Subject: [PATCH] Fixes for sync v3 protocol compliance. Add sync v3 filter support. Fix events duplicated between state and timeline; cleanup some lets. Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 298 ++++++++++-------- .../complement/test_results.jsonl | 8 +- 2 files changed, 177 insertions(+), 129 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index b1b4cb51..1dd9c2d1 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -39,6 +39,7 @@ use tuwunel_core::{ Result, at, err, error, extract_variant, is_equal_to, matrix::{ Event, + event::Matches, pdu::{EventHash, PduCount, PduEvent}, }, pair_of, ref_at, @@ -70,8 +71,6 @@ struct StateChanges { joined_member_count: Option, invited_member_count: Option, state_events: Vec, - device_list_updates: HashSet, - left_encrypted_users: HashSet, } type PresenceUpdates = HashMap; @@ -401,6 +400,14 @@ async fn build_sync_events( .collect() .await; + let presence_events = presence_updates + .into_iter() + .flat_map(IntoIterator::into_iter) + .map(|(sender, content)| PresenceEvent { content, sender }) + .map(|ref event| Raw::new(event)) + .filter_map(Result::ok) + .collect(); + Ok(sync_events::v3::Response { account_data: GlobalAccountData { events: account_data }, device_lists: DeviceLists { @@ -411,15 +418,7 @@ async fn build_sync_events( // Fallback keys are not yet supported device_unused_fallback_key_types: None, next_batch: next_batch.to_string(), - presence: Presence { - events: presence_updates - .into_iter() - .flat_map(IntoIterator::into_iter) - .map(|(sender, content)| PresenceEvent { content, sender }) - .map(|ref event| Raw::new(event)) - .filter_map(Result::ok) - .collect(), - }, + presence: Presence { events: presence_events }, rooms: Rooms { leave: left_rooms, join: joined_rooms, @@ -677,13 +676,20 @@ async fn load_joined_room( .ok() .map(Ok); + let timeline_limit: usize = filter + .room + .timeline + .limit + .unwrap_or_else(|| uint!(10)) + .try_into()?; + let timeline = load_timeline( services, sender_user, room_id, sincecount, Some(next_batchcount), - 10_usize, + timeline_limit, ); let receipt_events = services @@ -724,7 +730,6 @@ async fn load_joined_room( .user .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash); - let initial = since_shortstatehash.is_none() || since == 0; let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() || filter .room @@ -740,6 +745,8 @@ async fn load_joined_room( options: Some(&filter.room.state.lazy_load_options), }; + let initial = since == 0 || since_shortstatehash.is_none(); + // Reset lazy loading because this is an initial sync let lazy_load_reset: OptionFuture<_> = initial .then(|| { @@ -768,6 +775,19 @@ async fn load_joined_room( }) .into(); + let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + .iter() + .map(at!(0)) + .map(PduCount::into_unsigned) + .map(|shorteventid| { + services + .rooms + .state_accessor + .get_shortstatehash(shorteventid) + }) + .next() + .into(); + let last_notification_read: OptionFuture<_> = timeline_pdus .is_empty() .then(|| { @@ -788,20 +808,28 @@ async fn load_joined_room( }) .into(); + let encrypted_room = services + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok(); + let last_privateread_update = services .rooms .read_receipt .last_privateread_update(sender_user, room_id); - let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) = - join5( - last_privateread_update, - last_notification_read, - since_sender_member, - witness, - associate_token, - ) - .await; + let ( + witness, + (encrypted_room, last_privateread_update, last_notification_read), + (since_sender_member, horizon_shortstatehash, ()), + ) = join3( + witness, + join3(encrypted_room, last_privateread_update, last_notification_read), + join3(since_sender_member, horizon_shortstatehash, associate_token), + ) + .boxed() + .await; let joined_since_last_sync = since_sender_member @@ -815,26 +843,27 @@ async fn load_joined_room( joined_member_count, invited_member_count, mut state_events, - mut device_list_updates, - left_encrypted_users, } = calculate_state_changes( services, sender_user, room_id, full_state, - filter, + encrypted_room, since_shortstatehash, + horizon_shortstatehash.flat_ok(), current_shortstatehash, joined_since_last_sync, witness.as_ref(), ) .await?; - let is_sender_membership = |pdu: &PduEvent| { - pdu.kind == StateEventType::RoomMember.into() - && pdu - .state_key - .as_deref() + let send_notification_counts = + last_notification_read.is_none_or(|last_count| last_count.gt(&since)); + + let is_sender_membership = |event: &PduEvent| { + *event.event_type() == StateEventType::RoomMember.into() + && event + .state_key() .is_some_and(is_equal_to!(sender_user.as_str())) }; @@ -847,39 +876,6 @@ async fn load_joined_room( }) .flatten(); - let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| { - joined_sender_member - .is_some() - .then_some(since) - .map(Into::into) - }); - - let room_events = timeline_pdus - .into_iter() - .stream() - .wide_filter_map(|item| ignored_filter(services, item, sender_user)) - .map(at!(1)) - .chain(joined_sender_member.into_iter().stream()) - .map(Event::into_format) - .collect::>(); - - let account_data_events = services - .account_data - .changes_since(Some(room_id), sender_user, since, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect(); - - // Look for device list updates in this room - let device_updates = services - .users - .room_keys_changed(room_id, since, Some(next_batch)) - .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) - .collect::>(); - - let send_notification_counts = - last_notification_read.is_none_or(|last_count| last_count.gt(&since)); - let notification_count: OptionFuture<_> = send_notification_counts .then(|| { services @@ -928,13 +924,85 @@ async fn load_joined_room( }) .unwrap_or(Vec::new()); + let extract_membership = |event: &PduEvent| { + let content: RoomMemberEventContent = event.get_content().ok()?; + let user_id: OwnedUserId = event.state_key()?.parse().ok()?; + + Some((content, user_id)) + }; + + let timeline_membership_changes: Vec<_> = timeline_pdus + .iter() + .map(ref_at!(1)) + .filter(|_| !initial) + .filter_map(extract_membership) + .collect(); + + let device_list_updates = state_events + .iter() + .stream() + .ready_filter(|_| !initial) + .ready_filter(|state_event| *state_event.event_type() == RoomMember) + .ready_filter_map(extract_membership) + .chain(timeline_membership_changes.into_iter().stream()) + .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| { + use MembershipState::*; + + let shares_encrypted_room = async |user_id| { + share_encrypted_room(services, sender_user, user_id, Some(room_id)).await + }; + + match content.membership { + | Leave => leu.insert(user_id), + | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await => + dlu.insert(user_id), + | _ => false, + }; + + (dlu, leu) + }); + + let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| { + joined_sender_member + .is_some() + .then_some(since) + .map(Into::into) + }); + + let include_in_timeline = |event: &PduEvent| { + let filter = &filter.room.timeline; + filter.matches(event) + }; + + let room_events = timeline_pdus + .into_iter() + .stream() + .wide_filter_map(|item| ignored_filter(services, item, sender_user)) + .map(at!(1)) + .chain(joined_sender_member.into_iter().stream()) + .ready_filter(include_in_timeline) + .collect::>(); + + let device_updates = services + .users + .room_keys_changed(room_id, since, Some(next_batch)) + .map(|(user_id, _)| user_id) + .map(ToOwned::to_owned) + .collect::>(); + + let account_data_events = services + .account_data + .changes_since(Some(room_id), sender_user, since, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect(); + let ( - device_updates, (notification_count, highlight_count), + ((mut device_list_updates, left_encrypted_users), device_updates), (room_events, account_data_events, typing_events, private_read_event), ) = join3( - device_updates, join(notification_count, highlight_count), + join(device_list_updates, device_updates), join4(room_events, account_data_events, typing_events, private_read_event), ) .boxed() @@ -942,6 +1010,31 @@ async fn load_joined_room( device_list_updates.extend(device_updates); + let is_in_timeline = |event: &PduEvent| { + room_events + .iter() + .map(Event::event_id) + .any(is_equal_to!(event.event_id())) + }; + + let include_in_state = |event: &PduEvent| { + let filter = &filter.room.state; + filter.matches(event) && (full_state || !is_in_timeline(event)) + }; + + let state_events = state_events + .into_iter() + .filter(include_in_state) + .map(Event::into_format) + .collect(); + + let heroes = heroes + .into_iter() + .flatten() + .map(TryInto::try_into) + .filter_map(Result::ok) + .collect(); + let edus: Vec> = receipt_events .into_values() .chain(typing_events.into_iter()) @@ -950,28 +1043,21 @@ async fn load_joined_room( let joined_room = JoinedRoom { account_data: RoomAccountData { events: account_data_events }, + ephemeral: Ephemeral { events: edus }, + state: RoomState { events: state_events }, summary: RoomSummary { joined_member_count: joined_member_count.map(ruma_from_u64), invited_member_count: invited_member_count.map(ruma_from_u64), - heroes: heroes - .into_iter() - .flatten() - .map(TryInto::try_into) - .filter_map(Result::ok) - .collect(), - }, - state: RoomState { - events: state_events - .into_iter() - .map(Event::into_format) - .collect(), + heroes, }, timeline: Timeline { limited: limited || joined_since_last_sync, prev_batch: prev_batch.as_ref().map(ToString::to_string), - events: room_events, + events: room_events + .into_iter() + .map(Event::into_format) + .collect(), }, - ephemeral: Ephemeral { events: edus }, unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, unread_thread_notifications: BTreeMap::new(), }; @@ -995,8 +1081,9 @@ async fn calculate_state_changes<'a>( sender_user: &UserId, room_id: &RoomId, full_state: bool, - _filter: &FilterDefinition, + encrypted_room: bool, since_shortstatehash: Option, + horizon_shortstatehash: Option, current_shortstatehash: ShortStateHash, joined_since_last_sync: bool, witness: Option<&'a Witness>, @@ -1005,21 +1092,16 @@ async fn calculate_state_changes<'a>( let incremental = !initial && since_shortstatehash != Some(current_shortstatehash); - let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash); + let horizon_shortstatehash = horizon_shortstatehash.unwrap_or(current_shortstatehash); - let encrypted_room = services - .rooms - .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok() - .await; + let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash); let state_get_shorteventid = |user_id: &'a UserId| { services .rooms .state_accessor .state_get_shortid( - current_shortstatehash, + horizon_shortstatehash, &StateEventType::RoomMember, user_id.as_str(), ) @@ -1044,7 +1126,7 @@ async fn calculate_state_changes<'a>( services .rooms .state_accessor - .state_added((since_shortstatehash, current_shortstatehash)) + .state_added((since_shortstatehash, horizon_shortstatehash)) .boxed(), ) }) @@ -1056,7 +1138,7 @@ async fn calculate_state_changes<'a>( services .rooms .state_accessor - .state_full_shortids(current_shortstatehash) + .state_full_shortids(horizon_shortstatehash) .expect_ok(), ) }) @@ -1092,34 +1174,6 @@ async fn calculate_state_changes<'a>( .boxed() .await; - let device_updates = state_events - .iter() - .stream() - .ready_filter(|_| encrypted_room && !initial) - .ready_filter(|state_event| state_event.kind == RoomMember) - .ready_filter_map(|state_event| { - let content: RoomMemberEventContent = state_event.get_content().ok()?; - let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?; - - Some((content, user_id)) - }) - .fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| { - use MembershipState::*; - - let shares_encrypted_room = async |user_id| { - share_encrypted_room(services, sender_user, user_id, Some(room_id)).await - }; - - match content.membership { - | Leave => leu.insert(user_id), - | Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await => - dlu.insert(user_id), - | _ => false, - }; - - (dlu, leu) - }); - let send_member_counts = state_events .iter() .any(|event| event.kind == RoomMember); @@ -1128,20 +1182,14 @@ async fn calculate_state_changes<'a>( .then(|| calculate_counts(services, room_id, sender_user)) .into(); - let (member_counts, device_updates) = join(member_counts, device_updates).await; - - let (device_list_updates, left_encrypted_users) = device_updates; - let (joined_member_count, invited_member_count, heroes) = - member_counts.unwrap_or((None, None, None)); + member_counts.await.unwrap_or((None, None, None)); Ok(StateChanges { heroes, joined_member_count, invited_member_count, state_events, - device_list_updates, - left_encrypted_users, }) } diff --git a/tests/test_results/complement/test_results.jsonl b/tests/test_results/complement/test_results.jsonl index dbc37bf0..a556c802 100644 --- a/tests/test_results/complement/test_results.jsonl +++ b/tests/test_results/complement/test_results.jsonl @@ -344,7 +344,7 @@ {"Action":"pass","Test":"TestMembersLocal/Parallel/Existing_members_see_new_members'_presence_(in_initial_sync)"} {"Action":"pass","Test":"TestMembersLocal/Parallel/New_room_members_see_their_own_join_event"} {"Action":"fail","Test":"TestMembershipOnEvents"} -{"Action":"fail","Test":"TestNetworkPartitionOrdering"} +{"Action":"pass","Test":"TestNetworkPartitionOrdering"} {"Action":"pass","Test":"TestNotPresentUserCannotBanOthers"} {"Action":"pass","Test":"TestOlderLeftRoomsNotInLeaveSection"} {"Action":"fail","Test":"TestOutboundFederationEventSizeGetMissingEvents"} @@ -643,11 +643,11 @@ {"Action":"fail","Test":"TestSync"} {"Action":"fail","Test":"TestSync/parallel"} {"Action":"pass","Test":"TestSync/parallel/Can_sync_a_joined_room"} -{"Action":"fail","Test":"TestSync/parallel/Device_list_tracking"} -{"Action":"fail","Test":"TestSync/parallel/Device_list_tracking/User_is_correctly_listed_when_they_leave,_even_when_lazy_loading_is_enabled"} +{"Action":"pass","Test":"TestSync/parallel/Device_list_tracking"} +{"Action":"pass","Test":"TestSync/parallel/Device_list_tracking/User_is_correctly_listed_when_they_leave,_even_when_lazy_loading_is_enabled"} {"Action":"pass","Test":"TestSync/parallel/Full_state_sync_includes_joined_rooms"} {"Action":"fail","Test":"TestSync/parallel/Get_presence_for_newly_joined_members_in_incremental_sync"} -{"Action":"fail","Test":"TestSync/parallel/Newly_joined_room_has_correct_timeline_in_incremental_sync"} +{"Action":"pass","Test":"TestSync/parallel/Newly_joined_room_has_correct_timeline_in_incremental_sync"} {"Action":"fail","Test":"TestSync/parallel/Newly_joined_room_includes_presence_in_incremental_sync"} {"Action":"pass","Test":"TestSync/parallel/Newly_joined_room_is_included_in_an_incremental_sync"} {"Action":"pass","Test":"TestSync/parallel/sync_should_succeed_even_if_the_sync_token_points_to_a_redaction_of_an_unknown_event"}