From 3c47516c85298690c0d3f6b456ae6a04e96511ec Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 8 Jul 2025 13:06:51 +0000 Subject: [PATCH] Pipeline private read receipt fetch; cleanup tuples syncv3. Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 104 +++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 46 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index e69f9977..237118ac 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -317,16 +317,27 @@ pub(crate) async fn build_sync_events( .users .remove_to_device_events(sender_user, sender_device, since); - let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms); - let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates); - let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) - .boxed() - .await; + let ( + account_data, + keys_changed, + device_one_time_keys_count, + ((), to_device_events, presence_updates), + ( + (joined_rooms, mut device_list_updates, left_encrypted_users), + left_rooms, + invited_rooms, + knocked_rooms, + ), + ) = join5( + account_data, + keys_changed, + device_one_time_keys_count, + join3(remove_to_device_events, to_device_events, presence_updates), + join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms), + ) + .boxed() + .await; - let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top; - let ((), to_device_events, presence_updates) = ephemeral; - let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms; - let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_rooms; device_list_updates.extend(keys_changed); // If the user doesn't share an encrypted room with the target anymore, we need @@ -630,12 +641,11 @@ async fn load_joined_room( .collect::>>() .map(Ok); - let (current_shortstatehash, since_shortstatehash, timeline, receipt_events) = + let (current_shortstatehash, since_shortstatehash, (timeline_pdus, limited), receipt_events) = try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events) .boxed() .await?; - let (timeline_pdus, limited) = timeline; let initial = since_shortstatehash.is_none(); let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() || filter @@ -700,8 +710,14 @@ async fn load_joined_room( }) .into(); - let (last_notification_read, since_sender_member, witness) = - join3(last_notification_read, since_sender_member, witness).await; + let last_privateread_update = services + .rooms + .read_receipt + .last_privateread_update(sender_user, room_id); + + let (last_privateread_update, last_notification_read, since_sender_member, witness) = + join4(last_privateread_update, last_notification_read, since_sender_member, witness) + .await; let joined_since_last_sync = since_sender_member @@ -801,6 +817,17 @@ async fn load_joined_room( }) .into(); + let private_read_event: OptionFuture<_> = last_privateread_update + .gt(&since) + .then(|| { + services + .rooms + .read_receipt + .private_read_get(room_id, sender_user) + .map(Result::ok) + }) + .into(); + let typing_events = services .rooms .typing @@ -820,39 +847,24 @@ async fn load_joined_room( }) .unwrap_or(Vec::new()); - let unread_notifications = join(notification_count, highlight_count); - let events = join3(room_events, account_data_events, typing_events); - let (unread_notifications, events, device_updates) = - join3(unread_notifications, events, device_updates) - .boxed() - .await; - - let (room_events, account_data_events, typing_events) = events; - let (notification_count, highlight_count) = unread_notifications; + let ( + device_updates, + (notification_count, highlight_count), + (room_events, account_data_events, typing_events, private_read_event), + ) = join3( + device_updates, + join(notification_count, highlight_count), + join4(room_events, account_data_events, typing_events, private_read_event), + ) + .boxed() + .await; device_list_updates.extend(device_updates); - let last_privateread_update = services - .rooms - .read_receipt - .last_privateread_update(sender_user, room_id) - .await > since; - - let private_read_event = if last_privateread_update { - services - .rooms - .read_receipt - .private_read_get(room_id, sender_user) - .await - .ok() - } else { - None - }; - let edus: Vec> = receipt_events .into_values() .chain(typing_events.into_iter()) - .chain(private_read_event.into_iter()) + .chain(private_read_event.flatten().into_iter()) .collect(); // Save the state after this sync so we can send the correct state diff next @@ -875,19 +887,19 @@ async fn load_joined_room( .filter_map(Result::ok) .collect(), }, - unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, - timeline: Timeline { - limited: limited || joined_since_last_sync, - prev_batch: prev_batch.as_ref().map(ToString::to_string), - events: room_events, - }, state: RoomState { events: state_events .into_iter() .map(Event::into_format) .collect(), }, + timeline: Timeline { + limited: limited || joined_since_last_sync, + prev_batch: prev_batch.as_ref().map(ToString::to_string), + events: room_events, + }, ephemeral: Ephemeral { events: edus }, + unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, unread_thread_notifications: BTreeMap::new(), };