Fix partial v3 syncs on post-timeout pass; fix partial state on room join.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-27 05:44:10 +00:00
parent 2e0b156de0
commit 59b62b1453
4 changed files with 103 additions and 53 deletions

View File

@@ -111,6 +111,7 @@ pub(crate) async fn create_room_route(
.short
.get_or_create_shortroomid(&room_id)
.await;
let state_lock = services.rooms.state.mutex.lock(&room_id).await;
let alias: Option<OwnedRoomAliasId> = match body.room_alias_name.as_ref() {
@@ -187,6 +188,10 @@ pub(crate) async fn create_room_route(
},
};
// Increment and hold the counter; the room will then sync to clients atomically,
// which is preferable.
let next_count = services.globals.next_count();
// 1. The room create event
services
.rooms
@@ -425,8 +430,10 @@ pub(crate) async fn create_room_route(
.await?;
}
// 8. Events implied by invite (and TODO: invite_3pid)
drop(next_count);
drop(state_lock);
// 8. Events implied by invite (and TODO: invite_3pid)
for user_id in &body.invite {
if services
.users

View File

@@ -115,7 +115,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
skip_all,
fields(
user_id = %body.sender_user(),
since = %body.body.since.as_deref().unwrap_or_default(),
)
)]
pub(crate) async fn sync_events_route(
@@ -134,21 +133,29 @@ pub(crate) async fn sync_events_route(
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.current_count();
let response = build_sync_events(&services, &body, next_batch).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
let since = body
.body
.since
.as_deref()
.map(str::parse)
.flat_ok()
.unwrap_or(0);
// Make an initial pass if there's a possibility of something to find.
if since < next_batch {
let response = build_sync_events(&services, &body, since, next_batch).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}
}
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let timeout_default = services.config.client_sync_timeout_default;
let timeout_min = services.config.client_sync_timeout_min;
let timeout_max = services.config.client_sync_timeout_max;
@@ -158,27 +165,41 @@ pub(crate) async fn sync_events_route(
.unwrap_or_else(|| Duration::from_millis(timeout_default))
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
// Wait for activity notification.
_ = tokio::time::timeout(duration, watcher).await;
// Retry returning data
let next_batch = services.globals.current_count();
build_sync_events(&services, &body, next_batch).await
// Wait for synchronization of activity.
let next_batch = services.globals.wait_pending().await?;
debug_assert!(since <= next_batch, "next_batch is monotonic");
// Return an empty response when nothing has changed.
if since == next_batch {
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
}
build_sync_events(&services, &body, since, next_batch)
.await
.map_err(Into::into)
}
pub(crate) async fn build_sync_events(
#[tracing::instrument(
name = "build",
level = "debug",
ret(level = "trace"),
skip_all,
fields(
%since,
%next_batch,
)
)]
async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
since: u64,
next_batch: u64,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
) -> Result<sync_events::v3::Response> {
let (sender_user, sender_device) = body.sender();
let since = body
.body
.since
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let full_state = body.body.full_state;
let filter = match body.body.filter.as_ref() {
| None => FilterDefinition::default(),
@@ -364,7 +385,7 @@ pub(crate) async fn build_sync_events(
.collect()
.await;
let response = sync_events::v3::Response {
Ok(sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
device_lists: DeviceLists {
changed: device_list_updates.into_iter().collect(),
@@ -390,9 +411,7 @@ pub(crate) async fn build_sync_events(
knock: knocked_rooms,
},
to_device: ToDevice { events: to_device_events },
};
Ok(response)
})
}
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
@@ -670,6 +689,25 @@ async fn load_joined_room(
.boxed()
.await?;
// State was changed after the cutoff for this sync; similar to other handlers.
if current_shortstatehash > next_batch {
// Transfer the since_shortstatehash, not the current one, over to the next sync.
if let Some(since_shortstatehash) = since_shortstatehash {
services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, since_shortstatehash)
.await;
}
return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new()));
}
let associate_token = services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
let initial = since_shortstatehash.is_none() || since == 0;
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter
@@ -739,9 +777,15 @@ async fn load_joined_room(
.read_receipt
.last_privateread_update(sender_user, room_id);
let (last_privateread_update, last_notification_read, since_sender_member, witness) =
join4(last_privateread_update, last_notification_read, since_sender_member, witness)
.await;
let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) =
join5(
last_privateread_update,
last_notification_read,
since_sender_member,
witness,
associate_token,
)
.await;
let joined_since_last_sync =
since_sender_member
@@ -888,14 +932,6 @@ async fn load_joined_room(
.chain(private_read_event.flatten().into_iter())
.collect();
// Save the state after this sync so we can send the correct state diff next
// sync
services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
.await;
let joined_room = JoinedRoom {
account_data: RoomAccountData { events: account_data_events },
summary: RoomSummary {

View File

@@ -58,6 +58,11 @@ pub async fn update_membership(
match &membership {
| MembershipState::Join => {
// Increment the counter for a unique value and hold the guard even though the
// value is not used. This will allow proper sync to clients similar to the
// other membership state changes.
let _next_count = self.services.globals.next_count();
// Check if the user never joined this room
if !self.once_joined(user_id, room_id).await {
// Add the user ID to the join list then
@@ -236,7 +241,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) {
/// `update_membership` instead
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
pub(crate) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
let userroom_id = (user_id, room_id);
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
@@ -271,7 +276,7 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
/// `update_membership` instead
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
pub(crate) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
let count = self.services.globals.next_count();
let userroom_id = (user_id, room_id);
@@ -315,7 +320,7 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
/// `update_membership` instead
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_as_knocked(
pub(crate) fn mark_as_knocked(
&self,
user_id: &UserId,
room_id: &RoomId,
@@ -372,7 +377,7 @@ fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) {
#[implement(super::Service)]
#[tracing::instrument(level = "debug", skip(self, last_state, invite_via))]
pub async fn mark_as_invited(
pub(crate) async fn mark_as_invited(
&self,
user_id: &UserId,
room_id: &RoomId,

View File

@@ -159,27 +159,29 @@ where
.await;
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
let count1 = self.services.globals.next_count();
let count2 = self.services.globals.next_count();
let next_count1 = self.services.globals.next_count();
let next_count2 = self.services.globals.next_count();
// Mark as read first so the sending client doesn't get a notification even if
// appending fails
self.services
.read_receipt
.private_read_set(pdu.room_id(), pdu.sender(), *count1);
.private_read_set(pdu.room_id(), pdu.sender(), *next_count2);
self.services
.user
.reset_notification_counts(pdu.sender(), pdu.room_id());
let count2 = PduCount::Normal(*count2);
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
let count = PduCount::Normal(*next_count1);
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into();
// Insert pdu
self.db
.append_pdu(&pdu_id, pdu, &pdu_json, count2)
.append_pdu(&pdu_id, pdu, &pdu_json, count)
.await;
drop(next_count1);
drop(next_count2);
drop(insert_lock);
// See if the event matches any known pushers via power level
@@ -396,7 +398,7 @@ where
{
self.services
.pdu_metadata
.add_relation(count2, related_pducount);
.add_relation(count, related_pducount);
}
}
@@ -408,7 +410,7 @@ where
if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
self.services
.pdu_metadata
.add_relation(count2, related_pducount);
.add_relation(count, related_pducount);
}
},
| Relation::Thread(thread) => {