diff --git a/Cargo.lock b/Cargo.lock index a9d4a959..bb77c544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3509,7 +3509,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.13.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "assign", "js_int", @@ -3528,7 +3528,7 @@ dependencies = [ [[package]] name = "ruma-appservice-api" version = "0.13.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "js_int", "ruma-common", @@ -3540,7 +3540,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.21.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "as_variant", "assign", @@ -3555,6 +3555,7 @@ dependencies = [ "serde", "serde_html_form", "serde_json", + "smallstr", "thiserror 2.0.16", "url", "web-time", @@ -3563,7 +3564,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.16.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "as_variant", "base64", @@ -3596,7 +3597,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.31.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "as_variant", "indexmap", @@ -3622,7 +3623,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.12.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "bytes", "headers", @@ -3644,7 +3645,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.11.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "js_int", "thiserror 2.0.16", @@ -3653,7 +3654,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.16.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "cfg-if", "proc-macro-crate", @@ -3668,7 +3669,7 @@ dependencies = [ [[package]] name = "ruma-push-gateway-api" version = "0.12.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "js_int", "ruma-common", @@ -3680,7 +3681,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.18.0" -source = "git+https://github.com/matrix-construct/ruma?rev=07fee6a5db8c1523399a2463fd8a1c50a04da759#07fee6a5db8c1523399a2463fd8a1c50a04da759" +source = "git+https://github.com/matrix-construct/ruma?rev=e03efd9d228be89365ae0399e2178d0ffa3dc42f#e03efd9d228be89365ae0399e2178d0ffa3dc42f" dependencies = [ "base64", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index 67951a10..864177b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -320,7 +320,7 @@ default-features = false [workspace.dependencies.ruma] git = "https://github.com/matrix-construct/ruma" -rev = "07fee6a5db8c1523399a2463fd8a1c50a04da759" +rev = "e03efd9d228be89365ae0399e2178d0ffa3dc42f" features = [ "__compat", "appservice-api-c", diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index e7cef0b3..14e131e6 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -15,7 +15,7 @@ use std::{ use axum::extract::State; use futures::{ FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join4, join5, try_join}, + future::{OptionFuture, join, join3, join5, try_join}, pin_mut, }; use ruma::{ @@ -24,6 +24,7 @@ use ruma::{ Request, Response, request::ExtensionRoomConfig, response, }, directory::RoomTypeFilter, + events::room::member::MembershipState, uint, }; use tokio::time::{Instant, timeout_at}; @@ -44,16 +45,29 @@ use tuwunel_core::{ }; use tuwunel_service::{ Services, - sync::{KnownRooms, into_snake_key}, + sync::{KnownRooms, ListId, into_connection_key}, }; use super::share_encrypted_room; use crate::{Ruma, client::DEFAULT_BUMP_TYPES}; -type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a Request); +#[derive(Copy, Clone)] +struct SyncInfo<'a> { + sender_user: &'a UserId, + sender_device: &'a DeviceId, + request: &'a Request, + globalsince: u64, +} + +struct TodoRoom { + membership: MembershipState, + requested_state: BTreeSet, + timeline_limit: usize, + roomsince: u64, +} + type TodoRooms = BTreeMap; -type TodoRoom = (BTreeSet, usize, u64); -type ResponseLists = BTreeMap; +type ResponseLists = BTreeMap; /// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync` /// ([MSC4186]) @@ -88,8 +102,9 @@ pub(crate) async fn sync_events_v5_route( .unwrap_or(0); let (sender_user, sender_device) = body.sender(); - let snake_key = into_snake_key(sender_user, sender_device, request.conn_id.as_deref()); - if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) { + let conn_key = into_connection_key(sender_user, sender_device, request.conn_id.as_deref()); + + if globalsince != 0 && !services.sync.is_connection_cached(&conn_key) { return Err!(Request(UnknownPos( "Connection data unknown to server; restarting sync stream." ))); @@ -97,15 +112,22 @@ pub(crate) async fn sync_events_v5_route( // Client / User requested an initial sync if globalsince == 0 { - services - .sync - .forget_snake_sync_connection(&snake_key); + services.sync.forget_connection(&conn_key); } // Get sticky parameters from cache let known_rooms = services .sync - .update_snake_sync_request_with_cache(&snake_key, &mut request); + .update_cache(&conn_key, &mut request); + + let sync_info = SyncInfo { + sender_user, + sender_device, + globalsince, + request: &request, + }; + + let lists = handle_lists(services, sync_info, known_rooms); let ping_presence = services .presence @@ -113,45 +135,7 @@ pub(crate) async fn sync_events_v5_route( .inspect_err(inspect_log) .ok(); - let all_joined_rooms = services - .state_cache - .rooms_joined(sender_user) - .map(ToOwned::to_owned) - .collect::>(); - - let all_invited_rooms = services - .state_cache - .rooms_invited(sender_user) - .map(ToOwned::to_owned) - .collect::>(); - - let all_knocked_rooms = services - .state_cache - .rooms_knocked(sender_user) - .map(ToOwned::to_owned) - .collect::>(); - - let (all_joined_rooms, all_invited_rooms, all_knocked_rooms, _) = - join4(all_joined_rooms, all_invited_rooms, all_knocked_rooms, ping_presence).await; - - let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref); - let all_knocked_rooms = all_knocked_rooms.iter().map(AsRef::as_ref); - let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref); - let all_rooms = all_joined_rooms - .clone() - .chain(all_invited_rooms.clone()) - .chain(all_knocked_rooms.clone()); - - let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &request); - let (known_rooms, todo_rooms, lists) = handle_lists( - services, - sync_info, - known_rooms, - all_invited_rooms.clone(), - all_joined_rooms.clone(), - all_rooms.clone(), - ) - .await; + let ((known_rooms, todo_rooms, lists), _) = join(lists, ping_presence).await; let timeout = request .timeout @@ -170,7 +154,7 @@ pub(crate) async fn sync_events_v5_route( let mut response = Response { txn_id: request.txn_id.clone(), lists, - pos: String::new(), + pos: Default::default(), rooms: Default::default(), extensions: Default::default(), }; @@ -181,15 +165,8 @@ pub(crate) async fn sync_events_v5_route( debug_assert!(globalsince <= next_batch, "next_batch is monotonic"); if globalsince < next_batch { - let rooms = handle_rooms( - services, - &sync_info, - next_batch, - &known_rooms, - &todo_rooms, - all_invited_rooms.clone(), - ) - .map_ok(|rooms| response.rooms = rooms); + let rooms = handle_rooms(services, sync_info, next_batch, &todo_rooms) + .map_ok(|rooms| response.rooms = rooms); let extensions = handle_extensions(services, sync_info, next_batch, &known_rooms, &todo_rooms) @@ -199,14 +176,14 @@ pub(crate) async fn sync_events_v5_route( if !is_empty_response(&response) { trace!(globalsince, next_batch, "response {response:?}"); - response.pos = next_batch.to_string(); + response.pos = next_batch.to_string().into(); return Ok(response); } } if timeout_at(stop_at, watchers).await.is_err() { trace!(globalsince, next_batch, "timeout; empty response"); - response.pos = next_batch.to_string(); + response.pos = next_batch.to_string().into(); return Ok(response); } @@ -234,26 +211,50 @@ fn is_empty_response(response: &Response) -> bool { level = "debug", skip_all, fields( - all_invited_rooms = all_invited_rooms.clone().count(), - all_joined_rooms = all_joined_rooms.clone().count(), - all_rooms = all_rooms.clone().count(), known_rooms = known_rooms.len(), ) )] #[allow(clippy::too_many_arguments)] -async fn handle_lists<'a, Rooms, AllRooms>( +async fn handle_lists( services: &Services, sync_info: SyncInfo<'_>, known_rooms: KnownRooms, - all_invited_rooms: Rooms, - all_joined_rooms: Rooms, - all_rooms: AllRooms, -) -> (KnownRooms, TodoRooms, ResponseLists) -where - Rooms: Iterator + Clone + Send + 'a, - AllRooms: Iterator + Clone + Send + 'a, -{ - let &(sender_user, sender_device, globalsince, request) = &sync_info; +) -> (KnownRooms, TodoRooms, ResponseLists) { + let &SyncInfo { + sender_user, + sender_device, + request, + globalsince, + } = &sync_info; + + let all_joined_rooms = services + .state_cache + .rooms_joined(sender_user) + .map(ToOwned::to_owned) + .collect::>(); + + let all_invited_rooms = services + .state_cache + .rooms_invited(sender_user) + .map(ToOwned::to_owned) + .collect::>(); + + let all_knocked_rooms = services + .state_cache + .rooms_knocked(sender_user) + .map(ToOwned::to_owned) + .collect::>(); + + let (all_joined_rooms, all_invited_rooms, all_knocked_rooms) = + join3(all_joined_rooms, all_invited_rooms, all_knocked_rooms).await; + + let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref); + let all_knocked_rooms = all_knocked_rooms.iter().map(AsRef::as_ref); + let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref); + let all_rooms = all_joined_rooms + .clone() + .chain(all_invited_rooms.clone()) + .chain(all_knocked_rooms.clone()); let mut todo_rooms: TodoRooms = BTreeMap::new(); let mut response_lists = ResponseLists::new(); @@ -298,13 +299,25 @@ where new_known_rooms.extend(new_rooms); for room_id in room_ids { - let todo_room = todo_rooms.entry(room_id.to_owned()).or_insert(( - BTreeSet::new(), - 0_usize, - u64::MAX, - )); + let todo_room = todo_rooms + .entry(room_id.to_owned()) + .or_insert(TodoRoom { + membership: MembershipState::Join, + requested_state: BTreeSet::new(), + timeline_limit: 0_usize, + roomsince: u64::MAX, + }); - todo_room.0.extend( + todo_room.membership = if all_invited_rooms + .clone() + .any(is_equal_to!(room_id)) + { + MembershipState::Invite + } else { + MembershipState::Join + }; + + todo_room.requested_state.extend( list.room_details .required_state .iter() @@ -312,10 +325,10 @@ where ); let limit: usize = usize_from_ruma(list.room_details.timeline_limit).min(100); - todo_room.1 = todo_room.1.max(limit); + todo_room.timeline_limit = todo_room.timeline_limit.max(limit); // 0 means unknown because it got out of date - todo_room.2 = todo_room.2.min( + todo_room.roomsince = todo_room.roomsince.min( known_rooms .get(list_id.as_str()) .and_then(|k| k.get(room_id)) @@ -326,14 +339,11 @@ where } if let Some(conn_id) = request.conn_id.as_deref() { - let snake_key = into_snake_key(sender_user, sender_device, conn_id.into()); + let conn_key = into_connection_key(sender_user, sender_device, conn_id.into()); let list_id = list_id.as_str().into(); - services.sync.update_snake_sync_known_rooms( - &snake_key, - list_id, - new_known_rooms, - globalsince, - ); + services + .sync + .update_known_rooms(&conn_key, list_id, new_known_rooms, globalsince); } response_lists.insert(list_id.clone(), response::List { @@ -358,7 +368,12 @@ where )] async fn fetch_subscriptions( services: &Services, - (sender_user, sender_device, globalsince, request): SyncInfo<'_>, + SyncInfo { + sender_user, + sender_device, + globalsince, + request, + }: SyncInfo<'_>, known_rooms: KnownRooms, todo_rooms: TodoRooms, ) -> (KnownRooms, TodoRooms) { @@ -381,22 +396,28 @@ async fn fetch_subscriptions( .then_some((room_id, room)) }) .ready_fold(subs, |(mut todo_rooms, mut known_subs), (room_id, room)| { - let todo_room = - todo_rooms - .entry(room_id.clone()) - .or_insert((BTreeSet::new(), 0_usize, u64::MAX)); + let todo_room = todo_rooms + .entry(room_id.clone()) + .or_insert(TodoRoom { + membership: MembershipState::Join, + requested_state: BTreeSet::new(), + timeline_limit: 0_usize, + roomsince: u64::MAX, + }); - todo_room.0.extend( + todo_room.requested_state.extend( room.required_state .iter() .map(|(ty, sk)| (ty.clone(), sk.as_str().into())), ); let limit: UInt = room.timeline_limit; - todo_room.1 = todo_room.1.max(usize_from_ruma(limit)); + todo_room.timeline_limit = todo_room + .timeline_limit + .max(usize_from_ruma(limit)); // 0 means unknown because it got out of date - todo_room.2 = todo_room.2.min( + todo_room.roomsince = todo_room.roomsince.min( known_rooms .get("subscriptions") .and_then(|k| k.get(room_id)) @@ -410,11 +431,11 @@ async fn fetch_subscriptions( .await; if let Some(conn_id) = request.conn_id.as_deref() { - let snake_key = into_snake_key(sender_user, sender_device, conn_id.into()); + let conn_key = into_connection_key(sender_user, sender_device, conn_id.into()); let list_id = "subscriptions".into(); services .sync - .update_snake_sync_known_rooms(&snake_key, list_id, known_subs, globalsince); + .update_known_rooms(&conn_key, list_id, known_subs, globalsince); } (known_rooms, todo_rooms) @@ -461,32 +482,20 @@ where skip_all, fields( next_batch, - all_invited_rooms = all_invited_rooms.clone().count(), todo_rooms = todo_rooms.len(), ) )] -async fn handle_rooms<'a, Rooms>( +async fn handle_rooms( services: &Services, - sync_info: &SyncInfo<'_>, + sync_info: SyncInfo<'_>, next_batch: u64, - _known_rooms: &KnownRooms, todo_rooms: &TodoRooms, - all_invited_rooms: Rooms, -) -> Result> -where - Rooms: Iterator + Clone + Send + Sync + 'a, -{ +) -> Result> { let rooms: BTreeMap<_, _> = todo_rooms .iter() .try_stream() .broad_and_then(async |(room_id, todo_room)| { - let is_invited = all_invited_rooms - .clone() - .any(is_equal_to!(room_id)); - - let room = - room::handle(services, next_batch, sync_info, room_id, todo_room, is_invited) - .await?; + let room = room::handle(services, next_batch, sync_info, room_id, todo_room).await?; Ok((room_id, room)) }) @@ -513,7 +522,7 @@ async fn handle_extensions( known_rooms: &KnownRooms, todo_rooms: &TodoRooms, ) -> Result { - let &(_, _, _, request) = &sync_info; + let SyncInfo { request, .. } = sync_info; let account_data: OptionFuture<_> = request .extensions @@ -569,26 +578,31 @@ async fn handle_extensions( }) } -fn extension_rooms_todo<'a>( - (_, _, _, request): SyncInfo<'a>, +fn extension_rooms_todo<'a, ListIter, ConfigIter>( + SyncInfo { request, .. }: SyncInfo<'a>, known_rooms: &'a KnownRooms, todo_rooms: &'a TodoRooms, - lists: Option<&'a Vec>, - rooms: Option<&'a Vec>, -) -> impl Iterator + Send + 'a { - let lists_explicit = lists.into_iter().flat_map(|vec| vec.iter()); + lists: Option, + rooms: Option, +) -> impl Iterator + Send + Sync + 'a +where + ListIter: Iterator + Clone + Send + Sync + 'a, + ConfigIter: Iterator + Clone + Send + Sync + 'a, +{ + let lists_explicit = lists.clone().into_iter().flatten(); + + let rooms_explicit = rooms + .clone() + .into_iter() + .flatten() + .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) + .map(AsRef::::as_ref); let lists_requested = request .lists .keys() .filter(move |_| lists.is_none()); - let rooms_explicit = rooms - .into_iter() - .flat_map(|vec| vec.iter()) - .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) - .map(AsRef::::as_ref); - let rooms_implicit = todo_rooms .keys() .map(AsRef::as_ref) diff --git a/src/api/client/sync/v5/account_data.rs b/src/api/client/sync/v5/account_data.rs index 1df3287b..986caa0f 100644 --- a/src/api/client/sync/v5/account_data.rs +++ b/src/api/client/sync/v5/account_data.rs @@ -6,7 +6,7 @@ use tuwunel_core::{ }; use tuwunel_service::Services; -use super::{KnownRooms, SyncInfo, TodoRooms, extension_rooms_todo}; +use super::{KnownRooms, SyncInfo, TodoRoom, TodoRooms, extension_rooms_todo}; #[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] pub(super) async fn collect( @@ -16,31 +16,39 @@ pub(super) async fn collect( known_rooms: &KnownRooms, todo_rooms: &TodoRooms, ) -> Result { - let (sender_user, _, globalsince, request) = sync_info; - let data = &request.extensions.account_data; - let rooms = extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .broad_filter_map(async |room_id| { - let &(_, _, roomsince) = todo_rooms.get(room_id)?; - let changes: Vec<_> = services - .account_data - .changes_since(Some(room_id), sender_user, roomsince, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect() - .await; + let SyncInfo { sender_user, globalsince, request, .. } = sync_info; - changes - .is_empty() - .eq(&false) - .then(move || (room_id.to_owned(), changes)) - }) - .collect(); + let lists = request + .extensions + .account_data + .lists + .as_deref() + .map(<[_]>::iter); + + let rooms = request + .extensions + .account_data + .rooms + .as_deref() + .map(<[_]>::iter); + + let rooms = extension_rooms_todo(sync_info, known_rooms, todo_rooms, lists, rooms) + .stream() + .broad_filter_map(async |room_id| { + let &TodoRoom { roomsince, .. } = todo_rooms.get(room_id)?; + let changes: Vec<_> = services + .account_data + .changes_since(Some(room_id), sender_user, roomsince, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await; + + changes + .is_empty() + .eq(&false) + .then(move || (room_id.to_owned(), changes)) + }) + .collect(); let global = services .account_data diff --git a/src/api/client/sync/v5/e2ee.rs b/src/api/client/sync/v5/e2ee.rs index 56032f16..07cea3e4 100644 --- a/src/api/client/sync/v5/e2ee.rs +++ b/src/api/client/sync/v5/e2ee.rs @@ -29,10 +29,13 @@ use super::{SyncInfo, share_encrypted_room}; #[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch,))] pub(super) async fn collect( services: &Services, - syncinfo: SyncInfo<'_>, + sync_info: SyncInfo<'_>, next_batch: u64, ) -> Result { - let &(sender_user, sender_device, globalsince, _) = &syncinfo; + let SyncInfo { + sender_user, sender_device, globalsince, .. + } = sync_info; + let keys_changed = services .users .keys_changed(sender_user, globalsince, Some(next_batch)) @@ -46,7 +49,7 @@ pub(super) async fn collect( .rooms_joined(sender_user) .map(ToOwned::to_owned) .broad_filter_map(async |room_id| { - collect_room(services, syncinfo, next_batch, &room_id) + collect_room(services, sync_info, next_batch, &room_id) .await .ok() }) @@ -101,7 +104,7 @@ pub(super) async fn collect( #[tracing::instrument(level = "trace", skip_all, fields(room_id))] async fn collect_room( services: &Services, - (sender_user, _, globalsince, _): SyncInfo<'_>, + SyncInfo { sender_user, globalsince, .. }: SyncInfo<'_>, next_batch: u64, room_id: &RoomId, ) -> Result)> { diff --git a/src/api/client/sync/v5/receipts.rs b/src/api/client/sync/v5/receipts.rs index 90fcadad..99ca1d38 100644 --- a/src/api/client/sync/v5/receipts.rs +++ b/src/api/client/sync/v5/receipts.rs @@ -11,7 +11,7 @@ use tuwunel_core::{ }; use tuwunel_service::{Services, rooms::read_receipt::pack_receipts}; -use super::{KnownRooms, SyncInfo, TodoRooms, extension_rooms_todo}; +use super::{KnownRooms, SyncInfo, TodoRoom, TodoRooms, extension_rooms_todo}; #[tracing::instrument(level = "trace", skip_all)] pub(super) async fn collect( @@ -21,33 +21,41 @@ pub(super) async fn collect( known_rooms: &KnownRooms, todo_rooms: &TodoRooms, ) -> Result { - let (_, _, _, request) = sync_info; - let data = &request.extensions.receipts; - let rooms = extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .broad_filter_map(async |room_id| { - collect_room(services, sync_info, next_batch, todo_rooms, room_id).await - }) - .collect() - .await; + let SyncInfo { request, .. } = sync_info; + + let lists = request + .extensions + .receipts + .lists + .as_deref() + .map(<[_]>::iter); + + let rooms = request + .extensions + .receipts + .rooms + .as_deref() + .map(<[_]>::iter); + + let rooms = extension_rooms_todo(sync_info, known_rooms, todo_rooms, lists, rooms) + .stream() + .broad_filter_map(async |room_id| { + collect_room(services, sync_info, next_batch, todo_rooms, room_id).await + }) + .collect() + .await; Ok(response::Receipts { rooms }) } async fn collect_room( services: &Services, - (sender_user, ..): SyncInfo<'_>, + SyncInfo { sender_user, .. }: SyncInfo<'_>, next_batch: u64, todo_rooms: &TodoRooms, room_id: &RoomId, ) -> Option<(OwnedRoomId, Raw)> { - let &(_, _, roomsince) = todo_rooms.get(room_id)?; + let &TodoRoom { roomsince, .. } = todo_rooms.get(room_id)?; let private_receipt = services .read_receipt .last_privateread_update(sender_user, room_id) diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/room.rs index a7475de5..1c230d46 100644 --- a/src/api/client/sync/v5/room.rs +++ b/src/api/client/sync/v5/room.rs @@ -7,7 +7,7 @@ use futures::{ use ruma::{ JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId, api::client::sync::sync_events::{UnreadNotificationsCount, v5::response}, - events::StateEventType, + events::{StateEventType, room::member::MembershipState}, }; use tuwunel_core::{ Result, at, debug_error, is_equal_to, @@ -25,21 +25,25 @@ use crate::client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline}; pub(super) async fn handle( services: &Services, next_batch: u64, - (sender_user, _, _globalsince, _): &SyncInfo<'_>, + SyncInfo { sender_user, .. }: SyncInfo<'_>, room_id: &RoomId, - (required_state_request, timeline_limit, roomsince): &TodoRoom, - is_invited: bool, + &TodoRoom { + ref membership, + ref requested_state, + timeline_limit, + roomsince, + }: &TodoRoom, ) -> Result> { - let timeline: OptionFuture<_> = is_invited - .eq(&false) + let timeline: OptionFuture<_> = membership + .ne(&MembershipState::Invite) .then(|| { load_timeline( services, sender_user, room_id, - PduCount::Normal(*roomsince), + PduCount::Normal(roomsince), Some(PduCount::from(next_batch)), - *timeline_limit, + timeline_limit, ) }) .into(); @@ -52,7 +56,7 @@ pub(super) async fn handle( let (timeline_pdus, limited, _lastcount) = timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); - if *roomsince != 0 && timeline_pdus.is_empty() && !is_invited { + if roomsince != 0 && timeline_pdus.is_empty() && membership.ne(&MembershipState::Invite) { return Ok(None); } @@ -60,7 +64,7 @@ pub(super) async fn handle( .first() .map(at!(0)) .map(PduCount::into_unsigned) - .or_else(|| roomsince.ne(&0).then_some(*roomsince)) + .or_else(|| roomsince.ne(&0).then_some(roomsince)) .as_ref() .map(ToString::to_string); @@ -80,7 +84,7 @@ pub(super) async fn handle( bump_stamp }); - let lazy = required_state_request + let lazy = requested_state .iter() .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into()))); @@ -97,7 +101,7 @@ pub(super) async fn handle( .iter() .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))); - let required_state = required_state_request + let required_state = requested_state .iter() .cloned() .chain(timeline_senders) @@ -119,7 +123,8 @@ pub(super) async fn handle( .collect(); // TODO: figure out a timestamp we can use for remote invites - let invite_state: OptionFuture<_> = is_invited + let invite_state: OptionFuture<_> = membership + .eq(&MembershipState::Invite) .then(|| { services .state_cache @@ -199,7 +204,7 @@ pub(super) async fn handle( let num_live = None; // Count events in timeline greater than global sync counter Ok(Some(response::Room { - initial: Some(*roomsince == 0), + initial: Some(roomsince == 0), name: room_name.or(hero_name), avatar: JsOption::from_option(room_avatar.or(heroes_avatar)), invite_state: invite_state.flatten(), diff --git a/src/api/client/sync/v5/to_device.rs b/src/api/client/sync/v5/to_device.rs index d0b5242a..6a4298fe 100644 --- a/src/api/client/sync/v5/to_device.rs +++ b/src/api/client/sync/v5/to_device.rs @@ -8,7 +8,9 @@ use super::SyncInfo; #[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] pub(super) async fn collect( services: &Services, - (sender_user, sender_device, globalsince, _request): SyncInfo<'_>, + SyncInfo { + sender_user, sender_device, globalsince, .. + }: SyncInfo<'_>, next_batch: u64, ) -> Result> { services diff --git a/src/api/client/sync/v5/typing.rs b/src/api/client/sync/v5/typing.rs index 5f23c2e5..650f3864 100644 --- a/src/api/client/sync/v5/typing.rs +++ b/src/api/client/sync/v5/typing.rs @@ -2,7 +2,9 @@ use std::collections::BTreeMap; use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{ - api::client::sync::sync_events::v5::response, events::typing::TypingEventContent, serde::Raw, + api::client::sync::sync_events::v5::response, + events::typing::{SyncTypingEvent, TypingEventContent}, + serde::Raw, }; use tuwunel_core::{ Result, debug_error, @@ -21,37 +23,44 @@ pub(super) async fn collect( todo_rooms: &TodoRooms, ) -> Result { use response::Typing; - use ruma::events::typing::SyncTypingEvent; - let (sender_user, _, _, request) = sync_info; - let data = &request.extensions.typing; - extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .filter_map(async |room_id| { - services - .typing - .typing_users_for_user(room_id, sender_user) - .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}")) - .await - .ok() - .filter(|users| !users.is_empty()) - .map(|users| (room_id, users)) - }) - .ready_filter_map(|(room_id, users)| { - let content = TypingEventContent::new(users); - let event = SyncTypingEvent { content }; - let event = Raw::new(&event); + let SyncInfo { sender_user, request, .. } = sync_info; - Some((room_id.to_owned(), event.ok()?)) - }) - .collect::>() - .map(|rooms| Typing { rooms }) - .map(Ok) - .await + let lists = request + .extensions + .typing + .lists + .as_deref() + .map(<[_]>::iter); + + let rooms = request + .extensions + .typing + .rooms + .as_deref() + .map(<[_]>::iter); + + extension_rooms_todo(sync_info, known_rooms, todo_rooms, lists, rooms) + .stream() + .filter_map(async |room_id| { + services + .typing + .typing_users_for_user(room_id, sender_user) + .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}")) + .await + .ok() + .filter(|users| !users.is_empty()) + .map(|users| (room_id, users)) + }) + .ready_filter_map(|(room_id, users)| { + let content = TypingEventContent::new(users); + let event = SyncTypingEvent { content }; + let event = Raw::new(&event); + + Some((room_id.to_owned(), event.ok()?)) + }) + .collect::>() + .map(|rooms| Typing { rooms }) + .map(Ok) + .await } diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index a6370191..bdc34fd8 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -7,15 +7,18 @@ use std::{ use ruma::{ OwnedDeviceId, OwnedRoomId, OwnedUserId, - api::client::sync::sync_events::v5::{Request, request}, + api::client::sync::sync_events::v5::{ + Request, request, + request::{AccountData, E2EE, Receipts, ToDevice, Typing}, + }, }; use tuwunel_core::{Result, implement, smallstr::SmallString}; use tuwunel_database::Map; pub struct Service { - db: Data, services: Arc, - snake_connections: DbConnections, + connections: Connections, + db: Data, } pub struct Data { @@ -35,20 +38,22 @@ pub struct Data { } #[derive(Debug, Default)] -struct SnakeSyncCache { - lists: BTreeMap, - subscriptions: RoomSubscriptions, +pub struct Cache { + lists: Lists, known_rooms: KnownRooms, + subscriptions: Subscriptions, extensions: request::Extensions, } -pub type KnownRooms = BTreeMap>; -pub type RoomSubscriptions = BTreeMap; -pub type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option); -type SnakeConnectionsVal = Arc>; -type DbConnections = Mutex>; +type Connections = Mutex>>>; +pub type ConnectionKey = (OwnedUserId, OwnedDeviceId, Option); +pub type ConnectionId = SmallString<[u8; 16]>; + +pub type Subscriptions = BTreeMap; +pub type Lists = BTreeMap; +pub type KnownRooms = BTreeMap; +pub type ListRooms = BTreeMap; pub type ListId = SmallString<[u8; 16]>; -pub type ConnId = SmallString<[u8; 16]>; impl crate::Service for Service { fn build(args: &crate::Args<'_>) -> Result> { @@ -69,7 +74,7 @@ impl crate::Service for Service { userid_lastonetimekeyupdate: args.db["userid_lastonetimekeyupdate"].clone(), }, services: args.services.clone(), - snake_connections: StdMutex::new(BTreeMap::new()), + connections: StdMutex::new(BTreeMap::new()), })) } @@ -77,49 +82,54 @@ impl crate::Service for Service { } #[implement(Service)] -pub fn update_snake_sync_request_with_cache( - &self, - snake_key: &SnakeConnectionsKey, - request: &mut Request, -) -> KnownRooms { - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(snake_key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); +pub fn update_cache(&self, key: &ConnectionKey, request: &mut Request) -> KnownRooms { + let cache = self.get_connection(key); + let mut cached = cache.lock().expect("locked"); - let cached = &mut cached.lock().expect("locked"); - drop(cache); - - //Request::try_from_http_request(req, path_args); - for (list_id, list) in &mut request.lists { - if let Some(cached_list) = cached.lists.get(list_id.as_str()) { - list_or_sticky( - &mut list.room_details.required_state, - &cached_list.room_details.required_state, - ); - - //some_or_sticky(&mut list.include_heroes, cached_list.include_heroes); - - match (&mut list.filters, cached_list.filters.clone()) { - | (Some(filters), Some(cached_filters)) => { - some_or_sticky(&mut filters.is_invite, cached_filters.is_invite); - // TODO (morguldir): Find out how a client can unset this, probably need - // to change into an option inside ruma - list_or_sticky(&mut filters.not_room_types, &cached_filters.not_room_types); - }, - | (_, Some(cached_filters)) => list.filters = Some(cached_filters), - | (Some(list_filters), _) => list.filters = Some(list_filters.clone()), - | (..) => {}, - } - } + Self::update_cache_lists(request, &mut cached); + Self::update_cache_subscriptions(request, &mut cached); + Self::update_cache_extensions(request, &mut cached); + cached.known_rooms.clone() +} +#[implement(Service)] +fn update_cache_lists(request: &mut Request, cached: &mut Cache) { + for (list_id, request_list) in &mut request.lists { cached .lists - .insert(list_id.as_str().into(), list.clone()); + .entry(list_id.as_str().into()) + .and_modify(|cached_list| { + Self::update_cache_list(request_list, cached_list); + }) + .or_insert_with(|| request_list.clone()); } +} +#[implement(Service)] +fn update_cache_list(request: &mut request::List, cached: &mut request::List) { + list_or_sticky( + &mut request.room_details.required_state, + &mut cached.room_details.required_state, + ); + + match (&mut request.filters, &mut cached.filters) { + | (None, None) => {}, + | (None, Some(cached)) => request.filters = Some(cached.clone()), + | (Some(request), None) => cached.filters = Some(request.clone()), + | (Some(request), Some(cached)) => { + some_or_sticky(&mut request.is_dm, &mut cached.is_dm); + some_or_sticky(&mut request.is_encrypted, &mut cached.is_encrypted); + some_or_sticky(&mut request.is_invite, &mut cached.is_invite); + list_or_sticky(&mut request.room_types, &mut cached.room_types); + list_or_sticky(&mut request.not_room_types, &mut cached.not_room_types); + list_or_sticky(&mut request.tags, &mut cached.not_tags); + list_or_sticky(&mut request.spaces, &mut cached.spaces); + }, + } +} + +#[implement(Service)] +fn update_cache_subscriptions(request: &mut Request, cached: &mut Cache) { cached .subscriptions .extend(request.room_subscriptions.clone()); @@ -127,150 +137,134 @@ pub fn update_snake_sync_request_with_cache( request .room_subscriptions .extend(cached.subscriptions.clone()); - - request.extensions.e2ee.enabled = request - .extensions - .e2ee - .enabled - .or(cached.extensions.e2ee.enabled); - - request.extensions.to_device.enabled = request - .extensions - .to_device - .enabled - .or(cached.extensions.to_device.enabled); - - request.extensions.account_data.enabled = request - .extensions - .account_data - .enabled - .or(cached.extensions.account_data.enabled); - request.extensions.account_data.lists = request - .extensions - .account_data - .lists - .clone() - .or_else(|| cached.extensions.account_data.lists.clone()); - request.extensions.account_data.rooms = request - .extensions - .account_data - .rooms - .clone() - .or_else(|| cached.extensions.account_data.rooms.clone()); - - { - let (request, cached) = (&mut request.extensions.typing, &cached.extensions.typing); - some_or_sticky(&mut request.enabled, cached.enabled); - some_or_sticky(&mut request.rooms, cached.rooms.clone()); - some_or_sticky(&mut request.lists, cached.lists.clone()); - }; - { - let (request, cached) = (&mut request.extensions.receipts, &cached.extensions.receipts); - some_or_sticky(&mut request.enabled, cached.enabled); - some_or_sticky(&mut request.rooms, cached.rooms.clone()); - some_or_sticky(&mut request.lists, cached.lists.clone()); - }; - - cached.extensions = request.extensions.clone(); - cached.known_rooms.clone() } #[implement(Service)] -pub fn update_snake_sync_known_rooms( +fn update_cache_extensions(request: &mut Request, cached: &mut Cache) { + let request = &mut request.extensions; + let cached = &mut cached.extensions; + + Self::update_cache_account_data(&mut request.account_data, &mut cached.account_data); + Self::update_cache_receipts(&mut request.receipts, &mut cached.receipts); + Self::update_cache_typing(&mut request.typing, &mut cached.typing); + Self::update_cache_to_device(&mut request.to_device, &mut cached.to_device); + Self::update_cache_e2ee(&mut request.e2ee, &mut cached.e2ee); +} + +#[implement(Service)] +fn update_cache_account_data(request: &mut AccountData, cached: &mut AccountData) { + some_or_sticky(&mut request.enabled, &mut cached.enabled); + some_or_sticky(&mut request.lists, &mut cached.lists); + some_or_sticky(&mut request.rooms, &mut cached.rooms); +} + +#[implement(Service)] +fn update_cache_receipts(request: &mut Receipts, cached: &mut Receipts) { + some_or_sticky(&mut request.enabled, &mut cached.enabled); + some_or_sticky(&mut request.rooms, &mut cached.rooms); + some_or_sticky(&mut request.lists, &mut cached.lists); +} + +#[implement(Service)] +fn update_cache_typing(request: &mut Typing, cached: &mut Typing) { + some_or_sticky(&mut request.enabled, &mut cached.enabled); + some_or_sticky(&mut request.rooms, &mut cached.rooms); + some_or_sticky(&mut request.lists, &mut cached.lists); +} + +#[implement(Service)] +fn update_cache_to_device(request: &mut ToDevice, cached: &mut ToDevice) { + some_or_sticky(&mut request.enabled, &mut cached.enabled); +} + +#[implement(Service)] +fn update_cache_e2ee(request: &mut E2EE, cached: &mut E2EE) { + some_or_sticky(&mut request.enabled, &mut cached.enabled); +} + +/// load params from cache if body doesn't contain it, as long as it's allowed +/// in some cases we may need to allow an empty list as an actual value +fn list_or_sticky(target: &mut Vec, cached: &mut Vec) { + if !target.is_empty() { + cached.clone_from(target); + } else { + target.clone_from(cached); + } +} + +fn some_or_sticky(target: &mut Option, cached: &mut Option) { + if let Some(target) = target { + cached.replace(target.clone()); + } else { + target.clone_from(cached); + } +} + +#[implement(Service)] +pub fn update_known_rooms( &self, - key: &SnakeConnectionsKey, + key: &ConnectionKey, list_id: ListId, - new_cached_rooms: BTreeSet, + new_rooms: BTreeSet, globalsince: u64, ) { assert!(key.2.is_some(), "Some(conn_id) required for this call"); - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); + let cache = self.get_connection(key); + let mut cached = cache.lock().expect("locked"); + let list_rooms = cached.known_rooms.entry(list_id).or_default(); - let cached = &mut cached.lock().expect("locked"); - drop(cache); - - for (room_id, lastsince) in cached - .known_rooms - .entry(list_id.clone()) - .or_default() - .iter_mut() - { - if !new_cached_rooms.contains(room_id) { + for (room_id, lastsince) in list_rooms.iter_mut() { + if !new_rooms.contains(room_id) { *lastsince = 0; } } - let list = cached.known_rooms.entry(list_id).or_default(); - for room_id in new_cached_rooms { - list.insert(room_id, globalsince); + for room_id in new_rooms { + list_rooms.insert(room_id, globalsince); } } #[implement(Service)] -pub fn update_snake_sync_subscriptions( - &self, - key: &SnakeConnectionsKey, - subscriptions: RoomSubscriptions, -) { - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); - - let cached = &mut cached.lock().expect("locked"); - drop(cache); - - cached.subscriptions = subscriptions; +pub fn update_subscriptions(&self, key: &ConnectionKey, subscriptions: Subscriptions) { + self.get_connection(key) + .lock() + .expect("locked") + .subscriptions = subscriptions; } #[implement(Service)] -pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) { - self.snake_connections +pub fn get_connection(&self, key: &ConnectionKey) -> Arc> { + self.connections + .lock() + .expect("locked") + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(Cache::default()))) + .clone() +} + +#[implement(Service)] +pub fn forget_connection(&self, key: &ConnectionKey) { + self.connections .lock() .expect("locked") .remove(key); } #[implement(Service)] -pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool { - self.snake_connections +pub fn is_connection_cached(&self, key: &ConnectionKey) -> bool { + self.connections .lock() .expect("locked") .contains_key(key) } #[inline] -pub fn into_snake_key( - user_id: U, - device_id: D, - conn_id: Option, -) -> SnakeConnectionsKey +pub fn into_connection_key(user_id: U, device_id: D, conn_id: Option) -> ConnectionKey where U: Into, D: Into, - C: Into, + C: Into, { (user_id.into(), device_id.into(), conn_id.map(Into::into)) } - -/// load params from cache if body doesn't contain it, as long as it's allowed -/// in some cases we may need to allow an empty list as an actual value -fn list_or_sticky(target: &mut Vec, cached: &Vec) { - if target.is_empty() { - target.clone_from(cached); - } -} - -fn some_or_sticky(target: &mut Option, cached: Option) { - if target.is_none() { - *target = cached; - } -}