From be1264965acd1f5a4e049fa0c4b59f4787f6afca Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 4 Sep 2025 22:33:42 +0000 Subject: [PATCH] Refactor sliding-sync. Signed-off-by: Jason Volk --- Cargo.toml | 1 + src/api/client/sync/v5.rs | 1513 ++++++++++++++++++++++--------------- src/service/sync/mod.rs | 341 +++++---- 3 files changed, 1070 insertions(+), 785 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 60beea37..5c01381b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -436,6 +436,7 @@ version = "2.7" version = "0.3" features = [ "ffi", + "serde", "std", "union", ] diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 2fe64d2d..26204878 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1,42 +1,52 @@ use std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + mem::take, ops::Deref, time::Duration, }; use axum::extract::State; use futures::{ - FutureExt, Stream, StreamExt, TryFutureExt, - future::{OptionFuture, join3, try_join5}, + FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, + future::{OptionFuture, join, join3, join4, join5, try_join}, pin_mut, }; use ruma::{ - DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId, + DeviceId, JsOption, MxcUri, OwnedEventId, OwnedMxcUri, OwnedRoomId, RoomId, UInt, UserId, api::client::sync::sync_events::{ - self, DeviceLists, UnreadNotificationsCount, v5::request::ExtensionRoomConfig, + DeviceLists, UnreadNotificationsCount, + v5::{Request, Response, request::ExtensionRoomConfig, response}, }, directory::RoomTypeFilter, events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, + receipt::SyncReceiptEvent, room::member::{MembershipState, RoomMemberEventContent}, typing::TypingEventContent, }, serde::Raw, uint, }; +use tokio::time::{Instant, timeout_at}; use tuwunel_core::{ - Err, Error, Result, at, error, extract_variant, is_equal_to, - matrix::{Event, TypeStateKey, pdu::PduCount}, - trace, + Err, Result, apply, at, debug_error, error, extract_variant, is_equal_to, + matrix::{Event, StateKey, TypeStateKey, pdu::PduCount}, + ref_at, trace, utils::{ - BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, TryReadyExt, + BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, + result::FlatOk, + stream::{BroadbandExt, TryBroadbandExt, TryReadyExt, WidebandExt}, }, warn, }; -use tuwunel_service::{Services, rooms::read_receipt::pack_receipts, sync::into_snake_key}; +use tuwunel_service::{ + Services, + rooms::read_receipt::pack_receipts, + sync::{KnownRooms, into_snake_key}, +}; use super::share_encrypted_room; use crate::{ @@ -44,9 +54,10 @@ use crate::{ client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline}, }; -type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request); -type TodoRooms = BTreeMap, usize, u64)>; -type KnownRooms = BTreeMap>; +type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a Request); +type TodoRooms = BTreeMap; +type TodoRoom = (BTreeSet, usize, u64); +type ResponseLists = BTreeMap; /// `POST /_matrix/client/unstable/org.matrix.simplified_msc3575/sync` /// ([MSC4186]) @@ -58,21 +69,30 @@ type KnownRooms = BTreeMap>; /// /// [MSC3575]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575 /// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186 +#[tracing::instrument( + name = "sync", + level = "debug", + skip_all, + fields( + user_id = %body.sender_user(), + device_id = %body.sender_device(), + ) +)] pub(crate) async fn sync_events_v5_route( State(ref services): State, - body: Ruma, -) -> Result { + mut body: Ruma, +) -> Result { debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); - let sender_user = body.sender_user(); - let sender_device = body.sender_device(); - let snake_key = into_snake_key(sender_user, sender_device, body.conn_id.clone()); - let globalsince = body + let mut request = take(&mut body.body); + let mut globalsince = request .pos .as_ref() .and_then(|string| string.parse().ok()) .unwrap_or(0); + let (sender_user, sender_device) = body.sender(); + let snake_key = into_snake_key(sender_user, sender_device, request.conn_id.as_deref()); if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) { return Err!(Request(UnknownPos( "Connection data unknown to server; restarting sync stream." @@ -86,16 +106,10 @@ pub(crate) async fn sync_events_v5_route( .forget_snake_sync_connection(&snake_key); } - // Setup watchers, so if there's no response, we can wait for them - let watcher = services.sync.watch(sender_user, sender_device); - - let next_batch = services.globals.wait_pending().await?; - // Get sticky parameters from cache - let mut cached = body.body.clone(); let known_rooms = services .sync - .update_snake_sync_request_with_cache(&snake_key, &mut cached); + .update_snake_sync_request_with_cache(&snake_key, &mut request); let all_joined_rooms = services .state_cache @@ -118,186 +132,136 @@ pub(crate) async fn sync_events_v5_route( let (all_joined_rooms, all_invited_rooms, all_knocked_rooms) = join3(all_joined_rooms, all_invited_rooms, all_knocked_rooms).await; - let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref); let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref); let all_knocked_rooms = all_knocked_rooms.iter().map(AsRef::as_ref); + let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref); let all_rooms = all_joined_rooms .clone() .chain(all_invited_rooms.clone()) .chain(all_knocked_rooms.clone()); - let pos = next_batch.to_string(); - - let mut todo_rooms: TodoRooms = BTreeMap::new(); - - let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &cached); - - let account_data = collect_account_data(services, sync_info, next_batch).map(Ok); - - let e2ee = collect_e2ee(services, sync_info, next_batch, all_joined_rooms.clone()); - - let to_device = collect_to_device(services, sync_info, next_batch).map(Ok); - - let receipts = collect_receipts(services, sync_info, next_batch).map(Ok); - - let typing = collect_typing_events(services, sync_info, next_batch, all_joined_rooms.clone()); - - let (account_data, e2ee, to_device, receipts, typing) = - try_join5(account_data, e2ee, to_device, receipts, typing).await?; - - let extensions = sync_events::v5::response::Extensions { - account_data, - e2ee, - to_device, - receipts, - typing, - }; - - let mut response = sync_events::v5::Response { - txn_id: cached.txn_id.clone(), - pos, - lists: BTreeMap::new(), - rooms: BTreeMap::new(), - extensions, - }; - - handle_lists( + let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &request); + let (known_rooms, todo_rooms, lists) = handle_lists( services, sync_info, + known_rooms, all_invited_rooms.clone(), all_joined_rooms.clone(), all_rooms.clone(), - &mut todo_rooms, - &known_rooms, - &mut response, ) .await; - fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await; + let timeout = request + .timeout + .as_ref() + .map(Duration::as_millis) + .map(TryInto::try_into) + .flat_ok() + .unwrap_or(services.config.client_sync_timeout_default) + .max(services.config.client_sync_timeout_min) + .min(services.config.client_sync_timeout_max); - response.rooms = process_rooms( - services, - sender_user, - next_batch, - all_invited_rooms.clone(), - &todo_rooms, - &mut response, - &cached, - ) - .await?; + let stop_at = Instant::now() + .checked_add(Duration::from_millis(timeout)) + .expect("configuration must limit maximum timeout"); - if response.rooms.iter().all(|(id, r)| { - r.timeline.is_empty() - && r.required_state.is_empty() - && !response - .extensions - .receipts - .rooms - .contains_key(id) - }) && response - .extensions - .to_device - .clone() - .is_none_or(|to| to.events.is_empty()) - && response.extensions.typing.is_empty() - { - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives - let timeout_default = services.config.client_sync_timeout_default; - let timeout_min = services.config.client_sync_timeout_min; - let timeout_max = services.config.client_sync_timeout_max; - let duration = body - .timeout - .unwrap_or_else(|| Duration::from_millis(timeout_default)) - .clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max)); + let mut response = Response { + txn_id: request.txn_id.clone(), + lists, + pos: String::new(), + rooms: Default::default(), + extensions: Default::default(), + }; - _ = tokio::time::timeout(duration, watcher).await; - } + loop { + let watchers = services.sync.watch(sender_user, sender_device); + let next_batch = services.globals.wait_pending().await?; - trace!( - rooms = ?response.rooms.len(), - account_data = ?response.extensions.account_data.rooms.len(), - receipts = ?response.extensions.receipts.rooms.len(), - "responding to request with" - ); - Ok(response) -} + debug_assert!(globalsince <= next_batch, "next_batch is monotonic"); + if globalsince < next_batch { + let rooms = handle_rooms( + services, + &sync_info, + next_batch, + &known_rooms, + &todo_rooms, + all_invited_rooms.clone(), + ) + .map_ok(|rooms| response.rooms = rooms); -async fn fetch_subscriptions( - services: &Services, - (sender_user, sender_device, globalsince, body): SyncInfo<'_>, - known_rooms: &KnownRooms, - todo_rooms: &mut TodoRooms, -) { - let mut known_subscription_rooms = BTreeSet::new(); - for (room_id, room) in &body.room_subscriptions { - let not_exists = services.metadata.exists(room_id).eq(&false); + let extensions = handle_extensions( + services, + sync_info, + next_batch, + &known_rooms, + &todo_rooms, + all_joined_rooms.clone(), + ) + .map_ok(|extensions| response.extensions = extensions); - let is_disabled = services.metadata.is_disabled(room_id); + try_join(rooms, extensions).boxed().await?; - let is_banned = services.metadata.is_banned(room_id); - - pin_mut!(not_exists, is_disabled, is_banned); - if not_exists.or(is_disabled).or(is_banned).await { - continue; + if !is_empty_response(&response) { + trace!(globalsince, next_batch, "response {response:?}"); + response.pos = next_batch.to_string(); + return Ok(response); + } } - let todo_room = - todo_rooms - .entry(room_id.clone()) - .or_insert((BTreeSet::new(), 0_usize, u64::MAX)); + if timeout_at(stop_at, watchers).await.is_err() { + trace!(globalsince, next_batch, "timeout; empty response"); + response.pos = next_batch.to_string(); + return Ok(response); + } - let limit: UInt = room.timeline_limit; - - todo_room.0.extend( - room.required_state - .iter() - .map(|(ty, sk)| (ty.clone(), sk.as_str().into())), - ); - todo_room.1 = todo_room.1.max(usize_from_ruma(limit)); - // 0 means unknown because it got out of date - todo_room.2 = todo_room.2.min( - known_rooms - .get("subscriptions") - .and_then(|k| k.get(room_id)) - .copied() - .unwrap_or(0), - ); - known_subscription_rooms.insert(room_id.clone()); - } - // where this went (protomsc says it was removed) - //for r in body.unsubscribe_rooms { - // known_subscription_rooms.remove(&r); - // body.room_subscriptions.remove(&r); - //} - - if let Some(conn_id) = body.conn_id.clone() { - let snake_key = into_snake_key(sender_user, sender_device, conn_id); - services.sync.update_snake_sync_known_rooms( - &snake_key, - "subscriptions".to_owned(), - known_subscription_rooms, + trace!( globalsince, + last_batch = ?next_batch, + count = ?services.globals.pending_count(), + stop_at = ?stop_at, + "notified by watcher" ); + + globalsince = next_batch; } } +fn is_empty_response(response: &Response) -> bool { + response.extensions.is_empty() + && response + .rooms + .iter() + .all(|(_, room)| room.timeline.is_empty() && room.invite_state.is_none()) +} + +#[tracing::instrument( + level = "debug", + skip_all, + fields( + all_invited_rooms = all_invited_rooms.clone().count(), + all_joined_rooms = all_joined_rooms.clone().count(), + all_rooms = all_rooms.clone().count(), + known_rooms = known_rooms.len(), + ) +)] #[allow(clippy::too_many_arguments)] async fn handle_lists<'a, Rooms, AllRooms>( services: &Services, - (sender_user, sender_device, globalsince, body): SyncInfo<'_>, + sync_info: SyncInfo<'_>, + known_rooms: KnownRooms, all_invited_rooms: Rooms, all_joined_rooms: Rooms, all_rooms: AllRooms, - todo_rooms: &'a mut TodoRooms, - known_rooms: &'a KnownRooms, - response: &'_ mut sync_events::v5::Response, -) -> KnownRooms +) -> (KnownRooms, TodoRooms, ResponseLists) where Rooms: Iterator + Clone + Send + 'a, AllRooms: Iterator + Clone + Send + 'a, { - for (list_id, list) in &body.lists { + let &(sender_user, sender_device, globalsince, request) = &sync_info; + + let mut todo_rooms: TodoRooms = BTreeMap::new(); + let mut response_lists = ResponseLists::new(); + for (list_id, list) in &request.lists { let active_rooms: Vec<_> = match list.filters.as_ref().and_then(|f| f.is_invite) { | None => all_rooms.clone().collect(), | Some(true) => all_invited_rooms.clone().collect(), @@ -319,9 +283,7 @@ where }; let mut new_known_rooms: BTreeSet = BTreeSet::new(); - let ranges = list.ranges.clone(); - for mut range in ranges { range.0 = uint!(0); range.1 = range @@ -345,8 +307,6 @@ where u64::MAX, )); - let limit: usize = usize_from_ruma(list.room_details.timeline_limit).min(100); - todo_room.0.extend( list.room_details .required_state @@ -354,7 +314,9 @@ where .map(|(ty, sk)| (ty.clone(), sk.as_str().into())), ); + let limit: usize = usize_from_ruma(list.room_details.timeline_limit).min(100); todo_room.1 = todo_room.1.max(limit); + // 0 means unknown because it got out of date todo_room.2 = todo_room.2.min( known_rooms @@ -365,389 +327,756 @@ where ); } } - response - .lists - .insert(list_id.clone(), sync_events::v5::response::List { - count: ruma_from_usize(active_rooms.len()), - }); - if let Some(conn_id) = body.conn_id.clone() { - let snake_key = into_snake_key(sender_user, sender_device, conn_id); + if let Some(conn_id) = request.conn_id.as_deref() { + let snake_key = into_snake_key(sender_user, sender_device, conn_id.into()); + let list_id = list_id.as_str().into(); services.sync.update_snake_sync_known_rooms( &snake_key, - list_id.clone(), + list_id, new_known_rooms, globalsince, ); } - } - BTreeMap::default() -} - -async fn process_rooms<'a, Rooms>( - services: &Services, - sender_user: &UserId, - next_batch: u64, - all_invited_rooms: Rooms, - todo_rooms: &TodoRooms, - response: &mut sync_events::v5::Response, - body: &sync_events::v5::Request, -) -> Result> -where - Rooms: Iterator + Clone + Send + 'a, -{ - let mut rooms = BTreeMap::new(); - for (room_id, (required_state_request, timeline_limit, roomsince)) in todo_rooms { - let roomsincecount = PduCount::Normal(*roomsince); - - let mut timestamp: Option<_> = None; - let mut invite_state = None; - let (timeline_pdus, limited); - let new_room_id: &RoomId = (*room_id).as_ref(); - if all_invited_rooms - .clone() - .any(is_equal_to!(new_room_id)) - { - // TODO: figure out a timestamp we can use for remote invites - invite_state = services - .state_cache - .invite_state(sender_user, room_id) - .await - .ok(); - - (timeline_pdus, limited) = (Vec::new(), true); - } else { - (timeline_pdus, limited, _) = match load_timeline( - services, - sender_user, - room_id, - roomsincecount, - Some(PduCount::from(next_batch)), - *timeline_limit, - ) - .await - { - | Ok(value) => value, - | Err(err) => { - warn!("Encountered missing timeline in {}, error {}", room_id, err); - continue; - }, - }; - } - - if body.extensions.account_data.enabled == Some(true) { - response.extensions.account_data.rooms.insert( - room_id.to_owned(), - services - .account_data - .changes_since(Some(room_id), sender_user, *roomsince, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect() - .await, - ); - } - - let last_privateread_update = services - .read_receipt - .last_privateread_update(sender_user, room_id) - .await; - - let private_read_event: OptionFuture<_> = (last_privateread_update > *roomsince) - .then(|| { - services - .read_receipt - .private_read_get(room_id, sender_user) - .ok() - }) - .into(); - - let mut receipts: Vec> = services - .read_receipt - .readreceipts_since(room_id, *roomsince, Some(next_batch)) - .filter_map(async |(read_user, _ts, v)| { - services - .users - .user_is_ignored(read_user, sender_user) - .await - .or_some(v) - }) - .collect() - .await; - - if let Some(private_read_event) = private_read_event.await.flatten() { - receipts.push(private_read_event); - } - - let receipt_size = receipts.len(); - - if receipt_size > 0 { - response - .extensions - .receipts - .rooms - .insert(room_id.clone(), pack_receipts(Box::new(receipts.into_iter()))); - } - - if *roomsince != 0 - && timeline_pdus.is_empty() - && receipt_size == 0 - && response - .extensions - .account_data - .rooms - .get(room_id) - .is_none_or(Vec::is_empty) - { - continue; - } - - let prev_batch = timeline_pdus - .first() - .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { - Ok(Some(match pdu_count { - | PduCount::Backfilled(_) => { - error!("timeline in backfill state?!"); - "0".to_owned() - }, - | PduCount::Normal(c) => c.to_string(), - })) - })? - .or_else(|| { - if *roomsince != 0 { - Some(roomsince.to_string()) - } else { - None - } - }); - - let room_events: Vec<_> = timeline_pdus - .iter() - .stream() - .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) - .map(at!(1)) - .map(Event::into_format) - .collect() - .await; - - for (_, pdu) in timeline_pdus { - let ts = pdu.origin_server_ts; - if DEFAULT_BUMP_TYPES - .binary_search(&pdu.kind) - .is_ok() && timestamp.is_none_or(|time| time <= ts) - { - timestamp = Some(ts); - } - } - - let required_state = required_state_request - .iter() - .stream() - .filter_map(async |state| { - services - .state_accessor - .room_state_get(room_id, &state.0, &state.1) - .await - .map(Event::into_format) - .ok() - }) - .collect() - .await; - - let room_name = services - .state_accessor - .get_name(room_id) - .await - .ok(); - - // Heroes - let heroes: Vec<_> = if room_name.is_none() { - services - .state_cache - .room_members(room_id) - .ready_filter(|member| *member != sender_user) - .filter_map(|user_id| { - services - .state_accessor - .get_member(room_id, user_id) - .map_ok(|memberevent| sync_events::v5::response::Hero { - user_id: user_id.into(), - name: memberevent.displayname, - avatar: memberevent.avatar_url, - }) - .ok() - }) - .take(5) - .collect() - .await - } else { - vec![] - }; - - let hero_name = match heroes.len().cmp(&(1_usize)) { - | Ordering::Greater => { - let firsts = heroes[1..] - .iter() - .map(|h| { - h.name - .clone() - .unwrap_or_else(|| h.user_id.to_string()) - }) - .collect::>() - .join(", "); - - let last = heroes[0] - .name - .clone() - .unwrap_or_else(|| heroes[0].user_id.to_string()); - - Some(format!("{firsts} and {last}")) - }, - | Ordering::Equal => Some( - heroes[0] - .name - .clone() - .unwrap_or_else(|| heroes[0].user_id.to_string()), - ), - | Ordering::Less => None, - }; - - let room_avatar = services - .state_accessor - .get_avatar(room_id) - .map_ok(|content| content.url) - .ok() - .map(Option::flatten) - .await; - - let heroes_avatar = (room_avatar.is_none() && room_name.is_none()) - .then(|| { - heroes - .first() - .and_then(|hero| hero.avatar.clone()) - }) - .flatten(); - - let avatar = ruma::JsOption::from_option(room_avatar.or(heroes_avatar)); - - rooms.insert(room_id.clone(), sync_events::v5::response::Room { - avatar, - name: room_name.or(hero_name), - initial: Some(roomsince == &0), - is_dm: None, - invite_state, - unread_notifications: UnreadNotificationsCount { - highlight_count: Some( - services - .user - .highlight_count(sender_user, room_id) - .await - .try_into() - .expect("notification count can't go that high"), - ), - notification_count: Some( - services - .user - .notification_count(sender_user, room_id) - .await - .try_into() - .expect("notification count can't go that high"), - ), - }, - timeline: room_events, - required_state, - prev_batch, - limited, - joined_count: Some( - services - .state_cache - .room_joined_count(room_id) - .await - .unwrap_or(0) - .try_into() - .unwrap_or_else(|_| uint!(0)), - ), - invited_count: Some( - services - .state_cache - .room_invited_count(room_id) - .await - .unwrap_or(0) - .try_into() - .unwrap_or_else(|_| uint!(0)), - ), - num_live: None, // Count events in timeline greater than global sync counter - bump_stamp: timestamp, - heroes: Some(heroes), + response_lists.insert(list_id.clone(), response::List { + count: ruma_from_usize(active_rooms.len()), }); } + + let (known_rooms, todo_rooms) = + fetch_subscriptions(services, sync_info, known_rooms, todo_rooms).await; + + (known_rooms, todo_rooms, response_lists) +} + +#[tracing::instrument( + level = "debug", + skip_all, + fields( + global_since, + known_rooms = known_rooms.len(), + todo_rooms = todo_rooms.len(), + ) +)] +async fn fetch_subscriptions( + services: &Services, + (sender_user, sender_device, globalsince, request): SyncInfo<'_>, + known_rooms: KnownRooms, + todo_rooms: TodoRooms, +) -> (KnownRooms, TodoRooms) { + let subs = (todo_rooms, BTreeSet::new()); + let (todo_rooms, known_subs) = request + .room_subscriptions + .iter() + .stream() + .broad_filter_map(async |(room_id, room)| { + let not_exists = services.metadata.exists(room_id).eq(&false); + let is_disabled = services.metadata.is_disabled(room_id); + let is_banned = services.metadata.is_banned(room_id); + + pin_mut!(not_exists, is_disabled, is_banned); + not_exists + .or(is_disabled) + .or(is_banned) + .await + .eq(&false) + .then_some((room_id, room)) + }) + .ready_fold(subs, |(mut todo_rooms, mut known_subs), (room_id, room)| { + let todo_room = + todo_rooms + .entry(room_id.clone()) + .or_insert((BTreeSet::new(), 0_usize, u64::MAX)); + + todo_room.0.extend( + room.required_state + .iter() + .map(|(ty, sk)| (ty.clone(), sk.as_str().into())), + ); + + let limit: UInt = room.timeline_limit; + todo_room.1 = todo_room.1.max(usize_from_ruma(limit)); + + // 0 means unknown because it got out of date + todo_room.2 = todo_room.2.min( + known_rooms + .get("subscriptions") + .and_then(|k| k.get(room_id)) + .copied() + .unwrap_or(0), + ); + + known_subs.insert(room_id.clone()); + (todo_rooms, known_subs) + }) + .await; + + if let Some(conn_id) = request.conn_id.as_deref() { + let snake_key = into_snake_key(sender_user, sender_device, conn_id.into()); + let list_id = "subscriptions".into(); + services + .sync + .update_snake_sync_known_rooms(&snake_key, list_id, known_subs, globalsince); + } + + (known_rooms, todo_rooms) +} + +#[tracing::instrument( + level = "debug", + skip_all, + fields(?filters, negate) +)] +fn filter_rooms<'a, Rooms>( + services: &'a Services, + filters: &'a [RoomTypeFilter], + negate: &'a bool, + rooms: Rooms, +) -> impl Stream + Send + 'a +where + Rooms: Stream + Send + 'a, +{ + rooms + .wide_filter_map(async |room_id| { + match services + .state_accessor + .get_room_type(room_id) + .await + { + | Ok(room_type) => Some((room_id, Some(room_type))), + | Err(e) if e.is_not_found() => Some((room_id, None)), + | Err(_) => None, + } + }) + .map(|(room_id, room_type)| (room_id, RoomTypeFilter::from(room_type))) + .ready_filter_map(|(room_id, room_type_filter)| { + let contains = filters.contains(&room_type_filter); + let pos = !*negate && (filters.is_empty() || contains); + let neg = *negate && !contains; + + (pos || neg).then_some(room_id) + }) +} + +#[tracing::instrument( + level = "debug", + skip_all, + fields( + next_batch, + all_invited_rooms = all_invited_rooms.clone().count(), + todo_rooms = todo_rooms.len(), + ) +)] +async fn handle_rooms<'a, Rooms>( + services: &Services, + sync_info: &SyncInfo<'_>, + next_batch: u64, + _known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, + all_invited_rooms: Rooms, +) -> Result> +where + Rooms: Iterator + Clone + Send + Sync + 'a, +{ + let rooms: BTreeMap<_, _> = todo_rooms + .iter() + .try_stream() + .broad_and_then(async |(room_id, todo_room)| { + let is_invited = all_invited_rooms + .clone() + .any(is_equal_to!(room_id)); + + let room = + handle_room(services, next_batch, sync_info, room_id, todo_room, is_invited) + .await?; + + Ok((room_id, room)) + }) + .ready_try_filter_map(|(room_id, room)| Ok(room.map(|room| (room_id, room)))) + .map_ok(|(room_id, room)| (room_id.to_owned(), room)) + .try_collect() + .await?; + Ok(rooms) } -async fn collect_account_data( +#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] +#[allow(clippy::too_many_arguments)] +async fn handle_room( services: &Services, - (sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request), next_batch: u64, -) -> sync_events::v5::response::AccountData { - let mut account_data = sync_events::v5::response::AccountData { - global: Vec::new(), - rooms: BTreeMap::new(), + (sender_user, _, _globalsince, _): &SyncInfo<'_>, + room_id: &RoomId, + (required_state_request, timeline_limit, roomsince): &TodoRoom, + is_invited: bool, +) -> Result> { + let timeline: OptionFuture<_> = is_invited + .eq(&false) + .then(|| { + load_timeline( + services, + sender_user, + room_id, + PduCount::Normal(*roomsince), + Some(PduCount::from(next_batch)), + *timeline_limit, + ) + }) + .into(); + + let Ok(timeline) = timeline.await.transpose() else { + debug_error!(?room_id, "Missing timeline."); + return Ok(None); }; - if !body + let (timeline_pdus, limited, _lastcount) = + timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); + + if *roomsince != 0 && timeline_pdus.is_empty() && !is_invited { + return Ok(None); + } + + let prev_batch = timeline_pdus + .first() + .map(at!(0)) + .map(PduCount::into_unsigned) + .or_else(|| roomsince.ne(&0).then_some(*roomsince)) + .as_ref() + .map(ToString::to_string); + + let bump_stamp = timeline_pdus + .iter() + .filter(|(_, pdu)| { + DEFAULT_BUMP_TYPES + .binary_search(pdu.event_type()) + .is_ok() + }) + .fold(Option::::None, |mut bump_stamp, (_, pdu)| { + let ts = pdu.origin_server_ts().get(); + if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts) { + bump_stamp.replace(ts); + } + + bump_stamp + }); + + let lazy = required_state_request + .iter() + .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into()))); + + let mut timeline_senders: Vec<_> = timeline_pdus + .iter() + .filter(|_| lazy) + .map(ref_at!(1)) + .map(Event::sender) + .collect(); + + timeline_senders.sort(); + timeline_senders.dedup(); + let timeline_senders = timeline_senders + .iter() + .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))); + + let required_state = required_state_request + .iter() + .cloned() + .chain(timeline_senders) + .stream() + .broad_filter_map(async |state| { + let state_key: StateKey = match state.1.as_str() { + | "$LAZY" => return None, + | "$ME" => sender_user.as_str().into(), + | _ => state.1.clone(), + }; + + services + .state_accessor + .room_state_get(room_id, &state.0, &state_key) + .map_ok(Event::into_format) + .ok() + .await + }) + .collect(); + + // TODO: figure out a timestamp we can use for remote invites + let invite_state: OptionFuture<_> = is_invited + .then(|| { + services + .state_cache + .invite_state(sender_user, room_id) + .ok() + }) + .into(); + + let timeline = timeline_pdus + .iter() + .stream() + .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) + .map(at!(1)) + .map(Event::into_format) + .collect(); + + let room_name = services + .state_accessor + .get_name(room_id) + .map(Result::ok); + + let room_avatar = services + .state_accessor + .get_avatar(room_id) + .map_ok(|content| content.url) + .ok() + .map(Option::flatten); + + let highlight_count = services + .user + .highlight_count(sender_user, room_id) + .map(TryInto::try_into) + .map(Result::ok); + + let notification_count = services + .user + .notification_count(sender_user, room_id) + .map(TryInto::try_into) + .map(Result::ok); + + let joined_count = services + .state_cache + .room_joined_count(room_id) + .map_ok(TryInto::try_into) + .map_ok(Result::ok) + .map(FlatOk::flat_ok); + + let invited_count = services + .state_cache + .room_invited_count(room_id) + .map_ok(TryInto::try_into) + .map_ok(Result::ok) + .map(FlatOk::flat_ok); + + let meta = join(room_name, room_avatar); + let events = join3(timeline, required_state, invite_state); + let member_counts = join(joined_count, invited_count); + let notification_counts = join(highlight_count, notification_count); + let ( + (room_name, room_avatar), + (timeline, required_state, invite_state), + (joined_count, invited_count), + (highlight_count, notification_count), + ) = join4(meta, events, member_counts, notification_counts) + .boxed() + .await; + + let (heroes, hero_name, heroes_avatar) = calculate_heroes( + services, + sender_user, + room_id, + room_name.as_deref(), + room_avatar.as_deref(), + ) + .await?; + + let num_live = None; // Count events in timeline greater than global sync counter + + Ok(Some(response::Room { + initial: Some(*roomsince == 0), + name: room_name.or(hero_name), + avatar: JsOption::from_option(room_avatar.or(heroes_avatar)), + invite_state: invite_state.flatten(), + required_state, + timeline, + is_dm: None, + prev_batch, + limited, + bump_stamp, + heroes, + num_live, + joined_count, + invited_count, + unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, + })) +} + +#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] +#[allow(clippy::type_complexity)] +async fn calculate_heroes( + services: &Services, + sender_user: &UserId, + room_id: &RoomId, + room_name: Option<&str>, + room_avatar: Option<&MxcUri>, +) -> Result<(Option>, Option, Option)> { + const MAX_HEROES: usize = 5; + let heroes: Vec<_> = services + .state_cache + .room_members(room_id) + .ready_filter(|&member| member != sender_user) + .ready_filter_map(|member| room_name.is_none().then_some(member)) + .map(ToOwned::to_owned) + .broadn_filter_map(MAX_HEROES, async |user_id| { + let content = services + .state_accessor + .get_member(room_id, &user_id) + .await + .ok()?; + + let name: OptionFuture<_> = content + .displayname + .is_none() + .then(|| services.users.displayname(&user_id).ok()) + .into(); + + let avatar: OptionFuture<_> = content + .avatar_url + .is_none() + .then(|| services.users.avatar_url(&user_id).ok()) + .into(); + + let (name, avatar) = join(name, avatar).await; + let hero = response::Hero { + user_id, + name: name.unwrap_or(content.displayname), + avatar: avatar.unwrap_or(content.avatar_url), + }; + + Some(hero) + }) + .take(MAX_HEROES) + .collect() + .await; + + let hero_name = match heroes.len().cmp(&(1_usize)) { + | Ordering::Less => None, + | Ordering::Equal => Some( + heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()), + ), + | Ordering::Greater => { + let firsts = heroes[1..] + .iter() + .map(|h| { + h.name + .clone() + .unwrap_or_else(|| h.user_id.to_string()) + }) + .collect::>() + .join(", "); + + let last = heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()); + + Some(format!("{firsts} and {last}")) + }, + }; + + let heroes_avatar = (room_avatar.is_none() && room_name.is_none()) + .then(|| { + heroes + .first() + .and_then(|hero| hero.avatar.clone()) + }) + .flatten(); + + Ok((Some(heroes), hero_name, heroes_avatar)) +} + +#[tracing::instrument( + level = "debug", + skip_all, + fields( + global_since, + known_rooms = known_rooms.len(), + ) +)] +async fn handle_extensions<'a, Rooms>( + services: &Services, + sync_info: SyncInfo<'_>, + next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, + all_joined_rooms: Rooms, +) -> Result +where + Rooms: Iterator + Clone + Send + 'a, +{ + let &(_, _, _, request) = &sync_info; + + let account_data: OptionFuture<_> = request .extensions .account_data .enabled .unwrap_or(false) - { - return sync_events::v5::response::AccountData::default(); - } + .then(|| collect_account_data(services, sync_info, next_batch, known_rooms, todo_rooms)) + .into(); - account_data.global = services + let receipts: OptionFuture<_> = request + .extensions + .receipts + .enabled + .unwrap_or(false) + .then(|| collect_receipts(services, sync_info, next_batch, known_rooms, todo_rooms)) + .into(); + + let typing: OptionFuture<_> = request + .extensions + .typing + .enabled + .unwrap_or(false) + .then(|| collect_typing(services, sync_info, next_batch, known_rooms, todo_rooms)) + .into(); + + let to_device: OptionFuture<_> = request + .extensions + .to_device + .enabled + .unwrap_or(false) + .then(|| collect_to_device(services, sync_info, next_batch)) + .into(); + + let e2ee: OptionFuture<_> = request + .extensions + .e2ee + .enabled + .unwrap_or(false) + .then(|| { + collect_e2ee(services, sync_info, next_batch, todo_rooms, all_joined_rooms.clone()) + }) + .into(); + + let (account_data, receipts, typing, to_device, e2ee) = + join5(account_data, receipts, typing, to_device, e2ee) + .map(apply!(5, |t: Option<_>| t.unwrap_or(Ok(Default::default())))) + .await; + + Ok(response::Extensions { + account_data: account_data?, + receipts: receipts?, + typing: typing?, + to_device: to_device?, + e2ee: e2ee?, + }) +} + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] +async fn collect_account_data( + services: &Services, + sync_info: SyncInfo<'_>, + next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + let (sender_user, _, globalsince, request) = sync_info; + let data = &request.extensions.account_data; + let rooms = extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .broad_filter_map(async |room_id| { + let &(_, _, roomsince) = todo_rooms.get(room_id)?; + let changes: Vec<_> = services + .account_data + .changes_since(Some(room_id), sender_user, roomsince, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await; + + changes + .is_empty() + .eq(&false) + .then(move || (room_id.to_owned(), changes)) + }) + .collect(); + + let global = services .account_data .changes_since(None, sender_user, globalsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect(); + + let (global, rooms) = join(global, rooms).await; + + Ok(response::AccountData { global, rooms }) +} + +#[tracing::instrument(level = "trace", skip_all)] +async fn collect_receipts( + services: &Services, + sync_info: SyncInfo<'_>, + next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + let (_, _, _, request) = sync_info; + let data = &request.extensions.receipts; + let rooms = extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .broad_filter_map(async |room_id| { + collect_receipt(services, sync_info, next_batch, todo_rooms, room_id).await + }) + .collect() + .await; + + Ok(response::Receipts { rooms }) +} + +async fn collect_receipt( + services: &Services, + (sender_user, ..): SyncInfo<'_>, + next_batch: u64, + todo_rooms: &TodoRooms, + room_id: &RoomId, +) -> Option<(OwnedRoomId, Raw)> { + let &(_, _, roomsince) = todo_rooms.get(room_id)?; + let private_receipt = services + .read_receipt + .last_privateread_update(sender_user, room_id) + .then(async |last_private_update| { + if last_private_update <= roomsince || last_private_update > next_batch { + return None; + } + + services + .read_receipt + .private_read_get(room_id, sender_user) + .map(Some) + .await + }) + .map(Option::into_iter) + .map(Iterator::flatten) + .map(IterStream::stream) + .flatten_stream(); + + let receipts: Vec> = services + .read_receipt + .readreceipts_since(room_id, roomsince, Some(next_batch)) + .filter_map(async |(read_user, _ts, v)| { + services + .users + .user_is_ignored(read_user, sender_user) + .await + .or_some(v) + }) + .chain(private_receipt) + .collect() + .boxed() + .await; + + receipts + .is_empty() + .eq(&false) + .then(|| (room_id.to_owned(), pack_receipts(receipts.into_iter()))) +} + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince))] +async fn collect_typing( + services: &Services, + sync_info: SyncInfo<'_>, + _next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + use response::Typing; + use ruma::events::typing::SyncTypingEvent; + + let (sender_user, _, _, request) = sync_info; + let data = &request.extensions.typing; + extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .filter_map(async |room_id| { + services + .typing + .typing_users_for_user(room_id, sender_user) + .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}")) + .await + .ok() + .filter(|users| !users.is_empty()) + .map(|users| (room_id, users)) + }) + .ready_filter_map(|(room_id, users)| { + let content = TypingEventContent::new(users); + let event = SyncTypingEvent { content }; + let event = Raw::new(&event); + + Some((room_id.to_owned(), event.ok()?)) + }) + .collect::>() + .map(|rooms| Typing { rooms }) + .map(Ok) + .await +} + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] +async fn collect_to_device( + services: &Services, + (sender_user, sender_device, globalsince, _request): SyncInfo<'_>, + next_batch: u64, +) -> Result> { + services + .users + .remove_to_device_events(sender_user, sender_device, globalsince) + .await; + + let events: Vec<_> = services + .users + .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) .collect() .await; - if let Some(rooms) = &body.extensions.account_data.rooms { - for room in rooms - .iter() - .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) - { - account_data.rooms.insert( - room.clone(), - services - .account_data - .changes_since(Some(room), sender_user, globalsince, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect() - .await, - ); - } - } + let to_device = events + .is_empty() + .eq(&false) + .then(|| response::ToDevice { + next_batch: next_batch.to_string(), + events, + }); - account_data + Ok(to_device) } +// TODO ---------------------------------------------------------------------- + +#[tracing::instrument( + level = "trace", + skip_all, + fields( + globalsince, + next_batch, + all_joined_rooms = all_joined_rooms.clone().count(), + ) +)] async fn collect_e2ee<'a, Rooms>( services: &Services, - (sender_user, sender_device, globalsince, body): ( - &UserId, - &DeviceId, - u64, - &sync_events::v5::Request, - ), + (sender_user, sender_device, globalsince, _): SyncInfo<'_>, next_batch: u64, + _todo_rooms: &TodoRooms, all_joined_rooms: Rooms, -) -> Result +) -> Result where - Rooms: Iterator + Send + 'a, + Rooms: Iterator + Clone + Send + 'a, { - if !body.extensions.e2ee.enabled.unwrap_or(false) { - return Ok(sync_events::v5::response::E2EE::default()); - } - let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in + // Users that have left any encrypted rooms the sender was in + let mut left_encrypted_users = HashSet::new(); let mut device_list_changes = HashSet::new(); let mut device_list_left = HashSet::new(); // Look for device list updates of this account @@ -911,13 +1240,24 @@ where } } - Ok(sync_events::v5::response::E2EE { - device_unused_fallback_key_types: None, + let last_otk_update = services + .users + .last_one_time_keys_update(sender_user) + .await; - device_one_time_keys_count: services - .users - .count_one_time_keys(sender_user, sender_device) - .await, + let device_otk_count: OptionFuture<_> = last_otk_update + .gt(&globalsince) + .then(|| { + services + .users + .count_one_time_keys(sender_user, sender_device) + }) + .into(); + + Ok(response::E2EE { + device_one_time_keys_count: device_otk_count.await.unwrap_or_default(), + + device_unused_fallback_key_types: None, device_lists: DeviceLists { changed: device_list_changes.into_iter().collect(), @@ -926,109 +1266,42 @@ where }) } -async fn collect_to_device( - services: &Services, - (sender_user, sender_device, globalsince, body): SyncInfo<'_>, - next_batch: u64, -) -> Option { - if !body.extensions.to_device.enabled.unwrap_or(false) { - return None; - } +// ---------------------------------------------------------------------------- - services - .users - .remove_to_device_events(sender_user, sender_device, globalsince) - .await; +fn extension_rooms_todo<'a>( + (_, _, _, request): SyncInfo<'a>, + known_rooms: &'a KnownRooms, + todo_rooms: &'a TodoRooms, + lists: Option<&'a Vec>, + rooms: Option<&'a Vec>, +) -> impl Iterator + Send + 'a { + let lists_explicit = lists.into_iter().flat_map(|vec| vec.iter()); - Some(sync_events::v5::response::ToDevice { - next_batch: next_batch.to_string(), - events: services - .users - .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) - .collect() - .await, - }) -} + let lists_requested = request + .lists + .keys() + .filter(move |_| lists.is_none()); -async fn collect_typing_events<'a, Rooms>( - services: &Services, - (sender_user, _, _, body): SyncInfo<'_>, - _next_batch: u64, - rooms: Rooms, -) -> Result -where - Rooms: Iterator + Send + 'a, -{ - use sync_events::v5::response::Typing; + let rooms_explicit = rooms + .into_iter() + .flat_map(|vec| vec.iter()) + .filter_map(|erc| extract_variant!(erc, ExtensionRoomConfig::Room)) + .map(AsRef::::as_ref); - if !body.extensions.typing.enabled.unwrap_or(false) { - return Ok(Typing::default()); - } + let rooms_implicit = todo_rooms + .keys() + .map(AsRef::as_ref) + .filter(move |_| rooms.is_none()); - rooms - .stream() - .filter_map(async |room_id| { - services - .typing - .typing_users_for_user(room_id, sender_user) - .inspect_err(|e| warn!(%room_id, "Failed to get typing events for room: {e}")) - .await - .ok() - .filter(|users| !users.is_empty()) - .map(|users| Ok((room_id, users))) + lists_explicit + .chain(lists_requested) + .flat_map(|list_id| { + known_rooms + .get(list_id.as_str()) + .into_iter() + .flat_map(BTreeMap::keys) }) - .ready_try_fold_default(|mut response: Typing, (room_id, users)| { - response.rooms.insert( - room_id.to_owned(), - Raw::new(&ruma::events::typing::SyncTypingEvent { - content: TypingEventContent::new(users), - })?, - ); - - Ok(response) - }) - .await -} - -async fn collect_receipts( - _services: &Services, - (_sender_user, _, _globalsince, _body): SyncInfo<'_>, - _next_batch: u64, -) -> sync_events::v5::response::Receipts { - sync_events::v5::response::Receipts { rooms: BTreeMap::new() } - // TODO: get explicitly requested read receipts -} - -fn filter_rooms<'a, Rooms>( - services: &'a Services, - filter: &'a [RoomTypeFilter], - negate: &'a bool, - rooms: Rooms, -) -> impl Stream + Send + 'a -where - Rooms: Stream + Send + 'a, -{ - rooms.filter_map(async |room_id| { - let room_type = services - .state_accessor - .get_room_type(room_id) - .await; - - if room_type - .as_ref() - .is_err_and(|e| !e.is_not_found()) - { - return None; - } - - let room_type_filter = RoomTypeFilter::from(room_type.ok()); - - let include = if *negate { - !filter.contains(&room_type_filter) - } else { - filter.is_empty() || filter.contains(&room_type_filter) - }; - - include.then_some(room_id) - }) + .map(AsRef::as_ref) + .chain(rooms_explicit) + .chain(rooms_implicit) } diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index b253fea6..d0dd397a 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -5,8 +5,11 @@ use std::{ sync::{Arc, Mutex, Mutex as StdMutex}, }; -use ruma::{OwnedDeviceId, OwnedRoomId, OwnedUserId, api::client::sync::sync_events::v5}; -use tuwunel_core::Result; +use ruma::{ + OwnedDeviceId, OwnedRoomId, OwnedUserId, + api::client::sync::sync_events::v5::{Request, request}, +}; +use tuwunel_core::{Result, implement, smallstr::SmallString}; use tuwunel_database::Map; pub struct Service { @@ -31,17 +34,21 @@ pub struct Data { userid_lastonetimekeyupdate: Arc, } -#[derive(Default)] +#[derive(Debug, Default)] struct SnakeSyncCache { - lists: BTreeMap, - subscriptions: BTreeMap, - known_rooms: BTreeMap>, - extensions: v5::request::Extensions, + lists: BTreeMap, + subscriptions: RoomSubscriptions, + known_rooms: KnownRooms, + extensions: request::Extensions, } -type DbConnections = Mutex>; -type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option); +pub type KnownRooms = BTreeMap>; +pub type RoomSubscriptions = BTreeMap; +pub type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option); type SnakeConnectionsVal = Arc>; +type DbConnections = Mutex>; +pub type ListId = SmallString<[u8; 16]>; +pub type ConnId = SmallString<[u8; 16]>; impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { @@ -69,185 +76,189 @@ impl crate::Service for Service { fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool { - self.snake_connections - .lock() - .expect("locked") - .contains_key(key) - } +#[implement(Service)] +pub fn update_snake_sync_request_with_cache( + &self, + snake_key: &SnakeConnectionsKey, + request: &mut Request, +) -> KnownRooms { + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry(snake_key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); - pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) { - self.snake_connections - .lock() - .expect("locked") - .remove(key); - } + let cached = &mut cached.lock().expect("locked"); + drop(cache); - pub fn update_snake_sync_request_with_cache( - &self, - snake_key: &SnakeConnectionsKey, - request: &mut v5::Request, - ) -> BTreeMap> { - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(snake_key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); - let cached = &mut cached.lock().expect("locked"); - drop(cache); + //Request::try_from_http_request(req, path_args); + for (list_id, list) in &mut request.lists { + if let Some(cached_list) = cached.lists.get(list_id.as_str()) { + list_or_sticky( + &mut list.room_details.required_state, + &cached_list.room_details.required_state, + ); - //v5::Request::try_from_http_request(req, path_args); - for (list_id, list) in &mut request.lists { - if let Some(cached_list) = cached.lists.get(list_id) { - list_or_sticky( - &mut list.room_details.required_state, - &cached_list.room_details.required_state, - ); + //some_or_sticky(&mut list.include_heroes, cached_list.include_heroes); - //some_or_sticky(&mut list.include_heroes, cached_list.include_heroes); - - match (&mut list.filters, cached_list.filters.clone()) { - | (Some(filters), Some(cached_filters)) => { - some_or_sticky(&mut filters.is_invite, cached_filters.is_invite); - // TODO (morguldir): Find out how a client can unset this, probably need - // to change into an option inside ruma - list_or_sticky( - &mut filters.not_room_types, - &cached_filters.not_room_types, - ); - }, - | (_, Some(cached_filters)) => list.filters = Some(cached_filters), - | (Some(list_filters), _) => list.filters = Some(list_filters.clone()), - | (..) => {}, - } + match (&mut list.filters, cached_list.filters.clone()) { + | (Some(filters), Some(cached_filters)) => { + some_or_sticky(&mut filters.is_invite, cached_filters.is_invite); + // TODO (morguldir): Find out how a client can unset this, probably need + // to change into an option inside ruma + list_or_sticky(&mut filters.not_room_types, &cached_filters.not_room_types); + }, + | (_, Some(cached_filters)) => list.filters = Some(cached_filters), + | (Some(list_filters), _) => list.filters = Some(list_filters.clone()), + | (..) => {}, } - cached.lists.insert(list_id.clone(), list.clone()); } cached - .subscriptions - .extend(request.room_subscriptions.clone()); - request - .room_subscriptions - .extend(cached.subscriptions.clone()); - - request.extensions.e2ee.enabled = request - .extensions - .e2ee - .enabled - .or(cached.extensions.e2ee.enabled); - - request.extensions.to_device.enabled = request - .extensions - .to_device - .enabled - .or(cached.extensions.to_device.enabled); - - request.extensions.account_data.enabled = request - .extensions - .account_data - .enabled - .or(cached.extensions.account_data.enabled); - request.extensions.account_data.lists = request - .extensions - .account_data .lists - .clone() - .or_else(|| cached.extensions.account_data.lists.clone()); - request.extensions.account_data.rooms = request - .extensions - .account_data - .rooms - .clone() - .or_else(|| cached.extensions.account_data.rooms.clone()); - - some_or_sticky(&mut request.extensions.typing.enabled, cached.extensions.typing.enabled); - some_or_sticky( - &mut request.extensions.typing.rooms, - cached.extensions.typing.rooms.clone(), - ); - some_or_sticky( - &mut request.extensions.typing.lists, - cached.extensions.typing.lists.clone(), - ); - some_or_sticky( - &mut request.extensions.receipts.enabled, - cached.extensions.receipts.enabled, - ); - some_or_sticky( - &mut request.extensions.receipts.rooms, - cached.extensions.receipts.rooms.clone(), - ); - some_or_sticky( - &mut request.extensions.receipts.lists, - cached.extensions.receipts.lists.clone(), - ); - - cached.extensions = request.extensions.clone(); - cached.known_rooms.clone() + .insert(list_id.as_str().into(), list.clone()); } - pub fn update_snake_sync_known_rooms( - &self, - key: &SnakeConnectionsKey, - list_id: String, - new_cached_rooms: BTreeSet, - globalsince: u64, - ) { - assert!(key.2.is_some(), "Some(conn_id) required for this call"); - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); - let cached = &mut cached.lock().expect("locked"); - drop(cache); + cached + .subscriptions + .extend(request.room_subscriptions.clone()); - for (room_id, lastsince) in cached - .known_rooms - .entry(list_id.clone()) - .or_default() - .iter_mut() - { - if !new_cached_rooms.contains(room_id) { - *lastsince = 0; - } - } - let list = cached.known_rooms.entry(list_id).or_default(); - for room_id in new_cached_rooms { - list.insert(room_id, globalsince); + request + .room_subscriptions + .extend(cached.subscriptions.clone()); + + request.extensions.e2ee.enabled = request + .extensions + .e2ee + .enabled + .or(cached.extensions.e2ee.enabled); + + request.extensions.to_device.enabled = request + .extensions + .to_device + .enabled + .or(cached.extensions.to_device.enabled); + + request.extensions.account_data.enabled = request + .extensions + .account_data + .enabled + .or(cached.extensions.account_data.enabled); + request.extensions.account_data.lists = request + .extensions + .account_data + .lists + .clone() + .or_else(|| cached.extensions.account_data.lists.clone()); + request.extensions.account_data.rooms = request + .extensions + .account_data + .rooms + .clone() + .or_else(|| cached.extensions.account_data.rooms.clone()); + + { + let (request, cached) = (&mut request.extensions.typing, &cached.extensions.typing); + some_or_sticky(&mut request.enabled, cached.enabled); + some_or_sticky(&mut request.rooms, cached.rooms.clone()); + some_or_sticky(&mut request.lists, cached.lists.clone()); + }; + { + let (request, cached) = (&mut request.extensions.receipts, &cached.extensions.receipts); + some_or_sticky(&mut request.enabled, cached.enabled); + some_or_sticky(&mut request.rooms, cached.rooms.clone()); + some_or_sticky(&mut request.lists, cached.lists.clone()); + }; + + cached.extensions = request.extensions.clone(); + cached.known_rooms.clone() +} + +#[implement(Service)] +pub fn update_snake_sync_known_rooms( + &self, + key: &SnakeConnectionsKey, + list_id: ListId, + new_cached_rooms: BTreeSet, + globalsince: u64, +) { + assert!(key.2.is_some(), "Some(conn_id) required for this call"); + + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); + + let cached = &mut cached.lock().expect("locked"); + drop(cache); + + for (room_id, lastsince) in cached + .known_rooms + .entry(list_id.clone()) + .or_default() + .iter_mut() + { + if !new_cached_rooms.contains(room_id) { + *lastsince = 0; } } - pub fn update_snake_sync_subscriptions( - &self, - key: &SnakeConnectionsKey, - subscriptions: BTreeMap, - ) { - let mut cache = self.snake_connections.lock().expect("locked"); - let cached = Arc::clone( - cache - .entry(key.clone()) - .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), - ); - let cached = &mut cached.lock().expect("locked"); - drop(cache); - - cached.subscriptions = subscriptions; + let list = cached.known_rooms.entry(list_id).or_default(); + for room_id in new_cached_rooms { + list.insert(room_id, globalsince); } } +#[implement(Service)] +pub fn update_snake_sync_subscriptions( + &self, + key: &SnakeConnectionsKey, + subscriptions: RoomSubscriptions, +) { + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); + + let cached = &mut cached.lock().expect("locked"); + drop(cache); + + cached.subscriptions = subscriptions; +} + +#[implement(Service)] +pub fn forget_snake_sync_connection(&self, key: &SnakeConnectionsKey) { + self.snake_connections + .lock() + .expect("locked") + .remove(key); +} + +#[implement(Service)] +pub fn snake_connection_cached(&self, key: &SnakeConnectionsKey) -> bool { + self.snake_connections + .lock() + .expect("locked") + .contains_key(key) +} + #[inline] -pub fn into_snake_key(user_id: U, device_id: D, conn_id: C) -> SnakeConnectionsKey +pub fn into_snake_key( + user_id: U, + device_id: D, + conn_id: Option, +) -> SnakeConnectionsKey where U: Into, D: Into, - C: Into>, + C: Into, { - (user_id.into(), device_id.into(), conn_id.into()) + (user_id.into(), device_id.into(), conn_id.map(Into::into)) } /// load params from cache if body doesn't contain it, as long as it's allowed