diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index d4b17861..8acd43ba 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -28,7 +28,7 @@ use tuwunel_core::{ matrix::{Event, TypeStateKey, pdu::PduCount}, trace, utils::{ - BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, + BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, TryReadyExt, future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, }, @@ -181,9 +181,8 @@ pub(crate) async fn sync_events_v5_route( ) .await; - let all_rooms: Vec = all_rooms.map(ToOwned::to_owned).collect(); - let typing = collect_typing_events(services, sender_user, &body, &all_rooms).await?; - response.extensions.typing = typing; + response.extensions.typing = + collect_typing_events(services, sender_user, &body, all_joined_rooms.clone()).await?; fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await; @@ -973,42 +972,45 @@ async fn collect_to_device( }) } -async fn collect_typing_events( +async fn collect_typing_events<'a, Rooms>( services: &Services, sender_user: &UserId, body: &sync_events::v5::Request, - all_rooms: &Vec, -) -> Result { + rooms: Rooms, +) -> Result +where + Rooms: Iterator + Send + 'a, +{ + use sync_events::v5::response::Typing; + if !body.extensions.typing.enabled.unwrap_or(false) { - return Ok(sync_events::v5::response::Typing::default()); + return Ok(Typing::default()); } - let mut typing_response = sync_events::v5::response::Typing::default(); + rooms + .stream() + .filter_map(async |room_id| { + services + .rooms + .typing + .typing_users_for_user(room_id, sender_user) + .inspect_err(|e| warn!(%room_id, "Failed to get typing events for room: {e}")) + .await + .ok() + .filter(|users| !users.is_empty()) + .map(|users| Ok((room_id, users))) + }) + .ready_try_fold_default(|mut response: Typing, (room_id, users)| { + response.rooms.insert( + room_id.to_owned(), + Raw::new(&ruma::events::typing::SyncTypingEvent { + content: TypingEventContent::new(users), + })?, + ); - for room_id in all_rooms { - match services - .rooms - .typing - .typing_users_for_user(room_id, sender_user) - .await - { - | Ok(typing_users) => { - if !typing_users.is_empty() { - typing_response.rooms.insert( - room_id.to_owned(), // Already OwnedRoomId - Raw::new(&ruma::events::typing::SyncTypingEvent { - content: TypingEventContent::new(typing_users), - })?, - ); - } - }, - | Err(e) => { - warn!(%room_id, "Failed to get typing events for room: {}", e); - }, - } - } - - Ok(typing_response) + Ok(response) + }) + .await } async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts {