sliding-sync: Stop propagation from room handlers.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-30 16:27:22 +00:00
parent ce1ac277a6
commit d24986edf1

View File

@@ -22,14 +22,12 @@ use ruma::{
},
};
use tuwunel_core::{
Result, at, err, is_equal_to,
Result, at, err, error, is_equal_to,
matrix::{Event, StateKey, pdu::PduCount},
ref_at,
utils::{
BoolExt, IterStream, ReadyExt, TryFutureExtExt,
math::usize_from_ruma,
result::FlatOk,
stream::{BroadbandExt, TryBroadbandExt, TryReadyExt},
BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::usize_from_ruma, result::FlatOk,
stream::BroadbandExt,
},
};
use tuwunel_service::{Services, sync::Room};
@@ -56,15 +54,16 @@ pub(super) async fn handle(
) -> Result<BTreeMap<OwnedRoomId, response::Room>> {
window
.iter()
.try_stream()
.broad_and_then(async |(room_id, room)| {
.stream()
.broad_filter_map(async |(room_id, room)| {
handle_room(sync_info, conn, room)
.map_ok(|room| (room_id, room))
.map_ok(move |room| (room_id.clone(), room))
.inspect_err(|e| error!(?room_id, "sync handler: {e:?}"))
.await
.ok()
})
.ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room))))
.map_ok(|(room_id, room)| (room_id.to_owned(), room))
.try_collect()
.collect()
.map(Ok)
.await
}
@@ -81,7 +80,7 @@ async fn handle_room(
WindowRoom {
lists, membership, room_id, last_count, ..
}: &WindowRoom,
) -> Result<Option<response::Room>> {
) -> Result<response::Room> {
debug_assert!(
DEFAULT_BUMP_TYPES.is_sorted(),
"DEFAULT_BUMP_TYPES must be sorted for binary search"
@@ -98,7 +97,7 @@ async fn handle_room(
);
if *membership == Some(MembershipState::Leave) {
return Ok(Some(response::Room {
return Ok(response::Room {
initial: roomsince.eq(&0).then_some(true),
lists: lists.clone(),
membership: membership.clone(),
@@ -113,7 +112,7 @@ async fn handle_room(
],
..Default::default()
}));
});
}
let is_invite = *membership == Some(MembershipState::Invite);
@@ -337,7 +336,7 @@ async fn handle_room(
)
.await?;
Ok(Some(response::Room {
Ok(response::Room {
initial: roomsince.eq(&0).then_some(true),
lists: lists.clone(),
membership: membership.clone(),
@@ -355,7 +354,7 @@ async fn handle_room(
joined_count,
invited_count,
unread_notifications: UnreadNotificationsCount { highlight_count, notification_count },
}))
})
}
#[tracing::instrument(name = "heroes", level = "trace", skip_all)]