diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 5ac62f18..c748a550 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -10,7 +10,7 @@ use futures::{ pin_mut, }; use ruma::{ - DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId, + DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId, api::client::{ filter::FilterDefinition, sync::sync_events::{ @@ -37,7 +37,7 @@ use tokio::time; use tuwunel_core::{ Error, Result, at, debug::INFO_SPAN_LEVEL, - err, error, + err, error::{inspect_debug_log, inspect_log}, extract_variant, is_equal_to, matrix::{ @@ -52,10 +52,10 @@ use tuwunel_core::{ self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::{OptionStream, ReadyEqExt}, math::ruma_from_u64, + result::MapExpect, stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, string::to_small_string, }, - warn, }; use tuwunel_service::{ Services, @@ -507,28 +507,20 @@ async fn handle_left_room( .state_cache .get_left_count(room_id, sender_user) .await - .ok(); + .unwrap_or(0); - let filter_exclude = filter - .room - .not_rooms - .iter() - .any(is_equal_to!(room_id)); + if left_count == 0 || left_count > next_batch { + return Ok(None); + } - let filter_include = filter - .room - .rooms - .as_ref() - .is_some_and(|rooms| rooms.iter().any(is_equal_to!(room_id))); + let include_leave = filter.room.include_leave; + if since == 0 && !include_leave { + return Ok(None); + } - let too_soon = Some(next_batch) < left_count; - let too_late = Some(since) >= left_count; - let initial_sync = since == 0; - let include_leave = - filter.room.include_leave && !filter_exclude && (filter_include || initial_sync); - - // Left before last sync or after cutoff for next sync - if (too_late && !include_leave) || too_soon { + // Cannot sync unless the event falls within the snapshot. The room is only + // sync'ed once to the client, after that it's too late. + if since != 0 && left_count <= since { return Ok(None); } @@ -540,125 +532,176 @@ async fn handle_left_room( pin_mut!(is_not_found, is_disabled, is_banned); if is_not_found.or(is_disabled).or(is_banned).await { - // This is just a rejected invite, not a room we know - // Insert a leave event anyways for the client + // For rejected invites, deleted, missing, or broken room state this is the last + // resort to convey a the minimum of information to the client. let event = PduEvent { event_id: EventId::new(services.globals.server_name()), - sender: sender_user.to_owned(), - origin: None, origin_server_ts: utils::millis_since_unix_epoch().try_into()?, kind: RoomMember, - content: serde_json::from_str(r#"{"membership":"leave"}"#)?, state_key: Some(sender_user.as_str().into()), - unsigned: None, + sender: sender_user.to_owned(), + content: serde_json::from_str(r#"{"membership":"leave"}"#)?, // The following keys are dropped on conversion room_id: room_id.clone(), - prev_events: Default::default(), - auth_events: Default::default(), depth: uint!(1), + origin: None, + unsigned: None, redacts: None, hashes: EventHash::default(), + auth_events: Default::default(), + prev_events: Default::default(), signatures: None, }; return Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, + account_data: RoomAccountData::default(), + state: RoomState::Before(StateEvents { events: vec![event.into_format()] }), timeline: Timeline { limited: false, - prev_batch: Some(next_batch.to_string()), - events: Vec::new(), + events: Default::default(), + prev_batch: Some(left_count.to_string()), }, - state: RoomState::Before(StateEvents { events: vec![event.into_format()] }), })); } - let mut left_state_events = Vec::new(); + load_left_room(services, sender_user, room_id, since, left_count, full_state, filter).await +} + +#[tracing::instrument(name = "load", level = "debug", skip_all)] +async fn load_left_room( + services: &Services, + sender_user: &UserId, + room_id: &RoomId, + since: u64, + left_count: u64, + full_state: bool, + filter: &FilterDefinition, +) -> Result> { + let initial = since == 0; + let timeline_limit: usize = filter + .room + .timeline + .limit + .map(TryInto::try_into) + .map_expect("UInt to usize") + .unwrap_or(10) + .min(100); + + let (timeline_pdus, limited, _) = load_timeline( + services, + sender_user, + room_id, + PduCount::Normal(since), + Some(PduCount::Normal(left_count)), + timeline_limit.max(1), + ) + .await + .unwrap_or_default(); let since_shortstatehash = services .timeline .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) - .await .ok(); - let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash - .map(|since_shortstatehash| { + let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + .first() + .map(at!(0)) + .map(|count| { services - .state_accessor - .state_full_ids(since_shortstatehash) + .timeline + .get_shortstatehash(room_id, count) + .inspect_err(inspect_debug_log) + .ok() }) + .into(); + + let left_shortstatehash = services + .timeline + .get_shortstatehash(room_id, PduCount::Normal(left_count)) + .inspect_err(inspect_debug_log) + .or_else(|_| services.state.get_room_shortstatehash(room_id)) + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))); + + let (since_shortstatehash, horizon_shortstatehash, left_shortstatehash) = + join3(since_shortstatehash, horizon_shortstatehash, left_shortstatehash) + .boxed() + .await; + + let StateChanges { state_events, .. } = calculate_state_changes( + services, + sender_user, + room_id, + full_state || initial, + since_shortstatehash, + horizon_shortstatehash.flatten(), + left_shortstatehash?, + false, + None, + ) + .boxed() + .await?; + + let is_sender_membership = |event: &PduEvent| { + *event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str()) + }; + + let timeline_sender_member = timeline_limit + .eq(&0) + .then(|| timeline_pdus.last().map(ref_at!(1)).cloned()) + .into_iter() + .flat_map(Option::into_iter); + + let state_events = state_events + .into_iter() + .filter(|pdu| filter.room.state.matches(pdu)) + .filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu)) + .chain(timeline_sender_member) + .map(Event::into_format) + .collect(); + + let left_prev_batch = timeline_limit + .eq(&0) + .then_some(left_count) + .map(PduCount::Normal); + + let prev_batch = timeline_pdus + .first() + .filter(|_| timeline_limit > 0) + .map(at!(0)) + .or(left_prev_batch) + .as_ref() + .map(ToString::to_string); + + let timeline_events = timeline_pdus .into_iter() .stream() - .flatten() - .collect() + .wide_filter_map(|item| ignored_filter(services, item, sender_user)) + .map(at!(1)) + .ready_filter(|pdu| filter.room.timeline.matches(pdu)) + .take(timeline_limit) + .collect::>(); + + let account_data_events = services + .account_data + .changes_since(Some(room_id), sender_user, since, None) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect(); + + let (account_data_events, timeline_events) = join(account_data_events, timeline_events) + .boxed() .await; - let Ok(left_event_id): Result = services - .state_accessor - .room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str()) - .await - else { - warn!("Left {room_id} but no left state event"); - return Ok(None); - }; - - let Ok(left_shortstatehash) = services - .state - .pdu_shortstatehash(&left_event_id) - .await - else { - warn!(event_id = %left_event_id, "Leave event has no state in {room_id}"); - return Ok(None); - }; - - let mut left_state_ids: HashMap<_, _> = services - .state_accessor - .state_full_ids(left_shortstatehash) - .collect() - .await; - - let leave_shortstatekey = services - .short - .get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str()) - .await; - - left_state_ids.insert(leave_shortstatekey, left_event_id); - - for (shortstatekey, event_id) in left_state_ids { - if full_state || since_state_ids.get(&shortstatekey) != Some(&event_id) { - let (event_type, state_key) = services - .short - .get_statekey_from_short(shortstatekey) - .await?; - - if filter.room.state.lazy_load_options.is_enabled() - && event_type == StateEventType::RoomMember - && !full_state - && state_key - .as_str() - .try_into() - .is_ok_and(|user_id: &UserId| sender_user != user_id) - { - continue; - } - - let Ok(pdu) = services.timeline.get_pdu(&event_id).await else { - error!("Pdu in state not found: {event_id}"); - continue; - }; - - left_state_events.push(pdu.into_format()); - } - } - Ok(Some(LeftRoom { - account_data: RoomAccountData { events: Vec::new() }, + account_data: RoomAccountData { events: account_data_events }, + state: RoomState::Before(StateEvents { events: state_events }), timeline: Timeline { - // TODO: support left timeline events so we dont need to set limited to true - limited: true, - prev_batch: Some(next_batch.to_string()), - events: Vec::new(), // and so we dont need to set this to empty vec + prev_batch, + limited: limited || timeline_limit == 0, + events: timeline_events + .into_iter() + .map(Event::into_format) + .collect(), }, - state: RoomState::Before(StateEvents { events: left_state_events }), })) } diff --git a/tests/complement/results.jsonl b/tests/complement/results.jsonl index 2c1a02b3..b9c509d2 100644 --- a/tests/complement/results.jsonl +++ b/tests/complement/results.jsonl @@ -4,7 +4,7 @@ {"Action":"pass","Test":"TestAddAccountData/Can_add_room_account_data"} {"Action":"fail","Test":"TestArchivedRoomsHistory"} {"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events"} -{"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events/incremental_sync"} +{"Action":"pass","Test":"TestArchivedRoomsHistory/timeline_has_events/incremental_sync"} {"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events/initial_sync"} {"Action":"pass","Test":"TestArchivedRoomsHistory/timeline_is_empty"} {"Action":"skip","Test":"TestArchivedRoomsHistory/timeline_is_empty/incremental_sync"}