Pipeline private read receipt fetch; cleanup tuples syncv3.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-08 13:06:51 +00:00
parent 299d3230a1
commit 3c47516c85

View File

@@ -317,16 +317,27 @@ pub(crate) async fn build_sync_events(
.users
.remove_to_device_events(sender_user, sender_device, since);
let rooms = join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms);
let ephemeral = join3(remove_to_device_events, to_device_events, presence_updates);
let top = join5(account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms)
.boxed()
.await;
let (
account_data,
keys_changed,
device_one_time_keys_count,
((), to_device_events, presence_updates),
(
(joined_rooms, mut device_list_updates, left_encrypted_users),
left_rooms,
invited_rooms,
knocked_rooms,
),
) = join5(
account_data,
keys_changed,
device_one_time_keys_count,
join3(remove_to_device_events, to_device_events, presence_updates),
join4(joined_rooms, left_rooms, invited_rooms, knocked_rooms),
)
.boxed()
.await;
let (account_data, ephemeral, device_one_time_keys_count, keys_changed, rooms) = top;
let ((), to_device_events, presence_updates) = ephemeral;
let (joined_rooms, left_rooms, invited_rooms, knocked_rooms) = rooms;
let (joined_rooms, mut device_list_updates, left_encrypted_users) = joined_rooms;
device_list_updates.extend(keys_changed);
// If the user doesn't share an encrypted room with the target anymore, we need
@@ -630,12 +641,11 @@ async fn load_joined_room(
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
.map(Ok);
let (current_shortstatehash, since_shortstatehash, timeline, receipt_events) =
let (current_shortstatehash, since_shortstatehash, (timeline_pdus, limited), receipt_events) =
try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events)
.boxed()
.await?;
let (timeline_pdus, limited) = timeline;
let initial = since_shortstatehash.is_none();
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter
@@ -700,8 +710,14 @@ async fn load_joined_room(
})
.into();
let (last_notification_read, since_sender_member, witness) =
join3(last_notification_read, since_sender_member, witness).await;
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(sender_user, room_id);
let (last_privateread_update, last_notification_read, since_sender_member, witness) =
join4(last_privateread_update, last_notification_read, since_sender_member, witness)
.await;
let joined_since_last_sync =
since_sender_member
@@ -801,6 +817,17 @@ async fn load_joined_room(
})
.into();
let private_read_event: OptionFuture<_> = last_privateread_update
.gt(&since)
.then(|| {
services
.rooms
.read_receipt
.private_read_get(room_id, sender_user)
.map(Result::ok)
})
.into();
let typing_events = services
.rooms
.typing
@@ -820,39 +847,24 @@ async fn load_joined_room(
})
.unwrap_or(Vec::new());
let unread_notifications = join(notification_count, highlight_count);
let events = join3(room_events, account_data_events, typing_events);
let (unread_notifications, events, device_updates) =
join3(unread_notifications, events, device_updates)
.boxed()
.await;
let (room_events, account_data_events, typing_events) = events;
let (notification_count, highlight_count) = unread_notifications;
let (
device_updates,
(notification_count, highlight_count),
(room_events, account_data_events, typing_events, private_read_event),
) = join3(
device_updates,
join(notification_count, highlight_count),
join4(room_events, account_data_events, typing_events, private_read_event),
)
.boxed()
.await;
device_list_updates.extend(device_updates);
let last_privateread_update = services
.rooms
.read_receipt
.last_privateread_update(sender_user, room_id)
.await > since;
let private_read_event = if last_privateread_update {
services
.rooms
.read_receipt
.private_read_get(room_id, sender_user)
.await
.ok()
} else {
None
};
let edus: Vec<Raw<AnySyncEphemeralRoomEvent>> = receipt_events
.into_values()
.chain(typing_events.into_iter())
.chain(private_read_event.into_iter())
.chain(private_read_event.flatten().into_iter())
.collect();
// Save the state after this sync so we can send the correct state diff next
@@ -875,19 +887,19 @@ async fn load_joined_room(
.filter_map(Result::ok)
.collect(),
},
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: room_events,
},
state: RoomState {
events: state_events
.into_iter()
.map(Event::into_format)
.collect(),
},
timeline: Timeline {
limited: limited || joined_since_last_sync,
prev_batch: prev_batch.as_ref().map(ToString::to_string),
events: room_events,
},
ephemeral: Ephemeral { events: edus },
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
unread_thread_notifications: BTreeMap::new(),
};