Fix partial v3 syncs on post-timeout pass; fix partial state on room join.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -111,6 +111,7 @@ pub(crate) async fn create_room_route(
|
||||
.short
|
||||
.get_or_create_shortroomid(&room_id)
|
||||
.await;
|
||||
|
||||
let state_lock = services.rooms.state.mutex.lock(&room_id).await;
|
||||
|
||||
let alias: Option<OwnedRoomAliasId> = match body.room_alias_name.as_ref() {
|
||||
@@ -187,6 +188,10 @@ pub(crate) async fn create_room_route(
|
||||
},
|
||||
};
|
||||
|
||||
// Increment and hold the counter; the room will sync atomically to clients
|
||||
// which is preferable.
|
||||
let next_count = services.globals.next_count();
|
||||
|
||||
// 1. The room create event
|
||||
services
|
||||
.rooms
|
||||
@@ -425,8 +430,10 @@ pub(crate) async fn create_room_route(
|
||||
.await?;
|
||||
}
|
||||
|
||||
// 8. Events implied by invite (and TODO: invite_3pid)
|
||||
drop(next_count);
|
||||
drop(state_lock);
|
||||
|
||||
// 8. Events implied by invite (and TODO: invite_3pid)
|
||||
for user_id in &body.invite {
|
||||
if services
|
||||
.users
|
||||
|
||||
@@ -115,7 +115,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
|
||||
skip_all,
|
||||
fields(
|
||||
user_id = %body.sender_user(),
|
||||
since = %body.body.since.as_deref().unwrap_or_default(),
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn sync_events_route(
|
||||
@@ -134,21 +133,29 @@ pub(crate) async fn sync_events_route(
|
||||
|
||||
// Setup watchers, so if there's no response, we can wait for them
|
||||
let watcher = services.sync.watch(sender_user, sender_device);
|
||||
|
||||
let next_batch = services.globals.current_count();
|
||||
let response = build_sync_events(&services, &body, next_batch).await?;
|
||||
if body.body.full_state
|
||||
|| !(response.rooms.is_empty()
|
||||
&& response.presence.is_empty()
|
||||
&& response.account_data.is_empty()
|
||||
&& response.device_lists.is_empty()
|
||||
&& response.to_device.is_empty())
|
||||
{
|
||||
return Ok(response);
|
||||
let since = body
|
||||
.body
|
||||
.since
|
||||
.as_deref()
|
||||
.map(str::parse)
|
||||
.flat_ok()
|
||||
.unwrap_or(0);
|
||||
|
||||
// Make an initial pass if there's a possibility of something to find.
|
||||
if since < next_batch {
|
||||
let response = build_sync_events(&services, &body, since, next_batch).await?;
|
||||
if body.body.full_state
|
||||
|| !(response.rooms.is_empty()
|
||||
&& response.presence.is_empty()
|
||||
&& response.account_data.is_empty()
|
||||
&& response.device_lists.is_empty()
|
||||
&& response.to_device.is_empty())
|
||||
{
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
|
||||
// Hang a few seconds so requests are not spammed
|
||||
// Stop hanging if new info arrives
|
||||
let timeout_default = services.config.client_sync_timeout_default;
|
||||
let timeout_min = services.config.client_sync_timeout_min;
|
||||
let timeout_max = services.config.client_sync_timeout_max;
|
||||
@@ -158,27 +165,41 @@ pub(crate) async fn sync_events_route(
|
||||
.unwrap_or_else(|| Duration::from_millis(timeout_default))
|
||||
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
|
||||
|
||||
// Wait for activity notification.
|
||||
_ = tokio::time::timeout(duration, watcher).await;
|
||||
|
||||
// Retry returning data
|
||||
let next_batch = services.globals.current_count();
|
||||
build_sync_events(&services, &body, next_batch).await
|
||||
// Wait for synchronization of activity.
|
||||
let next_batch = services.globals.wait_pending().await?;
|
||||
debug_assert!(since <= next_batch, "next_batch is monotonic");
|
||||
|
||||
// Return an empty response when nothing has changed.
|
||||
if since == next_batch {
|
||||
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
||||
}
|
||||
|
||||
build_sync_events(&services, &body, since, next_batch)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
pub(crate) async fn build_sync_events(
|
||||
#[tracing::instrument(
|
||||
name = "build",
|
||||
level = "debug",
|
||||
ret(level = "trace"),
|
||||
skip_all,
|
||||
fields(
|
||||
%since,
|
||||
%next_batch,
|
||||
)
|
||||
)]
|
||||
async fn build_sync_events(
|
||||
services: &Services,
|
||||
body: &Ruma<sync_events::v3::Request>,
|
||||
since: u64,
|
||||
next_batch: u64,
|
||||
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
||||
) -> Result<sync_events::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
|
||||
let since = body
|
||||
.body
|
||||
.since
|
||||
.as_ref()
|
||||
.and_then(|string| string.parse().ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
let full_state = body.body.full_state;
|
||||
let filter = match body.body.filter.as_ref() {
|
||||
| None => FilterDefinition::default(),
|
||||
@@ -364,7 +385,7 @@ pub(crate) async fn build_sync_events(
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let response = sync_events::v3::Response {
|
||||
Ok(sync_events::v3::Response {
|
||||
account_data: GlobalAccountData { events: account_data },
|
||||
device_lists: DeviceLists {
|
||||
changed: device_list_updates.into_iter().collect(),
|
||||
@@ -390,9 +411,7 @@ pub(crate) async fn build_sync_events(
|
||||
knock: knocked_rooms,
|
||||
},
|
||||
to_device: ToDevice { events: to_device_events },
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
|
||||
@@ -670,6 +689,25 @@ async fn load_joined_room(
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
// State was changed after the cutoff for this sync; similar to other handlers.
|
||||
if current_shortstatehash > next_batch {
|
||||
// Transfer the since_shortstatehash not the current over to the next sync.
|
||||
if let Some(since_shortstatehash) = since_shortstatehash {
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.associate_token_shortstatehash(room_id, next_batch, since_shortstatehash)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new()));
|
||||
}
|
||||
|
||||
let associate_token = services
|
||||
.rooms
|
||||
.user
|
||||
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
|
||||
|
||||
let initial = since_shortstatehash.is_none() || since == 0;
|
||||
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|
||||
|| filter
|
||||
@@ -739,9 +777,15 @@ async fn load_joined_room(
|
||||
.read_receipt
|
||||
.last_privateread_update(sender_user, room_id);
|
||||
|
||||
let (last_privateread_update, last_notification_read, since_sender_member, witness) =
|
||||
join4(last_privateread_update, last_notification_read, since_sender_member, witness)
|
||||
.await;
|
||||
let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) =
|
||||
join5(
|
||||
last_privateread_update,
|
||||
last_notification_read,
|
||||
since_sender_member,
|
||||
witness,
|
||||
associate_token,
|
||||
)
|
||||
.await;
|
||||
|
||||
let joined_since_last_sync =
|
||||
since_sender_member
|
||||
@@ -888,14 +932,6 @@ async fn load_joined_room(
|
||||
.chain(private_read_event.flatten().into_iter())
|
||||
.collect();
|
||||
|
||||
// Save the state after this sync so we can send the correct state diff next
|
||||
// sync
|
||||
services
|
||||
.rooms
|
||||
.user
|
||||
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
|
||||
.await;
|
||||
|
||||
let joined_room = JoinedRoom {
|
||||
account_data: RoomAccountData { events: account_data_events },
|
||||
summary: RoomSummary {
|
||||
|
||||
@@ -58,6 +58,11 @@ pub async fn update_membership(
|
||||
|
||||
match &membership {
|
||||
| MembershipState::Join => {
|
||||
// Increment the counter for a unique value and hold the guard even though the
|
||||
// value is not used. This will allow proper sync to clients similar to the
|
||||
// other membership state changes.
|
||||
let _next_count = self.services.globals.next_count();
|
||||
|
||||
// Check if the user never joined this room
|
||||
if !self.once_joined(user_id, room_id).await {
|
||||
// Add the user ID to the join list then
|
||||
@@ -236,7 +241,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) {
|
||||
/// `update_membership` instead
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
pub(crate) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
let userroom_id = (user_id, room_id);
|
||||
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
|
||||
|
||||
@@ -271,7 +276,7 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
/// `update_membership` instead
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
pub(crate) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
let count = self.services.globals.next_count();
|
||||
|
||||
let userroom_id = (user_id, room_id);
|
||||
@@ -315,7 +320,7 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
/// `update_membership` instead
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
pub fn mark_as_knocked(
|
||||
pub(crate) fn mark_as_knocked(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
@@ -372,7 +377,7 @@ fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
||||
|
||||
#[implement(super::Service)]
|
||||
#[tracing::instrument(level = "debug", skip(self, last_state, invite_via))]
|
||||
pub async fn mark_as_invited(
|
||||
pub(crate) async fn mark_as_invited(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
|
||||
@@ -159,27 +159,29 @@ where
|
||||
.await;
|
||||
|
||||
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
|
||||
let count1 = self.services.globals.next_count();
|
||||
let count2 = self.services.globals.next_count();
|
||||
let next_count1 = self.services.globals.next_count();
|
||||
let next_count2 = self.services.globals.next_count();
|
||||
|
||||
// Mark as read first so the sending client doesn't get a notification even if
|
||||
// appending fails
|
||||
self.services
|
||||
.read_receipt
|
||||
.private_read_set(pdu.room_id(), pdu.sender(), *count1);
|
||||
.private_read_set(pdu.room_id(), pdu.sender(), *next_count2);
|
||||
|
||||
self.services
|
||||
.user
|
||||
.reset_notification_counts(pdu.sender(), pdu.room_id());
|
||||
|
||||
let count2 = PduCount::Normal(*count2);
|
||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
|
||||
let count = PduCount::Normal(*next_count1);
|
||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into();
|
||||
|
||||
// Insert pdu
|
||||
self.db
|
||||
.append_pdu(&pdu_id, pdu, &pdu_json, count2)
|
||||
.append_pdu(&pdu_id, pdu, &pdu_json, count)
|
||||
.await;
|
||||
|
||||
drop(next_count1);
|
||||
drop(next_count2);
|
||||
drop(insert_lock);
|
||||
|
||||
// See if the event matches any known pushers via power level
|
||||
@@ -396,7 +398,7 @@ where
|
||||
{
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.add_relation(count2, related_pducount);
|
||||
.add_relation(count, related_pducount);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,7 +410,7 @@ where
|
||||
if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
|
||||
self.services
|
||||
.pdu_metadata
|
||||
.add_relation(count2, related_pducount);
|
||||
.add_relation(count, related_pducount);
|
||||
}
|
||||
},
|
||||
| Relation::Thread(thread) => {
|
||||
|
||||
Reference in New Issue
Block a user