Optimize typing event collection.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-24 05:34:22 +00:00
parent 9ea8fbc482
commit 299d3230a1

View File

@@ -28,7 +28,7 @@ use tuwunel_core::{
matrix::{Event, TypeStateKey, pdu::PduCount},
trace,
utils::{
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, TryReadyExt,
future::ReadyEqExt,
math::{ruma_from_usize, usize_from_ruma},
},
@@ -181,9 +181,8 @@ pub(crate) async fn sync_events_v5_route(
)
.await;
let all_rooms: Vec<OwnedRoomId> = all_rooms.map(ToOwned::to_owned).collect();
let typing = collect_typing_events(services, sender_user, &body, &all_rooms).await?;
response.extensions.typing = typing;
response.extensions.typing =
collect_typing_events(services, sender_user, &body, all_joined_rooms.clone()).await?;
fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await;
@@ -973,42 +972,45 @@ async fn collect_to_device(
})
}
async fn collect_typing_events(
async fn collect_typing_events<'a, Rooms>(
services: &Services,
sender_user: &UserId,
body: &sync_events::v5::Request,
all_rooms: &Vec<OwnedRoomId>,
) -> Result<sync_events::v5::response::Typing> {
rooms: Rooms,
) -> Result<sync_events::v5::response::Typing>
where
Rooms: Iterator<Item = &'a RoomId> + Send + 'a,
{
use sync_events::v5::response::Typing;
if !body.extensions.typing.enabled.unwrap_or(false) {
return Ok(sync_events::v5::response::Typing::default());
return Ok(Typing::default());
}
let mut typing_response = sync_events::v5::response::Typing::default();
rooms
.stream()
.filter_map(async |room_id| {
services
.rooms
.typing
.typing_users_for_user(room_id, sender_user)
.inspect_err(|e| warn!(%room_id, "Failed to get typing events for room: {e}"))
.await
.ok()
.filter(|users| !users.is_empty())
.map(|users| Ok((room_id, users)))
})
.ready_try_fold_default(|mut response: Typing, (room_id, users)| {
response.rooms.insert(
room_id.to_owned(),
Raw::new(&ruma::events::typing::SyncTypingEvent {
content: TypingEventContent::new(users),
})?,
);
for room_id in all_rooms {
match services
.rooms
.typing
.typing_users_for_user(room_id, sender_user)
.await
{
| Ok(typing_users) => {
if !typing_users.is_empty() {
typing_response.rooms.insert(
room_id.to_owned(), // Already OwnedRoomId
Raw::new(&ruma::events::typing::SyncTypingEvent {
content: TypingEventContent::new(typing_users),
})?,
);
}
},
| Err(e) => {
warn!(%room_id, "Failed to get typing events for room: {}", e);
},
}
}
Ok(typing_response)
Ok(response)
})
.await
}
async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts {