diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 793446f5..e7cef0b3 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -1,6 +1,12 @@ +mod account_data; +mod e2ee; +mod receipts; +mod room; +mod to_device; +mod typing; + use std::{ - cmp::Ordering, - collections::{BTreeMap, BTreeSet, HashSet}, + collections::{BTreeMap, BTreeSet}, mem::take, ops::Deref, time::Duration, @@ -9,36 +15,27 @@ use std::{ use axum::extract::State; use futures::{ FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, join4, join5, try_join}, + future::{OptionFuture, join4, join5, try_join}, pin_mut, - stream::once, }; use ruma::{ - DeviceId, JsOption, MxcUri, OwnedMxcUri, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId, - api::client::sync::sync_events::{ - DeviceLists, UnreadNotificationsCount, - v5::{Request, Response, request::ExtensionRoomConfig, response}, + DeviceId, OwnedRoomId, RoomId, UInt, UserId, + api::client::sync::sync_events::v5::{ + Request, Response, request::ExtensionRoomConfig, response, }, directory::RoomTypeFilter, - events::{ - AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, - receipt::SyncReceiptEvent, - room::member::{MembershipState, RoomMemberEventContent}, - typing::TypingEventContent, - }, - serde::Raw, uint, }; use tokio::time::{Instant, timeout_at}; use tuwunel_core::{ - Err, Result, apply, at, debug_error, error, + Err, Result, apply, error::inspect_log, extract_variant, is_equal_to, - matrix::{Event, StateKey, TypeStateKey, pdu::PduCount}, - pair_of, ref_at, trace, + matrix::TypeStateKey, + trace, utils::{ - BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, - future::{OptionStream, ReadyEqExt}, + FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, + future::ReadyEqExt, math::{ruma_from_usize, usize_from_ruma}, result::FlatOk, stream::{BroadbandExt, TryBroadbandExt, TryReadyExt, WidebandExt}, @@ -47,15 +44,11 @@ use tuwunel_core::{ }; use tuwunel_service::{ Services, - rooms::read_receipt::pack_receipts, sync::{KnownRooms, into_snake_key}, }; use super::share_encrypted_room; -use crate::{ - Ruma, - client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline}, -}; +use crate::{Ruma, client::DEFAULT_BUMP_TYPES}; type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a Request); type TodoRooms = BTreeMap; @@ -492,7 +485,7 @@ where .any(is_equal_to!(room_id)); let room = - handle_room(services, next_batch, sync_info, room_id, todo_room, is_invited) + room::handle(services, next_batch, sync_info, room_id, todo_room, is_invited) .await?; Ok((room_id, room)) @@ -505,290 +498,6 @@ where Ok(rooms) } -#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] -#[allow(clippy::too_many_arguments)] -async fn handle_room( - services: &Services, - next_batch: u64, - (sender_user, _, _globalsince, _): &SyncInfo<'_>, - room_id: &RoomId, - (required_state_request, timeline_limit, roomsince): &TodoRoom, - is_invited: bool, -) -> Result> { - let timeline: OptionFuture<_> = is_invited - .eq(&false) - .then(|| { - load_timeline( - services, - sender_user, - room_id, - PduCount::Normal(*roomsince), - Some(PduCount::from(next_batch)), - *timeline_limit, - ) - }) - .into(); - - let Ok(timeline) = timeline.await.transpose() else { - debug_error!(?room_id, "Missing timeline."); - return Ok(None); - }; - - let (timeline_pdus, limited, _lastcount) = - timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); - - if *roomsince != 0 && timeline_pdus.is_empty() && !is_invited { - return Ok(None); - } - - let prev_batch = timeline_pdus - .first() - .map(at!(0)) - .map(PduCount::into_unsigned) - .or_else(|| roomsince.ne(&0).then_some(*roomsince)) - .as_ref() - .map(ToString::to_string); - - let bump_stamp = timeline_pdus - .iter() - .filter(|(_, pdu)| { - DEFAULT_BUMP_TYPES - .binary_search(pdu.event_type()) - .is_ok() - }) - .fold(Option::::None, |mut bump_stamp, (_, pdu)| { - let ts = pdu.origin_server_ts().get(); - if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts) { - bump_stamp.replace(ts); - } - - bump_stamp - }); - - let lazy = required_state_request - .iter() - .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into()))); - - let mut timeline_senders: Vec<_> = timeline_pdus - .iter() - .filter(|_| lazy) - .map(ref_at!(1)) - .map(Event::sender) - .collect(); - - timeline_senders.sort(); - timeline_senders.dedup(); - let timeline_senders = timeline_senders - .iter() - .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))); - - let required_state = required_state_request - .iter() - .cloned() - .chain(timeline_senders) - .stream() - .broad_filter_map(async |state| { - let state_key: StateKey = match state.1.as_str() { - | "$LAZY" => return None, - | "$ME" => sender_user.as_str().into(), - | _ => state.1.clone(), - }; - - services - .state_accessor - .room_state_get(room_id, &state.0, &state_key) - .map_ok(Event::into_format) - .ok() - .await - }) - .collect(); - - // TODO: figure out a timestamp we can use for remote invites - let invite_state: OptionFuture<_> = is_invited - .then(|| { - services - .state_cache - .invite_state(sender_user, room_id) - .ok() - }) - .into(); - - let timeline = timeline_pdus - .iter() - .stream() - .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) - .map(at!(1)) - .map(Event::into_format) - .collect(); - - let room_name = services - .state_accessor - .get_name(room_id) - .map(Result::ok); - - let room_avatar = services - .state_accessor - .get_avatar(room_id) - .map_ok(|content| content.url) - .ok() - .map(Option::flatten); - - let highlight_count = services - .user - .highlight_count(sender_user, room_id) - .map(TryInto::try_into) - .map(Result::ok); - - let notification_count = services - .user - .notification_count(sender_user, room_id) - .map(TryInto::try_into) - .map(Result::ok); - - let joined_count = services - .state_cache - .room_joined_count(room_id) - .map_ok(TryInto::try_into) - .map_ok(Result::ok) - .map(FlatOk::flat_ok); - - let invited_count = services - .state_cache - .room_invited_count(room_id) - .map_ok(TryInto::try_into) - .map_ok(Result::ok) - .map(FlatOk::flat_ok); - - let meta = join(room_name, room_avatar); - let events = join3(timeline, required_state, invite_state); - let member_counts = join(joined_count, invited_count); - let notification_counts = join(highlight_count, notification_count); - let ( - (room_name, room_avatar), - (timeline, required_state, invite_state), - (joined_count, invited_count), - (highlight_count, notification_count), - ) = join4(meta, events, member_counts, notification_counts) - .boxed() - .await; - - let (heroes, hero_name, heroes_avatar) = calculate_heroes( - services, - sender_user, - room_id, - room_name.as_deref(), - room_avatar.as_deref(), - ) - .await?; - - let num_live = None; // Count events in timeline greater than global sync counter - - Ok(Some(response::Room { - initial: Some(*roomsince == 0), - name: room_name.or(hero_name), - avatar: JsOption::from_option(room_avatar.or(heroes_avatar)), - invite_state: invite_state.flatten(), - required_state, - timeline, - is_dm: None, - prev_batch, - limited, - bump_stamp, - heroes, - num_live, - joined_count, - invited_count, - unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, - })) -} - -#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] -#[allow(clippy::type_complexity)] -async fn calculate_heroes( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - room_name: Option<&str>, - room_avatar: Option<&MxcUri>, -) -> Result<(Option>, Option, Option)> { - const MAX_HEROES: usize = 5; - let heroes: Vec<_> = services - .state_cache - .room_members(room_id) - .ready_filter(|&member| member != sender_user) - .ready_filter_map(|member| room_name.is_none().then_some(member)) - .map(ToOwned::to_owned) - .broadn_filter_map(MAX_HEROES, async |user_id| { - let content = services - .state_accessor - .get_member(room_id, &user_id) - .await - .ok()?; - - let name: OptionFuture<_> = content - .displayname - .is_none() - .then(|| services.users.displayname(&user_id).ok()) - .into(); - - let avatar: OptionFuture<_> = content - .avatar_url - .is_none() - .then(|| services.users.avatar_url(&user_id).ok()) - .into(); - - let (name, avatar) = join(name, avatar).await; - let hero = response::Hero { - user_id, - name: name.unwrap_or(content.displayname), - avatar: avatar.unwrap_or(content.avatar_url), - }; - - Some(hero) - }) - .take(MAX_HEROES) - .collect() - .await; - - let hero_name = match heroes.len().cmp(&(1_usize)) { - | Ordering::Less => None, - | Ordering::Equal => Some( - heroes[0] - .name - .clone() - .unwrap_or_else(|| heroes[0].user_id.to_string()), - ), - | Ordering::Greater => { - let firsts = heroes[1..] - .iter() - .map(|h| { - h.name - .clone() - .unwrap_or_else(|| h.user_id.to_string()) - }) - .collect::>() - .join(", "); - - let last = heroes[0] - .name - .clone() - .unwrap_or_else(|| heroes[0].user_id.to_string()); - - Some(format!("{firsts} and {last}")) - }, - }; - - let heroes_avatar = (room_avatar.is_none() && room_name.is_none()) - .then(|| { - heroes - .first() - .and_then(|hero| hero.avatar.clone()) - }) - .flatten(); - - Ok((Some(heroes), hero_name, heroes_avatar)) -} - #[tracing::instrument( level = "debug", skip_all, @@ -811,7 +520,7 @@ async fn handle_extensions( .account_data .enabled .unwrap_or(false) - .then(|| collect_account_data(services, sync_info, next_batch, known_rooms, todo_rooms)) + .then(|| account_data::collect(services, sync_info, next_batch, known_rooms, todo_rooms)) .into(); let receipts: OptionFuture<_> = request @@ -819,7 +528,7 @@ async fn handle_extensions( .receipts .enabled .unwrap_or(false) - .then(|| collect_receipts(services, sync_info, next_batch, known_rooms, todo_rooms)) + .then(|| receipts::collect(services, sync_info, next_batch, known_rooms, todo_rooms)) .into(); let typing: OptionFuture<_> = request @@ -827,7 +536,7 @@ async fn handle_extensions( .typing .enabled .unwrap_or(false) - .then(|| collect_typing(services, sync_info, next_batch, known_rooms, todo_rooms)) + .then(|| typing::collect(services, sync_info, next_batch, known_rooms, todo_rooms)) .into(); let to_device: OptionFuture<_> = request @@ -835,7 +544,7 @@ async fn handle_extensions( .to_device .enabled .unwrap_or(false) - .then(|| collect_to_device(services, sync_info, next_batch)) + .then(|| to_device::collect(services, sync_info, next_batch)) .into(); let e2ee: OptionFuture<_> = request @@ -843,7 +552,7 @@ async fn handle_extensions( .e2ee .enabled .unwrap_or(false) - .then(|| collect_e2ee(services, sync_info, next_batch)) + .then(|| e2ee::collect(services, sync_info, next_batch)) .into(); let (account_data, receipts, typing, to_device, e2ee) = @@ -860,394 +569,6 @@ async fn handle_extensions( }) } -#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] -async fn collect_account_data( - services: &Services, - sync_info: SyncInfo<'_>, - next_batch: u64, - known_rooms: &KnownRooms, - todo_rooms: &TodoRooms, -) -> Result { - let (sender_user, _, globalsince, request) = sync_info; - let data = &request.extensions.account_data; - let rooms = extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .broad_filter_map(async |room_id| { - let &(_, _, roomsince) = todo_rooms.get(room_id)?; - let changes: Vec<_> = services - .account_data - .changes_since(Some(room_id), sender_user, roomsince, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) - .collect() - .await; - - changes - .is_empty() - .eq(&false) - .then(move || (room_id.to_owned(), changes)) - }) - .collect(); - - let global = services - .account_data - .changes_since(None, sender_user, globalsince, Some(next_batch)) - .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) - .collect(); - - let (global, rooms) = join(global, rooms).await; - - Ok(response::AccountData { global, rooms }) -} - -#[tracing::instrument(level = "trace", skip_all)] -async fn collect_receipts( - services: &Services, - sync_info: SyncInfo<'_>, - next_batch: u64, - known_rooms: &KnownRooms, - todo_rooms: &TodoRooms, -) -> Result { - let (_, _, _, request) = sync_info; - let data = &request.extensions.receipts; - let rooms = extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .broad_filter_map(async |room_id| { - collect_receipt(services, sync_info, next_batch, todo_rooms, room_id).await - }) - .collect() - .await; - - Ok(response::Receipts { rooms }) -} - -async fn collect_receipt( - services: &Services, - (sender_user, ..): SyncInfo<'_>, - next_batch: u64, - todo_rooms: &TodoRooms, - room_id: &RoomId, -) -> Option<(OwnedRoomId, Raw)> { - let &(_, _, roomsince) = todo_rooms.get(room_id)?; - let private_receipt = services - .read_receipt - .last_privateread_update(sender_user, room_id) - .then(async |last_private_update| { - if last_private_update <= roomsince || last_private_update > next_batch { - return None; - } - - services - .read_receipt - .private_read_get(room_id, sender_user) - .map(Some) - .await - }) - .map(Option::into_iter) - .map(Iterator::flatten) - .map(IterStream::stream) - .flatten_stream(); - - let receipts: Vec> = services - .read_receipt - .readreceipts_since(room_id, roomsince, Some(next_batch)) - .filter_map(async |(read_user, _ts, v)| { - services - .users - .user_is_ignored(read_user, sender_user) - .await - .or_some(v) - }) - .chain(private_receipt) - .collect() - .boxed() - .await; - - receipts - .is_empty() - .eq(&false) - .then(|| (room_id.to_owned(), pack_receipts(receipts.into_iter()))) -} - -#[tracing::instrument(level = "trace", skip_all, fields(globalsince))] -async fn collect_typing( - services: &Services, - sync_info: SyncInfo<'_>, - _next_batch: u64, - known_rooms: &KnownRooms, - todo_rooms: &TodoRooms, -) -> Result { - use response::Typing; - use ruma::events::typing::SyncTypingEvent; - - let (sender_user, _, _, request) = sync_info; - let data = &request.extensions.typing; - extension_rooms_todo( - sync_info, - known_rooms, - todo_rooms, - data.lists.as_ref(), - data.rooms.as_ref(), - ) - .stream() - .filter_map(async |room_id| { - services - .typing - .typing_users_for_user(room_id, sender_user) - .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}")) - .await - .ok() - .filter(|users| !users.is_empty()) - .map(|users| (room_id, users)) - }) - .ready_filter_map(|(room_id, users)| { - let content = TypingEventContent::new(users); - let event = SyncTypingEvent { content }; - let event = Raw::new(&event); - - Some((room_id.to_owned(), event.ok()?)) - }) - .collect::>() - .map(|rooms| Typing { rooms }) - .map(Ok) - .await -} - -#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] -async fn collect_to_device( - services: &Services, - (sender_user, sender_device, globalsince, _request): SyncInfo<'_>, - next_batch: u64, -) -> Result> { - services - .users - .remove_to_device_events(sender_user, sender_device, globalsince) - .await; - - let events: Vec<_> = services - .users - .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) - .collect() - .await; - - let to_device = events - .is_empty() - .eq(&false) - .then(|| response::ToDevice { - next_batch: next_batch.to_string(), - events, - }); - - Ok(to_device) -} - -#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch,))] -async fn collect_e2ee( - services: &Services, - syncinfo: SyncInfo<'_>, - next_batch: u64, -) -> Result { - let &(sender_user, sender_device, globalsince, _) = &syncinfo; - let keys_changed = services - .users - .keys_changed(sender_user, globalsince, Some(next_batch)) - .map(ToOwned::to_owned) - .collect::>() - .map(|changed| (changed, HashSet::new())); - - let (changed, left) = (HashSet::new(), HashSet::new()); - let (changed, left) = services - .state_cache - .rooms_joined(sender_user) - .map(ToOwned::to_owned) - .broad_filter_map(async |room_id| { - collect_e2ee_room(services, syncinfo, next_batch, &room_id) - .await - .ok() - }) - .chain(once(keys_changed)) - .ready_fold((changed, left), |(mut changed, mut left), room| { - changed.extend(room.0); - left.extend(room.1); - (changed, left) - }) - .await; - - let left = left - .into_iter() - .stream() - .filter_map(async |user_id| { - share_encrypted_room(services, sender_user, &user_id, None) - .await - .is_false() - .then_some(user_id) - }) - .collect(); - - let device_one_time_keys_count = services - .users - .last_one_time_keys_update(sender_user) - .then(|since| -> OptionFuture<_> { - since - .gt(&globalsince) - .then(|| { - services - .users - .count_one_time_keys(sender_user, sender_device) - }) - .into() - }) - .map(Option::unwrap_or_default); - - let (left, device_one_time_keys_count) = join(left, device_one_time_keys_count) - .boxed() - .await; - - Ok(response::E2EE { - device_one_time_keys_count, - device_unused_fallback_key_types: None, - device_lists: DeviceLists { - changed: changed.into_iter().collect(), - left, - }, - }) -} - -#[tracing::instrument(level = "trace", skip_all, fields(room_id))] -async fn collect_e2ee_room( - services: &Services, - (sender_user, _, globalsince, _): SyncInfo<'_>, - next_batch: u64, - room_id: &RoomId, -) -> Result)> { - let current_shortstatehash = services - .state - .get_room_shortstatehash(room_id) - .inspect_err(|e| error!("Room {room_id} has no state: {e}")); - - let room_keys_changed = services - .users - .room_keys_changed(room_id, globalsince, Some(next_batch)) - .map(|(user_id, _)| user_id) - .map(ToOwned::to_owned) - .collect::>(); - - let (current_shortstatehash, device_list_changed) = - join(current_shortstatehash, room_keys_changed) - .boxed() - .await; - - let lists = (device_list_changed, HashSet::new()); - let Ok(current_shortstatehash) = current_shortstatehash else { - return Ok(lists); - }; - - if current_shortstatehash <= globalsince { - return Ok(lists); - } - - let Ok(since_shortstatehash) = services - .timeline - .prev_shortstatehash(room_id, PduCount::Normal(globalsince).saturating_add(1)) - .await - else { - return Ok(lists); - }; - - if since_shortstatehash == current_shortstatehash { - return Ok(lists); - } - - let encrypted_room = services - .state_accessor - .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); - - let since_encryption = services - .state_accessor - .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") - .is_ok(); - - let sender_joined_count = services - .state_cache - .get_joined_count(room_id, sender_user); - - let (encrypted_room, since_encryption, sender_joined_count) = - join3(encrypted_room, since_encryption, sender_joined_count).await; - - if !encrypted_room { - return Ok(lists); - } - - let encrypted_since_last_sync = !since_encryption; - let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > globalsince); - let joined_members_burst: OptionFuture<_> = (joined_since_last_sync - || encrypted_since_last_sync) - .then(|| { - services - .state_cache - .room_members(room_id) - .ready_filter(|&user_id| user_id != sender_user) - .map(ToOwned::to_owned) - .map(|user_id| (MembershipState::Join, user_id)) - .into_future() - }) - .into(); - - services - .state_accessor - .state_added((since_shortstatehash, current_shortstatehash)) - .broad_filter_map(async |(_shortstatekey, shorteventid)| { - services - .timeline - .get_pdu_from_shorteventid(shorteventid) - .ok() - .await - }) - .ready_filter(|event| *event.kind() == TimelineEventType::RoomMember) - .ready_filter(|event| { - event - .state_key() - .is_some_and(|state_key| state_key != sender_user) - }) - .ready_filter_map(|event| { - let content: RoomMemberEventContent = event.get_content().ok()?; - let user_id: OwnedUserId = event.state_key()?.parse().ok()?; - - Some((content.membership, user_id)) - }) - .chain(joined_members_burst.stream()) - .fold(lists, async |(mut changed, mut left), (membership, user_id)| { - use MembershipState::*; - - let should_add = async |user_id| { - !share_encrypted_room(services, sender_user, user_id, Some(room_id)).await - }; - - match membership { - | Join if should_add(&user_id).await => changed.insert(user_id), - | Leave => left.insert(user_id), - | _ => false, - }; - - (changed, left) - }) - .map(Ok) - .boxed() - .await -} - fn extension_rooms_todo<'a>( (_, _, _, request): SyncInfo<'a>, known_rooms: &'a KnownRooms, diff --git a/src/api/client/sync/v5/account_data.rs b/src/api/client/sync/v5/account_data.rs new file mode 100644 index 00000000..1df3287b --- /dev/null +++ b/src/api/client/sync/v5/account_data.rs @@ -0,0 +1,54 @@ +use futures::{StreamExt, future::join}; +use ruma::{api::client::sync::sync_events::v5::response, events::AnyRawAccountDataEvent}; +use tuwunel_core::{ + Result, extract_variant, + utils::{IterStream, ReadyExt, stream::BroadbandExt}, +}; +use tuwunel_service::Services; + +use super::{KnownRooms, SyncInfo, TodoRooms, extension_rooms_todo}; + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] +pub(super) async fn collect( + services: &Services, + sync_info: SyncInfo<'_>, + next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + let (sender_user, _, globalsince, request) = sync_info; + let data = &request.extensions.account_data; + let rooms = extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .broad_filter_map(async |room_id| { + let &(_, _, roomsince) = todo_rooms.get(room_id)?; + let changes: Vec<_> = services + .account_data + .changes_since(Some(room_id), sender_user, roomsince, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await; + + changes + .is_empty() + .eq(&false) + .then(move || (room_id.to_owned(), changes)) + }) + .collect(); + + let global = services + .account_data + .changes_since(None, sender_user, globalsince, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect(); + + let (global, rooms) = join(global, rooms).await; + + Ok(response::AccountData { global, rooms }) +} diff --git a/src/api/client/sync/v5/e2ee.rs b/src/api/client/sync/v5/e2ee.rs new file mode 100644 index 00000000..56032f16 --- /dev/null +++ b/src/api/client/sync/v5/e2ee.rs @@ -0,0 +1,223 @@ +use std::collections::HashSet; + +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::{OptionFuture, join, join3}, + stream::once, +}; +use ruma::{ + OwnedUserId, RoomId, + api::client::sync::sync_events::{DeviceLists, v5::response}, + events::{ + StateEventType, TimelineEventType, + room::member::{MembershipState, RoomMemberEventContent}, + }, +}; +use tuwunel_core::{ + Result, error, + matrix::{Event, pdu::PduCount}, + pair_of, + utils::{ + BoolExt, IterStream, ReadyExt, TryFutureExtExt, future::OptionStream, + stream::BroadbandExt, + }, +}; +use tuwunel_service::Services; + +use super::{SyncInfo, share_encrypted_room}; + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch,))] +pub(super) async fn collect( + services: &Services, + syncinfo: SyncInfo<'_>, + next_batch: u64, +) -> Result { + let &(sender_user, sender_device, globalsince, _) = &syncinfo; + let keys_changed = services + .users + .keys_changed(sender_user, globalsince, Some(next_batch)) + .map(ToOwned::to_owned) + .collect::>() + .map(|changed| (changed, HashSet::new())); + + let (changed, left) = (HashSet::new(), HashSet::new()); + let (changed, left) = services + .state_cache + .rooms_joined(sender_user) + .map(ToOwned::to_owned) + .broad_filter_map(async |room_id| { + collect_room(services, syncinfo, next_batch, &room_id) + .await + .ok() + }) + .chain(once(keys_changed)) + .ready_fold((changed, left), |(mut changed, mut left), room| { + changed.extend(room.0); + left.extend(room.1); + (changed, left) + }) + .await; + + let left = left + .into_iter() + .stream() + .filter_map(async |user_id| { + share_encrypted_room(services, sender_user, &user_id, None) + .await + .is_false() + .then_some(user_id) + }) + .collect(); + + let device_one_time_keys_count = services + .users + .last_one_time_keys_update(sender_user) + .then(|since| -> OptionFuture<_> { + since + .gt(&globalsince) + .then(|| { + services + .users + .count_one_time_keys(sender_user, sender_device) + }) + .into() + }) + .map(Option::unwrap_or_default); + + let (left, device_one_time_keys_count) = join(left, device_one_time_keys_count) + .boxed() + .await; + + Ok(response::E2EE { + device_one_time_keys_count, + device_unused_fallback_key_types: None, + device_lists: DeviceLists { + changed: changed.into_iter().collect(), + left, + }, + }) +} + +#[tracing::instrument(level = "trace", skip_all, fields(room_id))] +async fn collect_room( + services: &Services, + (sender_user, _, globalsince, _): SyncInfo<'_>, + next_batch: u64, + room_id: &RoomId, +) -> Result)> { + let current_shortstatehash = services + .state + .get_room_shortstatehash(room_id) + .inspect_err(|e| error!("Room {room_id} has no state: {e}")); + + let room_keys_changed = services + .users + .room_keys_changed(room_id, globalsince, Some(next_batch)) + .map(|(user_id, _)| user_id) + .map(ToOwned::to_owned) + .collect::>(); + + let (current_shortstatehash, device_list_changed) = + join(current_shortstatehash, room_keys_changed) + .boxed() + .await; + + let lists = (device_list_changed, HashSet::new()); + let Ok(current_shortstatehash) = current_shortstatehash else { + return Ok(lists); + }; + + if current_shortstatehash <= globalsince { + return Ok(lists); + } + + let Ok(since_shortstatehash) = services + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(globalsince).saturating_add(1)) + .await + else { + return Ok(lists); + }; + + if since_shortstatehash == current_shortstatehash { + return Ok(lists); + } + + let encrypted_room = services + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok(); + + let since_encryption = services + .state_accessor + .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") + .is_ok(); + + let sender_joined_count = services + .state_cache + .get_joined_count(room_id, sender_user); + + let (encrypted_room, since_encryption, sender_joined_count) = + join3(encrypted_room, since_encryption, sender_joined_count).await; + + if !encrypted_room { + return Ok(lists); + } + + let encrypted_since_last_sync = !since_encryption; + let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > globalsince); + let joined_members_burst: OptionFuture<_> = (joined_since_last_sync + || encrypted_since_last_sync) + .then(|| { + services + .state_cache + .room_members(room_id) + .ready_filter(|&user_id| user_id != sender_user) + .map(ToOwned::to_owned) + .map(|user_id| (MembershipState::Join, user_id)) + .into_future() + }) + .into(); + + services + .state_accessor + .state_added((since_shortstatehash, current_shortstatehash)) + .broad_filter_map(async |(_shortstatekey, shorteventid)| { + services + .timeline + .get_pdu_from_shorteventid(shorteventid) + .ok() + .await + }) + .ready_filter(|event| *event.kind() == TimelineEventType::RoomMember) + .ready_filter(|event| { + event + .state_key() + .is_some_and(|state_key| state_key != sender_user) + }) + .ready_filter_map(|event| { + let content: RoomMemberEventContent = event.get_content().ok()?; + let user_id: OwnedUserId = event.state_key()?.parse().ok()?; + + Some((content.membership, user_id)) + }) + .chain(joined_members_burst.stream()) + .fold(lists, async |(mut changed, mut left), (membership, user_id)| { + use MembershipState::*; + + let should_add = async |user_id| { + !share_encrypted_room(services, sender_user, user_id, Some(room_id)).await + }; + + match membership { + | Join if should_add(&user_id).await => changed.insert(user_id), + | Leave => left.insert(user_id), + | _ => false, + }; + + (changed, left) + }) + .map(Ok) + .boxed() + .await +} diff --git a/src/api/client/sync/v5/receipts.rs b/src/api/client/sync/v5/receipts.rs new file mode 100644 index 00000000..90fcadad --- /dev/null +++ b/src/api/client/sync/v5/receipts.rs @@ -0,0 +1,89 @@ +use futures::{FutureExt, StreamExt}; +use ruma::{ + OwnedRoomId, RoomId, + api::client::sync::sync_events::v5::response, + events::{AnySyncEphemeralRoomEvent, receipt::SyncReceiptEvent}, + serde::Raw, +}; +use tuwunel_core::{ + Result, + utils::{BoolExt, IterStream, stream::BroadbandExt}, +}; +use tuwunel_service::{Services, rooms::read_receipt::pack_receipts}; + +use super::{KnownRooms, SyncInfo, TodoRooms, extension_rooms_todo}; + +#[tracing::instrument(level = "trace", skip_all)] +pub(super) async fn collect( + services: &Services, + sync_info: SyncInfo<'_>, + next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + let (_, _, _, request) = sync_info; + let data = &request.extensions.receipts; + let rooms = extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .broad_filter_map(async |room_id| { + collect_room(services, sync_info, next_batch, todo_rooms, room_id).await + }) + .collect() + .await; + + Ok(response::Receipts { rooms }) +} + +async fn collect_room( + services: &Services, + (sender_user, ..): SyncInfo<'_>, + next_batch: u64, + todo_rooms: &TodoRooms, + room_id: &RoomId, +) -> Option<(OwnedRoomId, Raw)> { + let &(_, _, roomsince) = todo_rooms.get(room_id)?; + let private_receipt = services + .read_receipt + .last_privateread_update(sender_user, room_id) + .then(async |last_private_update| { + if last_private_update <= roomsince || last_private_update > next_batch { + return None; + } + + services + .read_receipt + .private_read_get(room_id, sender_user) + .map(Some) + .await + }) + .map(Option::into_iter) + .map(Iterator::flatten) + .map(IterStream::stream) + .flatten_stream(); + + let receipts: Vec> = services + .read_receipt + .readreceipts_since(room_id, roomsince, Some(next_batch)) + .filter_map(async |(read_user, _ts, v)| { + services + .users + .user_is_ignored(read_user, sender_user) + .await + .or_some(v) + }) + .chain(private_receipt) + .collect() + .boxed() + .await; + + receipts + .is_empty() + .eq(&false) + .then(|| (room_id.to_owned(), pack_receipts(receipts.into_iter()))) +} diff --git a/src/api/client/sync/v5/room.rs b/src/api/client/sync/v5/room.rs new file mode 100644 index 00000000..a7475de5 --- /dev/null +++ b/src/api/client/sync/v5/room.rs @@ -0,0 +1,305 @@ +use std::cmp::Ordering; + +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::{OptionFuture, join, join3, join4}, +}; +use ruma::{ + JsOption, MxcUri, OwnedMxcUri, RoomId, UInt, UserId, + api::client::sync::sync_events::{UnreadNotificationsCount, v5::response}, + events::StateEventType, +}; +use tuwunel_core::{ + Result, at, debug_error, is_equal_to, + matrix::{Event, StateKey, pdu::PduCount}, + ref_at, + utils::{IterStream, ReadyExt, TryFutureExtExt, result::FlatOk, stream::BroadbandExt}, +}; +use tuwunel_service::Services; + +use super::{SyncInfo, TodoRoom}; +use crate::client::{DEFAULT_BUMP_TYPES, ignored_filter, sync::load_timeline}; + +#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] +#[allow(clippy::too_many_arguments)] +pub(super) async fn handle( + services: &Services, + next_batch: u64, + (sender_user, _, _globalsince, _): &SyncInfo<'_>, + room_id: &RoomId, + (required_state_request, timeline_limit, roomsince): &TodoRoom, + is_invited: bool, +) -> Result> { + let timeline: OptionFuture<_> = is_invited + .eq(&false) + .then(|| { + load_timeline( + services, + sender_user, + room_id, + PduCount::Normal(*roomsince), + Some(PduCount::from(next_batch)), + *timeline_limit, + ) + }) + .into(); + + let Ok(timeline) = timeline.await.transpose() else { + debug_error!(?room_id, "Missing timeline."); + return Ok(None); + }; + + let (timeline_pdus, limited, _lastcount) = + timeline.unwrap_or_else(|| (Vec::new(), true, PduCount::default())); + + if *roomsince != 0 && timeline_pdus.is_empty() && !is_invited { + return Ok(None); + } + + let prev_batch = timeline_pdus + .first() + .map(at!(0)) + .map(PduCount::into_unsigned) + .or_else(|| roomsince.ne(&0).then_some(*roomsince)) + .as_ref() + .map(ToString::to_string); + + let bump_stamp = timeline_pdus + .iter() + .filter(|(_, pdu)| { + DEFAULT_BUMP_TYPES + .binary_search(pdu.event_type()) + .is_ok() + }) + .fold(Option::::None, |mut bump_stamp, (_, pdu)| { + let ts = pdu.origin_server_ts().get(); + if bump_stamp.is_none_or(|bump_stamp| bump_stamp < ts) { + bump_stamp.replace(ts); + } + + bump_stamp + }); + + let lazy = required_state_request + .iter() + .any(is_equal_to!(&(StateEventType::RoomMember, "$LAZY".into()))); + + let mut timeline_senders: Vec<_> = timeline_pdus + .iter() + .filter(|_| lazy) + .map(ref_at!(1)) + .map(Event::sender) + .collect(); + + timeline_senders.sort(); + timeline_senders.dedup(); + let timeline_senders = timeline_senders + .iter() + .map(|sender| (StateEventType::RoomMember, StateKey::from_str(sender.as_str()))); + + let required_state = required_state_request + .iter() + .cloned() + .chain(timeline_senders) + .stream() + .broad_filter_map(async |state| { + let state_key: StateKey = match state.1.as_str() { + | "$LAZY" => return None, + | "$ME" => sender_user.as_str().into(), + | _ => state.1.clone(), + }; + + services + .state_accessor + .room_state_get(room_id, &state.0, &state_key) + .map_ok(Event::into_format) + .ok() + .await + }) + .collect(); + + // TODO: figure out a timestamp we can use for remote invites + let invite_state: OptionFuture<_> = is_invited + .then(|| { + services + .state_cache + .invite_state(sender_user, room_id) + .ok() + }) + .into(); + + let timeline = timeline_pdus + .iter() + .stream() + .filter_map(|item| ignored_filter(services, item.clone(), sender_user)) + .map(at!(1)) + .map(Event::into_format) + .collect(); + + let room_name = services + .state_accessor + .get_name(room_id) + .map(Result::ok); + + let room_avatar = services + .state_accessor + .get_avatar(room_id) + .map_ok(|content| content.url) + .ok() + .map(Option::flatten); + + let highlight_count = services + .user + .highlight_count(sender_user, room_id) + .map(TryInto::try_into) + .map(Result::ok); + + let notification_count = services + .user + .notification_count(sender_user, room_id) + .map(TryInto::try_into) + .map(Result::ok); + + let joined_count = services + .state_cache + .room_joined_count(room_id) + .map_ok(TryInto::try_into) + .map_ok(Result::ok) + .map(FlatOk::flat_ok); + + let invited_count = services + .state_cache + .room_invited_count(room_id) + .map_ok(TryInto::try_into) + .map_ok(Result::ok) + .map(FlatOk::flat_ok); + + let meta = join(room_name, room_avatar); + let events = join3(timeline, required_state, invite_state); + let member_counts = join(joined_count, invited_count); + let notification_counts = join(highlight_count, notification_count); + let ( + (room_name, room_avatar), + (timeline, required_state, invite_state), + (joined_count, invited_count), + (highlight_count, notification_count), + ) = join4(meta, events, member_counts, notification_counts) + .boxed() + .await; + + let (heroes, hero_name, heroes_avatar) = calculate_heroes( + services, + sender_user, + room_id, + room_name.as_deref(), + room_avatar.as_deref(), + ) + .await?; + + let num_live = None; // Count events in timeline greater than global sync counter + + Ok(Some(response::Room { + initial: Some(*roomsince == 0), + name: room_name.or(hero_name), + avatar: JsOption::from_option(room_avatar.or(heroes_avatar)), + invite_state: invite_state.flatten(), + required_state, + timeline, + is_dm: None, + prev_batch, + limited, + bump_stamp, + heroes, + num_live, + joined_count, + invited_count, + unread_notifications: UnreadNotificationsCount { highlight_count, notification_count }, + })) +} + +#[tracing::instrument(level = "debug", skip_all, fields(room_id, roomsince))] +#[allow(clippy::type_complexity)] +async fn calculate_heroes( + services: &Services, + sender_user: &UserId, + room_id: &RoomId, + room_name: Option<&str>, + room_avatar: Option<&MxcUri>, +) -> Result<(Option>, Option, Option)> { + const MAX_HEROES: usize = 5; + let heroes: Vec<_> = services + .state_cache + .room_members(room_id) + .ready_filter(|&member| member != sender_user) + .ready_filter_map(|member| room_name.is_none().then_some(member)) + .map(ToOwned::to_owned) + .broadn_filter_map(MAX_HEROES, async |user_id| { + let content = services + .state_accessor + .get_member(room_id, &user_id) + .await + .ok()?; + + let name: OptionFuture<_> = content + .displayname + .is_none() + .then(|| services.users.displayname(&user_id).ok()) + .into(); + + let avatar: OptionFuture<_> = content + .avatar_url + .is_none() + .then(|| services.users.avatar_url(&user_id).ok()) + .into(); + + let (name, avatar) = join(name, avatar).await; + let hero = response::Hero { + user_id, + name: name.unwrap_or(content.displayname), + avatar: avatar.unwrap_or(content.avatar_url), + }; + + Some(hero) + }) + .take(MAX_HEROES) + .collect() + .await; + + let hero_name = match heroes.len().cmp(&(1_usize)) { + | Ordering::Less => None, + | Ordering::Equal => Some( + heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()), + ), + | Ordering::Greater => { + let firsts = heroes[1..] + .iter() + .map(|h| { + h.name + .clone() + .unwrap_or_else(|| h.user_id.to_string()) + }) + .collect::>() + .join(", "); + + let last = heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()); + + Some(format!("{firsts} and {last}")) + }, + }; + + let heroes_avatar = (room_avatar.is_none() && room_name.is_none()) + .then(|| { + heroes + .first() + .and_then(|hero| hero.avatar.clone()) + }) + .flatten(); + + Ok((Some(heroes), hero_name, heroes_avatar)) +} diff --git a/src/api/client/sync/v5/to_device.rs b/src/api/client/sync/v5/to_device.rs new file mode 100644 index 00000000..d0b5242a --- /dev/null +++ b/src/api/client/sync/v5/to_device.rs @@ -0,0 +1,34 @@ +use futures::StreamExt; +use ruma::api::client::sync::sync_events::v5::response; +use tuwunel_core::{self, Result}; +use tuwunel_service::Services; + +use super::SyncInfo; + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince, next_batch))] +pub(super) async fn collect( + services: &Services, + (sender_user, sender_device, globalsince, _request): SyncInfo<'_>, + next_batch: u64, +) -> Result> { + services + .users + .remove_to_device_events(sender_user, sender_device, globalsince) + .await; + + let events: Vec<_> = services + .users + .get_to_device_events(sender_user, sender_device, None, Some(next_batch)) + .collect() + .await; + + let to_device = events + .is_empty() + .eq(&false) + .then(|| response::ToDevice { + next_batch: next_batch.to_string(), + events, + }); + + Ok(to_device) +} diff --git a/src/api/client/sync/v5/typing.rs b/src/api/client/sync/v5/typing.rs new file mode 100644 index 00000000..5f23c2e5 --- /dev/null +++ b/src/api/client/sync/v5/typing.rs @@ -0,0 +1,57 @@ +use std::collections::BTreeMap; + +use futures::{FutureExt, StreamExt, TryFutureExt}; +use ruma::{ + api::client::sync::sync_events::v5::response, events::typing::TypingEventContent, serde::Raw, +}; +use tuwunel_core::{ + Result, debug_error, + utils::{IterStream, ReadyExt}, +}; +use tuwunel_service::Services; + +use super::{KnownRooms, SyncInfo, TodoRooms, extension_rooms_todo}; + +#[tracing::instrument(level = "trace", skip_all, fields(globalsince))] +pub(super) async fn collect( + services: &Services, + sync_info: SyncInfo<'_>, + _next_batch: u64, + known_rooms: &KnownRooms, + todo_rooms: &TodoRooms, +) -> Result { + use response::Typing; + use ruma::events::typing::SyncTypingEvent; + + let (sender_user, _, _, request) = sync_info; + let data = &request.extensions.typing; + extension_rooms_todo( + sync_info, + known_rooms, + todo_rooms, + data.lists.as_ref(), + data.rooms.as_ref(), + ) + .stream() + .filter_map(async |room_id| { + services + .typing + .typing_users_for_user(room_id, sender_user) + .inspect_err(|e| debug_error!(%room_id, "Failed to get typing events: {e}")) + .await + .ok() + .filter(|users| !users.is_empty()) + .map(|users| (room_id, users)) + }) + .ready_filter_map(|(room_id, users)| { + let content = TypingEventContent::new(users); + let event = SyncTypingEvent { content }; + let event = Raw::new(&event); + + Some((room_id.to_owned(), event.ok()?)) + }) + .collect::>() + .map(|rooms| Typing { rooms }) + .map(Ok) + .await +}