Refactor sync v3 leave handler.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-27 20:05:30 +00:00
parent 7aeed0a95a
commit ce1ac277a6
2 changed files with 150 additions and 107 deletions

View File

@@ -10,7 +10,7 @@ use futures::{
pin_mut,
};
use ruma::{
DeviceId, EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
DeviceId, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
api::client::{
filter::FilterDefinition,
sync::sync_events::{
@@ -37,7 +37,7 @@ use tokio::time;
use tuwunel_core::{
Error, Result, at,
debug::INFO_SPAN_LEVEL,
err, error,
err,
error::{inspect_debug_log, inspect_log},
extract_variant, is_equal_to,
matrix::{
@@ -52,10 +52,10 @@ use tuwunel_core::{
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::{OptionStream, ReadyEqExt},
math::ruma_from_u64,
result::MapExpect,
stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
string::to_small_string,
},
warn,
};
use tuwunel_service::{
Services,
@@ -507,28 +507,20 @@ async fn handle_left_room(
.state_cache
.get_left_count(room_id, sender_user)
.await
.ok();
.unwrap_or(0);
let filter_exclude = filter
.room
.not_rooms
.iter()
.any(is_equal_to!(room_id));
if left_count == 0 || left_count > next_batch {
return Ok(None);
}
let filter_include = filter
.room
.rooms
.as_ref()
.is_some_and(|rooms| rooms.iter().any(is_equal_to!(room_id)));
let include_leave = filter.room.include_leave;
if since == 0 && !include_leave {
return Ok(None);
}
let too_soon = Some(next_batch) < left_count;
let too_late = Some(since) >= left_count;
let initial_sync = since == 0;
let include_leave =
filter.room.include_leave && !filter_exclude && (filter_include || initial_sync);
// Left before last sync or after cutoff for next sync
if (too_late && !include_leave) || too_soon {
// Cannot sync unless the event falls within the snapshot. The room is only
// sync'ed once to the client, after that it's too late.
if since != 0 && left_count <= since {
return Ok(None);
}
@@ -540,125 +532,176 @@ async fn handle_left_room(
pin_mut!(is_not_found, is_disabled, is_banned);
if is_not_found.or(is_disabled).or(is_banned).await {
// This is just a rejected invite, not a room we know
// Insert a leave event anyways for the client
// For rejected invites, deleted, missing, or broken room state this is the last
// resort to convey a the minimum of information to the client.
let event = PduEvent {
event_id: EventId::new(services.globals.server_name()),
sender: sender_user.to_owned(),
origin: None,
origin_server_ts: utils::millis_since_unix_epoch().try_into()?,
kind: RoomMember,
content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
state_key: Some(sender_user.as_str().into()),
unsigned: None,
sender: sender_user.to_owned(),
content: serde_json::from_str(r#"{"membership":"leave"}"#)?,
// The following keys are dropped on conversion
room_id: room_id.clone(),
prev_events: Default::default(),
auth_events: Default::default(),
depth: uint!(1),
origin: None,
unsigned: None,
redacts: None,
hashes: EventHash::default(),
auth_events: Default::default(),
prev_events: Default::default(),
signatures: None,
};
return Ok(Some(LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
account_data: RoomAccountData::default(),
state: RoomState::Before(StateEvents { events: vec![event.into_format()] }),
timeline: Timeline {
limited: false,
prev_batch: Some(next_batch.to_string()),
events: Vec::new(),
events: Default::default(),
prev_batch: Some(left_count.to_string()),
},
state: RoomState::Before(StateEvents { events: vec![event.into_format()] }),
}));
}
let mut left_state_events = Vec::new();
load_left_room(services, sender_user, room_id, since, left_count, full_state, filter).await
}
#[tracing::instrument(name = "load", level = "debug", skip_all)]
async fn load_left_room(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
since: u64,
left_count: u64,
full_state: bool,
filter: &FilterDefinition,
) -> Result<Option<LeftRoom>> {
let initial = since == 0;
let timeline_limit: usize = filter
.room
.timeline
.limit
.map(TryInto::try_into)
.map_expect("UInt to usize")
.unwrap_or(10)
.min(100);
let (timeline_pdus, limited, _) = load_timeline(
services,
sender_user,
room_id,
PduCount::Normal(since),
Some(PduCount::Normal(left_count)),
timeline_limit.max(1),
)
.await
.unwrap_or_default();
let since_shortstatehash = services
.timeline
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
.await
.ok();
let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash
.map(|since_shortstatehash| {
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
.first()
.map(at!(0))
.map(|count| {
services
.state_accessor
.state_full_ids(since_shortstatehash)
.timeline
.get_shortstatehash(room_id, count)
.inspect_err(inspect_debug_log)
.ok()
})
.into();
let left_shortstatehash = services
.timeline
.get_shortstatehash(room_id, PduCount::Normal(left_count))
.inspect_err(inspect_debug_log)
.or_else(|_| services.state.get_room_shortstatehash(room_id))
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
let (since_shortstatehash, horizon_shortstatehash, left_shortstatehash) =
join3(since_shortstatehash, horizon_shortstatehash, left_shortstatehash)
.boxed()
.await;
let StateChanges { state_events, .. } = calculate_state_changes(
services,
sender_user,
room_id,
full_state || initial,
since_shortstatehash,
horizon_shortstatehash.flatten(),
left_shortstatehash?,
false,
None,
)
.boxed()
.await?;
let is_sender_membership = |event: &PduEvent| {
*event.kind() == RoomMember && event.state_key() == Some(sender_user.as_str())
};
let timeline_sender_member = timeline_limit
.eq(&0)
.then(|| timeline_pdus.last().map(ref_at!(1)).cloned())
.into_iter()
.flat_map(Option::into_iter);
let state_events = state_events
.into_iter()
.filter(|pdu| filter.room.state.matches(pdu))
.filter(|pdu| timeline_limit > 0 || !is_sender_membership(pdu))
.chain(timeline_sender_member)
.map(Event::into_format)
.collect();
let left_prev_batch = timeline_limit
.eq(&0)
.then_some(left_count)
.map(PduCount::Normal);
let prev_batch = timeline_pdus
.first()
.filter(|_| timeline_limit > 0)
.map(at!(0))
.or(left_prev_batch)
.as_ref()
.map(ToString::to_string);
let timeline_events = timeline_pdus
.into_iter()
.stream()
.flatten()
.collect()
.wide_filter_map(|item| ignored_filter(services, item, sender_user))
.map(at!(1))
.ready_filter(|pdu| filter.room.timeline.matches(pdu))
.take(timeline_limit)
.collect::<Vec<_>>();
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since, None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
let (account_data_events, timeline_events) = join(account_data_events, timeline_events)
.boxed()
.await;
let Ok(left_event_id): Result<OwnedEventId> = services
.state_accessor
.room_state_get_id(room_id, &StateEventType::RoomMember, sender_user.as_str())
.await
else {
warn!("Left {room_id} but no left state event");
return Ok(None);
};
let Ok(left_shortstatehash) = services
.state
.pdu_shortstatehash(&left_event_id)
.await
else {
warn!(event_id = %left_event_id, "Leave event has no state in {room_id}");
return Ok(None);
};
let mut left_state_ids: HashMap<_, _> = services
.state_accessor
.state_full_ids(left_shortstatehash)
.collect()
.await;
let leave_shortstatekey = services
.short
.get_or_create_shortstatekey(&StateEventType::RoomMember, sender_user.as_str())
.await;
left_state_ids.insert(leave_shortstatekey, left_event_id);
for (shortstatekey, event_id) in left_state_ids {
if full_state || since_state_ids.get(&shortstatekey) != Some(&event_id) {
let (event_type, state_key) = services
.short
.get_statekey_from_short(shortstatekey)
.await?;
if filter.room.state.lazy_load_options.is_enabled()
&& event_type == StateEventType::RoomMember
&& !full_state
&& state_key
.as_str()
.try_into()
.is_ok_and(|user_id: &UserId| sender_user != user_id)
{
continue;
}
let Ok(pdu) = services.timeline.get_pdu(&event_id).await else {
error!("Pdu in state not found: {event_id}");
continue;
};
left_state_events.push(pdu.into_format());
}
}
Ok(Some(LeftRoom {
account_data: RoomAccountData { events: Vec::new() },
account_data: RoomAccountData { events: account_data_events },
state: RoomState::Before(StateEvents { events: state_events }),
timeline: Timeline {
// TODO: support left timeline events so we dont need to set limited to true
limited: true,
prev_batch: Some(next_batch.to_string()),
events: Vec::new(), // and so we dont need to set this to empty vec
prev_batch,
limited: limited || timeline_limit == 0,
events: timeline_events
.into_iter()
.map(Event::into_format)
.collect(),
},
state: RoomState::Before(StateEvents { events: left_state_events }),
}))
}

View File

@@ -4,7 +4,7 @@
{"Action":"pass","Test":"TestAddAccountData/Can_add_room_account_data"}
{"Action":"fail","Test":"TestArchivedRoomsHistory"}
{"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events"}
{"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events/incremental_sync"}
{"Action":"pass","Test":"TestArchivedRoomsHistory/timeline_has_events/incremental_sync"}
{"Action":"fail","Test":"TestArchivedRoomsHistory/timeline_has_events/initial_sync"}
{"Action":"pass","Test":"TestArchivedRoomsHistory/timeline_is_empty"}
{"Action":"skip","Test":"TestArchivedRoomsHistory/timeline_is_empty/incremental_sync"}