diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index bda97c47..723a16ec 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1,50 +1,35 @@ -mod account_data; -mod e2ee; +mod extensions; mod filter; -mod receipts; -mod room; +mod rooms; mod selector; -mod to_device; -mod typing; use std::{collections::BTreeMap, fmt::Debug, time::Duration}; use axum::extract::State; use futures::{ - FutureExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join5, try_join}, + FutureExt, TryFutureExt, + future::{join, try_join}, }; use ruma::{ - DeviceId, OwnedRoomId, RoomId, UserId, - api::client::sync::sync_events::v5::{ - ListId, Request, Response, request::ExtensionRoomConfig, response, - }, + DeviceId, OwnedRoomId, UserId, + api::client::sync::sync_events::v5::{ListId, Request, Response, response}, events::room::member::MembershipState, }; use tokio::time::{Instant, timeout_at}; use tuwunel_core::{ - Err, Result, apply, at, debug, + Err, Result, debug, debug::INFO_SPAN_LEVEL, debug_warn, error::inspect_log, - extract_variant, smallvec::SmallVec, trace, - utils::{ - BoolExt, IterStream, TryFutureExtExt, - result::FlatOk, - stream::{TryBroadbandExt, TryReadyExt}, - }, + utils::{TryFutureExtExt, result::FlatOk}, }; use tuwunel_service::{ Services, sync::{Connection, into_connection_key}, }; -use self::{ - filter::{filter_room, filter_room_meta}, - selector::selector, -}; use super::share_encrypted_room; use crate::Ruma; @@ -185,12 +170,15 @@ pub(crate) async fn sync_events_v5_route( ); conn.next_batch = services.globals.wait_pending().await?; - (window, response.lists) = selector(&mut conn, sync_info).boxed().await; + (window, response.lists) = selector::selector(&mut conn, sync_info) + .boxed() + .await; + if conn.globalsince < conn.next_batch { - let rooms = handle_rooms(sync_info, &conn, &window) + let rooms = rooms::handle(sync_info, &conn, &window) .map_ok(|response_rooms| response.rooms = response_rooms); - let extensions = handle_extensions(sync_info, &conn, &window) + let extensions = extensions::handle(sync_info, &conn, &window) .map_ok(|response_extensions| response.extensions = response_extensions); try_join(rooms, extensions).boxed().await?; @@ -233,161 +221,3 @@ pub(crate) async fn sync_events_v5_route( fn is_empty_response(response: &Response) -> bool { response.extensions.is_empty() && response.rooms.is_empty() } - -#[tracing::instrument( - name = "rooms", - level = "debug", - skip_all, - fields( - next_batch = conn.next_batch, - window = window.len(), - ) -)] -async fn handle_rooms( - sync_info: SyncInfo<'_>, - conn: &Connection, - window: &Window, -) -> Result> { - window - .iter() - .try_stream() - .broad_and_then(async |(room_id, room)| { - room::handle(sync_info, conn, room) - .map_ok(|room| (room_id, room)) - .await - }) - .ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room)))) - .map_ok(|(room_id, room)| (room_id.to_owned(), room)) - .try_collect() - .await -} - -#[tracing::instrument( - name = "extensions", - level = "debug", - skip_all, - fields( - next_batch = conn.next_batch, - window = window.len(), - rooms = conn.rooms.len(), - subs = conn.subscriptions.len(), - ) -)] -async fn handle_extensions( - sync_info: SyncInfo<'_>, - conn: &Connection, - window: &Window, -) -> Result { - let SyncInfo { .. } = sync_info; - - let account_data: OptionFuture<_> = conn - .extensions - .account_data - .enabled - .unwrap_or(false) - .then(|| account_data::collect(sync_info, conn, window)) - .into(); - - let receipts: OptionFuture<_> = conn - .extensions - .receipts - .enabled - .unwrap_or(false) - .then(|| receipts::collect(sync_info, conn, window)) - .into(); - - let typing: OptionFuture<_> = conn - .extensions - .typing - .enabled - .unwrap_or(false) - .then(|| typing::collect(sync_info, conn, window)) - .into(); - - let to_device: OptionFuture<_> = conn - .extensions - .to_device - .enabled - .unwrap_or(false) - .then(|| to_device::collect(sync_info, conn)) - .into(); - - let e2ee: OptionFuture<_> = conn - .extensions - .e2ee - .enabled - .unwrap_or(false) - .then(|| e2ee::collect(sync_info, conn)) - .into(); - - let (account_data, receipts, typing, to_device, e2ee) = - join5(account_data, receipts, typing, to_device, e2ee) - .map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default())))) - .await; - - Ok(response::Extensions { - account_data: account_data?, - receipts: receipts?, - typing: typing?, - to_device: to_device?, - e2ee: e2ee?, - }) -} - -#[tracing::instrument( - name = "selector", - level = "trace", - skip_all, - fields(?implicit, ?explicit), -)] -fn extension_rooms_selector<'a, ListIter, SubsIter>( - SyncInfo { .. }: SyncInfo<'a>, - conn: &'a Connection, - window: &'a Window, - implicit: Option, - explicit: Option, -) -> impl Iterator + Send + Sync + 'a -where - ListIter: Iterator + Clone + Debug + Send + Sync + 'a, - SubsIter: Iterator + Clone + Debug + Send + Sync + 'a, -{ - let has_all_subscribed = explicit - .clone() - .into_iter() - .flatten() - .any(|erc| matches!(erc, ExtensionRoomConfig::AllSubscribed)); - - let all_subscribed = has_all_subscribed - .then(|| conn.subscriptions.keys()) - .into_iter() - .flatten() - .map(AsRef::as_ref); - - let rooms_explicit = has_all_subscribed - .is_false() - .then(move || { - explicit - .into_iter() - .flatten() - .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) - .map(AsRef::as_ref) - }) - .into_iter() - .flatten(); - - let rooms_selected = window - .iter() - .filter(move |(_, room)| { - implicit.as_ref().is_none_or(|lists| { - lists - .clone() - .any(|list| room.lists.contains(list)) - }) - }) - .map(at!(0)) - .map(AsRef::as_ref); - - all_subscribed - .chain(rooms_explicit) - .chain(rooms_selected) -} diff --git a/src/api/client/sync/v5/extensions.rs b/src/api/client/sync/v5/extensions.rs new file mode 100644 index 00000000..f3099d20 --- /dev/null +++ b/src/api/client/sync/v5/extensions.rs @@ -0,0 +1,150 @@ +mod account_data; +mod e2ee; +mod receipts; +mod to_device; +mod typing; + +use std::fmt::Debug; + +use futures::{ + FutureExt, + future::{OptionFuture, join5}, +}; +use ruma::{ + RoomId, + api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response}, +}; +use tuwunel_core::{Result, apply, at, extract_variant, utils::BoolExt}; +use tuwunel_service::sync::Connection; + +use super::{SyncInfo, Window, share_encrypted_room}; + +#[tracing::instrument( + name = "extensions", + level = "debug", + skip_all, + fields( + next_batch = conn.next_batch, + window = window.len(), + rooms = conn.rooms.len(), + subs = conn.subscriptions.len(), + ) +)] +pub(super) async fn handle( + sync_info: SyncInfo<'_>, + conn: &Connection, + window: &Window, +) -> Result { + let SyncInfo { .. } = sync_info; + + let account_data: OptionFuture<_> = conn + .extensions + .account_data + .enabled + .unwrap_or(false) + .then(|| account_data::collect(sync_info, conn, window)) + .into(); + + let receipts: OptionFuture<_> = conn + .extensions + .receipts + .enabled + .unwrap_or(false) + .then(|| receipts::collect(sync_info, conn, window)) + .into(); + + let typing: OptionFuture<_> = conn + .extensions + .typing + .enabled + .unwrap_or(false) + .then(|| typing::collect(sync_info, conn, window)) + .into(); + + let to_device: OptionFuture<_> = conn + .extensions + .to_device + .enabled + .unwrap_or(false) + .then(|| to_device::collect(sync_info, conn)) + .into(); + + let e2ee: OptionFuture<_> = conn + .extensions + .e2ee + .enabled + .unwrap_or(false) + .then(|| e2ee::collect(sync_info, conn)) + .into(); + + let (account_data, receipts, typing, to_device, e2ee) = + join5(account_data, receipts, typing, to_device, e2ee) + .map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default())))) + .await; + + Ok(response::Extensions { + account_data: account_data?, + receipts: receipts?, + typing: typing?, + to_device: to_device?, + e2ee: e2ee?, + }) +} + +#[tracing::instrument( + name = "selector", + level = "trace", + skip_all, + fields(?implicit, ?explicit), +)] +fn selector<'a, ListIter, SubsIter>( + SyncInfo { .. }: SyncInfo<'a>, + conn: &'a Connection, + window: &'a Window, + implicit: Option, + explicit: Option, +) -> impl Iterator + Send + Sync + 'a +where + ListIter: Iterator + Clone + Debug + Send + Sync + 'a, + SubsIter: Iterator + Clone + Debug + Send + Sync + 'a, +{ + let has_all_subscribed = explicit + .clone() + .into_iter() + .flatten() + .any(|erc| matches!(erc, ExtensionRoomConfig::AllSubscribed)); + + let all_subscribed = has_all_subscribed + .then(|| conn.subscriptions.keys()) + .into_iter() + .flatten() + .map(AsRef::as_ref); + + let rooms_explicit = has_all_subscribed + .is_false() + .then(move || { + explicit + .into_iter() + .flatten() + .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) + .map(AsRef::as_ref) + }) + .into_iter() + .flatten(); + + let rooms_selected = window + .iter() + .filter(move |(_, room)| { + implicit.as_ref().is_none_or(|lists| { + lists + .clone() + .any(|list| room.lists.contains(list)) + }) + }) + .map(at!(0)) + .map(AsRef::as_ref); + + all_subscribed + .chain(rooms_explicit) + .chain(rooms_selected) +} diff --git a/src/api/client/sync/v5/account_data.rs b/src/api/client/sync/v5/extensions/account_data.rs similarity index 90% rename from src/api/client/sync/v5/account_data.rs rename to src/api/client/sync/v5/extensions/account_data.rs index 5f9a4936..17796d36 100644 --- a/src/api/client/sync/v5/account_data.rs +++ b/src/api/client/sync/v5/extensions/account_data.rs @@ -6,7 +6,7 @@ use tuwunel_core::{ }; use tuwunel_service::sync::Room; -use super::{Connection, SyncInfo, Window, extension_rooms_selector}; +use super::{Connection, SyncInfo, Window, selector}; #[tracing::instrument(name = "account_data", level = "trace", skip_all)] pub(super) async fn collect( @@ -30,7 +30,7 @@ pub(super) async fn collect( .as_deref() .map(<[_]>::iter); - let rooms = extension_rooms_selector(sync_info, conn, window, implicit, explicit) + let rooms = selector(sync_info, conn, window, implicit, explicit) .stream() .broad_filter_map(async |room_id| { let &Room { roomsince, .. } = conn.rooms.get(room_id)?; diff --git a/src/api/client/sync/v5/e2ee.rs b/src/api/client/sync/v5/extensions/e2ee.rs similarity index 100% rename from src/api/client/sync/v5/e2ee.rs rename to src/api/client/sync/v5/extensions/e2ee.rs diff --git a/src/api/client/sync/v5/receipts.rs b/src/api/client/sync/v5/extensions/receipts.rs similarity index 93% rename from src/api/client/sync/v5/receipts.rs rename to src/api/client/sync/v5/extensions/receipts.rs index 51c1162b..e0c37767 100644 --- a/src/api/client/sync/v5/receipts.rs +++ b/src/api/client/sync/v5/extensions/receipts.rs @@ -11,7 +11,7 @@ use tuwunel_core::{ }; use tuwunel_service::{rooms::read_receipt::pack_receipts, sync::Room}; -use super::{Connection, SyncInfo, Window, extension_rooms_selector}; +use super::{Connection, SyncInfo, Window, selector}; #[tracing::instrument(name = "receipts", level = "trace", skip_all)] pub(super) async fn collect( @@ -35,7 +35,7 @@ pub(super) async fn collect( .as_deref() .map(<[_]>::iter); - let rooms = extension_rooms_selector(sync_info, conn, window, implicit, explicit) + let rooms = selector(sync_info, conn, window, implicit, explicit) .stream() .broad_filter_map(|room_id| collect_room(sync_info, conn, window, room_id)) .collect() diff --git a/src/api/client/sync/v5/to_device.rs b/src/api/client/sync/v5/extensions/to_device.rs similarity index 100% rename from src/api/client/sync/v5/to_device.rs rename to src/api/client/sync/v5/extensions/to_device.rs diff --git a/src/api/client/sync/v5/typing.rs b/src/api/client/sync/v5/extensions/typing.rs similarity index 90% rename from src/api/client/sync/v5/typing.rs rename to src/api/client/sync/v5/extensions/typing.rs index 47d55463..6fc6c5eb 100644 --- a/src/api/client/sync/v5/typing.rs +++ b/src/api/client/sync/v5/extensions/typing.rs @@ -11,7 +11,7 @@ use tuwunel_core::{ utils::{IterStream, ReadyExt}, }; -use super::{Connection, SyncInfo, Window, extension_rooms_selector}; +use super::{Connection, SyncInfo, Window, selector}; #[tracing::instrument(name = "typing", level = "trace", skip_all, ret)] pub(super) async fn collect( @@ -37,7 +37,7 @@ pub(super) async fn collect( .as_deref() .map(<[_]>::iter); - extension_rooms_selector(sync_info, conn, window, implicit, explicit) + selector(sync_info, conn, window, implicit, explicit) .stream() .filter_map(async |room_id| { services diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/rooms.rs similarity index 90% rename from src/api/client/sync/v5/room.rs rename to src/api/client/sync/v5/rooms.rs index 212031ad..aaa761fb 100644 --- a/src/api/client/sync/v5/room.rs +++ b/src/api/client/sync/v5/rooms.rs @@ -1,11 +1,14 @@ -use std::{cmp::Ordering, collections::HashSet}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, HashSet}, +}; use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::{OptionFuture, join, join3, join4}, }; use ruma::{ - JsOption, MxcUri, OwnedMxcUri, RoomId, UserId, + JsOption, MxcUri, OwnedMxcUri, OwnedRoomId, RoomId, UserId, api::client::sync::sync_events::{ UnreadNotificationsCount, v5::{DisplayName, response, response::Heroes}, @@ -23,18 +26,48 @@ use tuwunel_core::{ matrix::{Event, StateKey, pdu::PduCount}, ref_at, utils::{ - BoolExt, IterStream, ReadyExt, TryFutureExtExt, math::usize_from_ruma, result::FlatOk, - stream::BroadbandExt, + BoolExt, IterStream, ReadyExt, TryFutureExtExt, + math::usize_from_ruma, + result::FlatOk, + stream::{BroadbandExt, TryBroadbandExt, TryReadyExt}, }, }; use tuwunel_service::{Services, sync::Room}; -use super::{super::load_timeline, Connection, SyncInfo, WindowRoom}; +use super::{super::load_timeline, Connection, SyncInfo, Window, WindowRoom}; use crate::client::ignored_filter; static DEFAULT_BUMP_TYPES: [TimelineEventType; 6] = [CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker]; +#[tracing::instrument( + name = "rooms", + level = "debug", + skip_all, + fields( + next_batch = conn.next_batch, + window = window.len(), + ) +)] +pub(super) async fn handle( + sync_info: SyncInfo<'_>, + conn: &Connection, + window: &Window, +) -> Result> { + window + .iter() + .try_stream() + .broad_and_then(async |(room_id, room)| { + handle_room(sync_info, conn, room) + .map_ok(|room| (room_id, room)) + .await + }) + .ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room)))) + .map_ok(|(room_id, room)| (room_id.to_owned(), room)) + .try_collect() + .await +} + #[tracing::instrument( name = "room", level = "debug", @@ -42,7 +75,7 @@ static DEFAULT_BUMP_TYPES: [TimelineEventType; 6] = fields(room_id, roomsince) )] #[allow(clippy::too_many_arguments)] -pub(super) async fn handle( +async fn handle_room( SyncInfo { services, sender_user, .. }: SyncInfo<'_>, conn: &Connection, WindowRoom { diff --git a/src/api/client/sync/v5/selector.rs b/src/api/client/sync/v5/selector.rs index 59151a22..e8bf4148 100644 --- a/src/api/client/sync/v5/selector.rs +++ b/src/api/client/sync/v5/selector.rs @@ -10,8 +10,7 @@ use tuwunel_core::{ matrix::PduCount, trace, utils::{ - BoolExt, - future::TryExtExt, + BoolExt, TryFutureExtExt, math::usize_from_ruma, stream::{BroadbandExt, IterStream}, }, @@ -19,7 +18,8 @@ use tuwunel_core::{ use tuwunel_service::sync::Connection; use super::{ - ListIds, ResponseLists, SyncInfo, Window, WindowRoom, filter_room, filter_room_meta, + ListIds, ResponseLists, SyncInfo, Window, WindowRoom, + filter::{filter_room, filter_room_meta}, }; #[tracing::instrument(level = "debug", skip_all)] @@ -35,9 +35,7 @@ pub(super) async fn selector( .state_cache .user_memberships(sender_user, Some(&[Join, Invite, Knock])) .map(|(membership, room_id)| (room_id.to_owned(), Some(membership))) - .broad_filter_map(|(room_id, membership)| { - match_lists_for_room(sync_info, conn, room_id, membership) - }) + .broad_filter_map(|(room_id, membership)| matcher(sync_info, conn, room_id, membership)) .collect::>() .await; @@ -57,98 +55,19 @@ pub(super) async fn selector( let lists = response_lists(rooms.iter()); trace!(?lists); - let window = select_window(sync_info, conn, rooms.iter(), &lists).await; + let window = window(sync_info, conn, rooms.iter(), &lists).await; trace!(?window); (window, lists) } -#[tracing::instrument( - name = "window", - level = "debug", - skip_all, - fields(rooms = rooms.clone().count()) -)] -async fn select_window<'a, Rooms>( - sync_info: SyncInfo<'_>, - conn: &Connection, - rooms: Rooms, - lists: &ResponseLists, -) -> Window -where - Rooms: Iterator + Clone + Send + Sync, -{ - static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX); - - let SyncInfo { services, sender_user, .. } = sync_info; - - let selections = lists - .keys() - .cloned() - .filter_map(|id| conn.lists.get(&id).map(|list| (id, list))) - .flat_map(|(id, list)| { - let full_range = list - .ranges - .is_empty() - .then_some(&FULL_RANGE) - .into_iter(); - - list.ranges - .iter() - .chain(full_range) - .map(apply!(2, usize_from_ruma)) - .map(move |range| (id.clone(), range)) - }) - .flat_map(|(id, (start, end))| { - rooms - .clone() - .filter(move |&room| room.lists.contains(&id)) - .filter(|&room| { - conn.rooms - .get(&room.room_id) - .is_some_and(|conn_room| { - conn_room.roomsince == 0 || room.last_count > conn_room.roomsince - }) - }) - .enumerate() - .skip_while(move |&(i, _)| i < start) - .take(end.saturating_add(1).saturating_sub(start)) - .map(|(_, room)| (room.room_id.clone(), room.clone())) - }) - .stream(); - - let subscriptions = conn - .subscriptions - .iter() - .stream() - .broad_filter_map(async |(room_id, _)| { - filter_room_meta(sync_info, room_id) - .await - .into_option()?; - - Some(WindowRoom { - room_id: room_id.clone(), - lists: Default::default(), - ranked: usize::MAX, - last_count: 0, - membership: services - .state_cache - .user_membership(sender_user, room_id) - .await, - }) - }) - .map(|room| (room.room_id.clone(), room)); - - subscriptions.chain(selections).collect().await -} - #[tracing::instrument( name = "matcher", level = "trace", skip_all, fields(?room_id, ?membership) )] -async fn match_lists_for_room( +async fn matcher( sync_info: SyncInfo<'_>, conn: &Connection, room_id: OwnedRoomId, @@ -246,6 +165,84 @@ async fn match_lists_for_room( }) } +#[tracing::instrument( + level = "debug", + skip_all, + fields(rooms = rooms.clone().count()) +)] +async fn window<'a, Rooms>( + sync_info: SyncInfo<'_>, + conn: &Connection, + rooms: Rooms, + lists: &ResponseLists, +) -> Window +where + Rooms: Iterator + Clone + Send + Sync, +{ + static FULL_RANGE: (UInt, UInt) = (UInt::MIN, UInt::MAX); + + let SyncInfo { services, sender_user, .. } = sync_info; + + let selections = lists + .keys() + .cloned() + .filter_map(|id| conn.lists.get(&id).map(|list| (id, list))) + .flat_map(|(id, list)| { + let full_range = list + .ranges + .is_empty() + .then_some(&FULL_RANGE) + .into_iter(); + + list.ranges + .iter() + .chain(full_range) + .map(apply!(2, usize_from_ruma)) + .map(move |range| (id.clone(), range)) + }) + .flat_map(|(id, (start, end))| { + rooms + .clone() + .filter(move |&room| room.lists.contains(&id)) + .filter(|&room| { + conn.rooms + .get(&room.room_id) + .is_some_and(|conn_room| { + conn_room.roomsince == 0 || room.last_count > conn_room.roomsince + }) + }) + .enumerate() + .skip_while(move |&(i, _)| i < start) + .take(end.saturating_add(1).saturating_sub(start)) + .map(|(_, room)| (room.room_id.clone(), room.clone())) + }) + .stream(); + + let subscriptions = conn + .subscriptions + .iter() + .stream() + .broad_filter_map(async |(room_id, _)| { + filter_room_meta(sync_info, room_id) + .await + .into_option()?; + + Some(WindowRoom { + room_id: room_id.clone(), + lists: Default::default(), + ranked: usize::MAX, + last_count: 0, + membership: services + .state_cache + .user_membership(sender_user, room_id) + .await, + }) + }) + .map(|room| (room.room_id.clone(), room)); + + subscriptions.chain(selections).collect().await +} + fn response_lists<'a, Rooms>(rooms: Rooms) -> ResponseLists where Rooms: Iterator,