diff --git a/src/api/client/sync/v5/rooms.rs b/src/api/client/sync/v5/rooms.rs index 89f7502c..1e1cf598 100644 --- a/src/api/client/sync/v5/rooms.rs +++ b/src/api/client/sync/v5/rooms.rs @@ -22,14 +22,12 @@ use ruma::{ }, }; use tuwunel_core::{ - Result, at, err, is_equal_to, + Result, at, err, error, is_equal_to, matrix::{Event, StateKey, pdu::PduCount}, ref_at, utils::{ - BoolExt, IterStream, ReadyExt, TryFutureExtExt, - math::usize_from_ruma, - result::FlatOk, - stream::{BroadbandExt, TryBroadbandExt, TryReadyExt}, + BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::usize_from_ruma, result::FlatOk, + stream::BroadbandExt, }, }; use tuwunel_service::{Services, sync::Room}; @@ -56,15 +54,16 @@ pub(super) async fn handle( ) -> Result> { window .iter() - .try_stream() - .broad_and_then(async |(room_id, room)| { + .stream() + .broad_filter_map(async |(room_id, room)| { handle_room(sync_info, conn, room) - .map_ok(|room| (room_id, room)) + .map_ok(move |room| (room_id.clone(), room)) + .inspect_err(|e| error!(?room_id, "sync handler: {e:?}")) .await + .ok() }) - .ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room)))) - .map_ok(|(room_id, room)| (room_id.to_owned(), room)) - .try_collect() + .collect() + .map(Ok) .await } @@ -81,7 +80,7 @@ async fn handle_room( WindowRoom { lists, membership, room_id, last_count, .. }: &WindowRoom, -) -> Result> { +) -> Result { debug_assert!( DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES must be sorted for binary search" @@ -98,7 +97,7 @@ async fn handle_room( ); if *membership == Some(MembershipState::Leave) { - return Ok(Some(response::Room { + return Ok(response::Room { initial: roomsince.eq(&0).then_some(true), lists: lists.clone(), membership: membership.clone(), @@ -113,7 +112,7 @@ async fn handle_room( ], ..Default::default() - })); + }); } let is_invite = *membership == Some(MembershipState::Invite); @@ -337,7 +336,7 @@ async fn handle_room( ) .await?; - Ok(Some(response::Room { + Ok(response::Room { initial: roomsince.eq(&0).then_some(true), lists: lists.clone(), membership: membership.clone(), @@ -355,7 +354,7 @@ async fn handle_room( joined_count, invited_count, unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, - })) + }) } #[tracing::instrument(name = "heroes", level = "trace", skip_all)]