Fixes for sync v3 protocol compliance.

Add sync v3 filter support.

Fix events duplicated between state and timeline; cleanup some lets.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-03 22:06:05 +00:00
parent b00361d274
commit c337ea1864
2 changed files with 177 additions and 129 deletions

View File

@@ -39,6 +39,7 @@ use tuwunel_core::{
Result, at, err, error, extract_variant, is_equal_to,
matrix::{
Event,
event::Matches,
pdu::{EventHash, PduCount, PduEvent},
},
pair_of, ref_at,
@@ -70,8 +71,6 @@ struct StateChanges {
joined_member_count: Option<u64>,
invited_member_count: Option<u64>,
state_events: Vec<PduEvent>,
device_list_updates: HashSet<OwnedUserId>,
left_encrypted_users: HashSet<OwnedUserId>,
}
type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
@@ -401,6 +400,14 @@ async fn build_sync_events(
.collect()
.await;
let presence_events = presence_updates
.into_iter()
.flat_map(IntoIterator::into_iter)
.map(|(sender, content)| PresenceEvent { content, sender })
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect();
Ok(sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
device_lists: DeviceLists {
@@ -411,15 +418,7 @@ async fn build_sync_events(
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
next_batch: next_batch.to_string(),
presence: Presence {
events: presence_updates
.into_iter()
.flat_map(IntoIterator::into_iter)
.map(|(sender, content)| PresenceEvent { content, sender })
.map(|ref event| Raw::new(event))
.filter_map(Result::ok)
.collect(),
},
presence: Presence { events: presence_events },
rooms: Rooms {
leave: left_rooms,
join: joined_rooms,
@@ -677,13 +676,20 @@ async fn load_joined_room(
.ok()
.map(Ok);
let timeline_limit: usize = filter
.room
.timeline
.limit
.unwrap_or_else(|| uint!(10))
.try_into()?;
let timeline = load_timeline(
services,
sender_user,
room_id,
sincecount,
Some(next_batchcount),
10_usize,
timeline_limit,
);
let receipt_events = services
@@ -724,7 +730,6 @@ async fn load_joined_room(
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
let initial = since_shortstatehash.is_none() || since == 0;
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter
.room
@@ -740,6 +745,8 @@ async fn load_joined_room(
options: Some(&filter.room.state.lazy_load_options),
};
let initial = since == 0 || since_shortstatehash.is_none();
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = initial
.then(|| {
@@ -768,6 +775,19 @@ async fn load_joined_room(
})
.into();
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
.iter()
.map(at!(0))
.map(PduCount::into_unsigned)
.map(|shorteventid| {
services
.rooms
.state_accessor
.get_shortstatehash(shorteventid)
})
.next()
.into();
let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty()
.then(|| {
@@ -788,20 +808,28 @@ async fn load_joined_room(
})
.into();
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok();
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(sender_user, room_id);
let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) =
join5(
last_privateread_update,
last_notification_read,
since_sender_member,
witness,
associate_token,
)
.await;
let (
witness,
(encrypted_room, last_privateread_update, last_notification_read),
(since_sender_member, horizon_shortstatehash, ()),
) = join3(
witness,
join3(encrypted_room, last_privateread_update, last_notification_read),
join3(since_sender_member, horizon_shortstatehash, associate_token),
)
.boxed()
.await;
let joined_since_last_sync =
since_sender_member
@@ -815,26 +843,27 @@ async fn load_joined_room(
joined_member_count,
invited_member_count,
mut state_events,
mut device_list_updates,
left_encrypted_users,
} = calculate_state_changes(
services,
sender_user,
room_id,
full_state,
filter,
encrypted_room,
since_shortstatehash,
horizon_shortstatehash.flat_ok(),
current_shortstatehash,
joined_since_last_sync,
witness.as_ref(),
)
.await?;
let is_sender_membership = |pdu: &PduEvent| {
pdu.kind == StateEventType::RoomMember.into()
&& pdu
.state_key
.as_deref()
let send_notification_counts =
last_notification_read.is_none_or(|last_count| last_count.gt(&since));
let is_sender_membership = |event: &PduEvent| {
*event.event_type() == StateEventType::RoomMember.into()
&& event
.state_key()
.is_some_and(is_equal_to!(sender_user.as_str()))
};
@@ -847,39 +876,6 @@ async fn load_joined_room(
})
.flatten();
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
joined_sender_member
.is_some()
.then_some(since)
.map(Into::into)
});
let room_events = timeline_pdus
.into_iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item, sender_user))
.map(at!(1))
.chain(joined_sender_member.into_iter().stream())
.map(Event::into_format)
.collect::<Vec<_>>();
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
// Look for device list updates in this room
let device_updates = services
.users
.room_keys_changed(room_id, since, Some(next_batch))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let send_notification_counts =
last_notification_read.is_none_or(|last_count| last_count.gt(&since));
let notification_count: OptionFuture<_> = send_notification_counts
.then(|| {
services
@@ -928,13 +924,85 @@ async fn load_joined_room(
})
.unwrap_or(Vec::new());
let extract_membership = |event: &PduEvent| {
let content: RoomMemberEventContent = event.get_content().ok()?;
let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
Some((content, user_id))
};
let timeline_membership_changes: Vec<_> = timeline_pdus
.iter()
.map(ref_at!(1))
.filter(|_| !initial)
.filter_map(extract_membership)
.collect();
let device_list_updates = state_events
.iter()
.stream()
.ready_filter(|_| !initial)
.ready_filter(|state_event| *state_event.event_type() == RoomMember)
.ready_filter_map(extract_membership)
.chain(timeline_membership_changes.into_iter().stream())
.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| {
use MembershipState::*;
let shares_encrypted_room = async |user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
};
match content.membership {
| Leave => leu.insert(user_id),
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
dlu.insert(user_id),
| _ => false,
};
(dlu, leu)
});
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
joined_sender_member
.is_some()
.then_some(since)
.map(Into::into)
});
let include_in_timeline = |event: &PduEvent| {
let filter = &filter.room.timeline;
filter.matches(event)
};
let room_events = timeline_pdus
.into_iter()
.stream()
.wide_filter_map(|item| ignored_filter(services, item, sender_user))
.map(at!(1))
.chain(joined_sender_member.into_iter().stream())
.ready_filter(include_in_timeline)
.collect::<Vec<_>>();
let device_updates = services
.users
.room_keys_changed(room_id, since, Some(next_batch))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since, Some(next_batch))
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
let (
device_updates,
(notification_count, highlight_count),
((mut device_list_updates, left_encrypted_users), device_updates),
(room_events, account_data_events, typing_events, private_read_event),
) = join3(
device_updates,
join(notification_count, highlight_count),
join(device_list_updates, device_updates),
join4(room_events, account_data_events, typing_events, private_read_event),
)
.boxed()
@@ -942,6 +1010,31 @@ async fn load_joined_room(
device_list_updates.extend(device_updates);
let is_in_timeline = |event: &PduEvent| {
room_events
.iter()
.map(Event::event_id)
.any(is_equal_to!(event.event_id()))
};
let include_in_state = |event: &PduEvent| {
let filter = &filter.room.state;
filter.matches(event) && (full_state || !is_in_timeline(event))
};
let state_events = state_events
.into_iter()
.filter(include_in_state)
.map(Event::into_format)
.collect();
let heroes = heroes
.into_iter()
.flatten()
.map(TryInto::try_into)
.filter_map(Result::ok)
.collect();
let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
.into_values()
.chain(typing_events.into_iter())
@@ -950,28 +1043,21 @@ async fn load_joined_room(
let joined_room = JoinedRoom {
account_data: RoomAccountData { events: account_data_events },
ephemeral: Ephemeral { events: edus },
state: RoomState { events: state_events },
summary: RoomSummary {
joined_member_count: joined_member_count.map(ruma_from_u64),
invited_member_count: invited_member_count.map(ruma_from_u64),
heroes: heroes
.into_iter()
.flatten()
.map(TryInto::try_into)
.filter_map(Result::ok)
.collect(),
},
state: RoomState {
events: state_events
.into_iter()
.map(Event::into_format)
.collect(),
heroes,
},
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: room_events,
events: room_events
.into_iter()
.map(Event::into_format)
.collect(),
},
ephemeral: Ephemeral { events: edus },
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
unread_thread_notifications: BTreeMap::new(),
};
@@ -995,8 +1081,9 @@ async fn calculate_state_changes<'a>(
sender_user: &UserId,
room_id: &RoomId,
full_state: bool,
_filter: &FilterDefinition,
encrypted_room: bool,
since_shortstatehash: Option<ShortStateHash>,
horizon_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
witness: Option<&'a Witness>,
@@ -1005,21 +1092,16 @@ async fn calculate_state_changes<'a>(
let incremental = !initial && since_shortstatehash != Some(current_shortstatehash);
let since_shortstatehash = since_shortstatehash.unwrap_or(current_shortstatehash);
let horizon_shortstatehash = horizon_shortstatehash.unwrap_or(current_shortstatehash);
let encrypted_room = services
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")
.is_ok()
.await;
let since_shortstatehash = since_shortstatehash.unwrap_or(horizon_shortstatehash);
let state_get_shorteventid = |user_id: &'a UserId| {
services
.rooms
.state_accessor
.state_get_shortid(
current_shortstatehash,
horizon_shortstatehash,
&StateEventType::RoomMember,
user_id.as_str(),
)
@@ -1044,7 +1126,7 @@ async fn calculate_state_changes<'a>(
services
.rooms
.state_accessor
.state_added((since_shortstatehash, current_shortstatehash))
.state_added((since_shortstatehash, horizon_shortstatehash))
.boxed(),
)
})
@@ -1056,7 +1138,7 @@ async fn calculate_state_changes<'a>(
services
.rooms
.state_accessor
.state_full_shortids(current_shortstatehash)
.state_full_shortids(horizon_shortstatehash)
.expect_ok(),
)
})
@@ -1092,34 +1174,6 @@ async fn calculate_state_changes<'a>(
.boxed()
.await;
let device_updates = state_events
.iter()
.stream()
.ready_filter(|_| encrypted_room && !initial)
.ready_filter(|state_event| state_event.kind == RoomMember)
.ready_filter_map(|state_event| {
let content: RoomMemberEventContent = state_event.get_content().ok()?;
let user_id: OwnedUserId = state_event.state_key.as_ref()?.parse().ok()?;
Some((content, user_id))
})
.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| {
use MembershipState::*;
let shares_encrypted_room = async |user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
};
match content.membership {
| Leave => leu.insert(user_id),
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
dlu.insert(user_id),
| _ => false,
};
(dlu, leu)
});
let send_member_counts = state_events
.iter()
.any(|event| event.kind == RoomMember);
@@ -1128,20 +1182,14 @@ async fn calculate_state_changes<'a>(
.then(|| calculate_counts(services, room_id, sender_user))
.into();
let (member_counts, device_updates) = join(member_counts, device_updates).await;
let (device_list_updates, left_encrypted_users) = device_updates;
let (joined_member_count, invited_member_count, heroes) =
member_counts.unwrap_or((None, None, None));
member_counts.await.unwrap_or((None, None, None));
Ok(StateChanges {
heroes,
joined_member_count,
invited_member_count,
state_events,
device_list_updates,
left_encrypted_users,
})
}

View File

@@ -344,7 +344,7 @@
{"Action":"pass","Test":"TestMembersLocal/Parallel/Existing_members_see_new_members'_presence_(in_initial_sync)"}
{"Action":"pass","Test":"TestMembersLocal/Parallel/New_room_members_see_their_own_join_event"}
{"Action":"fail","Test":"TestMembershipOnEvents"}
{"Action":"fail","Test":"TestNetworkPartitionOrdering"}
{"Action":"pass","Test":"TestNetworkPartitionOrdering"}
{"Action":"pass","Test":"TestNotPresentUserCannotBanOthers"}
{"Action":"pass","Test":"TestOlderLeftRoomsNotInLeaveSection"}
{"Action":"fail","Test":"TestOutboundFederationEventSizeGetMissingEvents"}
@@ -643,11 +643,11 @@
{"Action":"fail","Test":"TestSync"}
{"Action":"fail","Test":"TestSync/parallel"}
{"Action":"pass","Test":"TestSync/parallel/Can_sync_a_joined_room"}
{"Action":"fail","Test":"TestSync/parallel/Device_list_tracking"}
{"Action":"fail","Test":"TestSync/parallel/Device_list_tracking/User_is_correctly_listed_when_they_leave,_even_when_lazy_loading_is_enabled"}
{"Action":"pass","Test":"TestSync/parallel/Device_list_tracking"}
{"Action":"pass","Test":"TestSync/parallel/Device_list_tracking/User_is_correctly_listed_when_they_leave,_even_when_lazy_loading_is_enabled"}
{"Action":"pass","Test":"TestSync/parallel/Full_state_sync_includes_joined_rooms"}
{"Action":"fail","Test":"TestSync/parallel/Get_presence_for_newly_joined_members_in_incremental_sync"}
{"Action":"fail","Test":"TestSync/parallel/Newly_joined_room_has_correct_timeline_in_incremental_sync"}
{"Action":"pass","Test":"TestSync/parallel/Newly_joined_room_has_correct_timeline_in_incremental_sync"}
{"Action":"fail","Test":"TestSync/parallel/Newly_joined_room_includes_presence_in_incremental_sync"}
{"Action":"pass","Test":"TestSync/parallel/Newly_joined_room_is_included_in_an_incremental_sync"}
{"Action":"pass","Test":"TestSync/parallel/sync_should_succeed_even_if_the_sync_token_points_to_a_redaction_of_an_unknown_event"}