Fix partial v3 syncs on post-timeout pass; fix partial state on room join.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -111,6 +111,7 @@ pub(crate) async fn create_room_route(
|
|||||||
.short
|
.short
|
||||||
.get_or_create_shortroomid(&room_id)
|
.get_or_create_shortroomid(&room_id)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let state_lock = services.rooms.state.mutex.lock(&room_id).await;
|
let state_lock = services.rooms.state.mutex.lock(&room_id).await;
|
||||||
|
|
||||||
let alias: Option<OwnedRoomAliasId> = match body.room_alias_name.as_ref() {
|
let alias: Option<OwnedRoomAliasId> = match body.room_alias_name.as_ref() {
|
||||||
@@ -187,6 +188,10 @@ pub(crate) async fn create_room_route(
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Increment and hold the counter; the room will sync atomically to clients
|
||||||
|
// which is preferable.
|
||||||
|
let next_count = services.globals.next_count();
|
||||||
|
|
||||||
// 1. The room create event
|
// 1. The room create event
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
@@ -425,8 +430,10 @@ pub(crate) async fn create_room_route(
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8. Events implied by invite (and TODO: invite_3pid)
|
drop(next_count);
|
||||||
drop(state_lock);
|
drop(state_lock);
|
||||||
|
|
||||||
|
// 8. Events implied by invite (and TODO: invite_3pid)
|
||||||
for user_id in &body.invite {
|
for user_id in &body.invite {
|
||||||
if services
|
if services
|
||||||
.users
|
.users
|
||||||
|
|||||||
@@ -115,7 +115,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
|
|||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
user_id = %body.sender_user(),
|
user_id = %body.sender_user(),
|
||||||
since = %body.body.since.as_deref().unwrap_or_default(),
|
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub(crate) async fn sync_events_route(
|
pub(crate) async fn sync_events_route(
|
||||||
@@ -134,21 +133,29 @@ pub(crate) async fn sync_events_route(
|
|||||||
|
|
||||||
// Setup watchers, so if there's no response, we can wait for them
|
// Setup watchers, so if there's no response, we can wait for them
|
||||||
let watcher = services.sync.watch(sender_user, sender_device);
|
let watcher = services.sync.watch(sender_user, sender_device);
|
||||||
|
|
||||||
let next_batch = services.globals.current_count();
|
let next_batch = services.globals.current_count();
|
||||||
let response = build_sync_events(&services, &body, next_batch).await?;
|
let since = body
|
||||||
if body.body.full_state
|
.body
|
||||||
|| !(response.rooms.is_empty()
|
.since
|
||||||
&& response.presence.is_empty()
|
.as_deref()
|
||||||
&& response.account_data.is_empty()
|
.map(str::parse)
|
||||||
&& response.device_lists.is_empty()
|
.flat_ok()
|
||||||
&& response.to_device.is_empty())
|
.unwrap_or(0);
|
||||||
{
|
|
||||||
return Ok(response);
|
// Make an initial pass if there's a possibility of something to find.
|
||||||
|
if since < next_batch {
|
||||||
|
let response = build_sync_events(&services, &body, since, next_batch).await?;
|
||||||
|
if body.body.full_state
|
||||||
|
|| !(response.rooms.is_empty()
|
||||||
|
&& response.presence.is_empty()
|
||||||
|
&& response.account_data.is_empty()
|
||||||
|
&& response.device_lists.is_empty()
|
||||||
|
&& response.to_device.is_empty())
|
||||||
|
{
|
||||||
|
return Ok(response);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hang a few seconds so requests are not spammed
|
|
||||||
// Stop hanging if new info arrives
|
|
||||||
let timeout_default = services.config.client_sync_timeout_default;
|
let timeout_default = services.config.client_sync_timeout_default;
|
||||||
let timeout_min = services.config.client_sync_timeout_min;
|
let timeout_min = services.config.client_sync_timeout_min;
|
||||||
let timeout_max = services.config.client_sync_timeout_max;
|
let timeout_max = services.config.client_sync_timeout_max;
|
||||||
@@ -158,27 +165,41 @@ pub(crate) async fn sync_events_route(
|
|||||||
.unwrap_or_else(|| Duration::from_millis(timeout_default))
|
.unwrap_or_else(|| Duration::from_millis(timeout_default))
|
||||||
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
|
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
|
||||||
|
|
||||||
|
// Wait for activity notification.
|
||||||
_ = tokio::time::timeout(duration, watcher).await;
|
_ = tokio::time::timeout(duration, watcher).await;
|
||||||
|
|
||||||
// Retry returning data
|
// Wait for synchronization of activity.
|
||||||
let next_batch = services.globals.current_count();
|
let next_batch = services.globals.wait_pending().await?;
|
||||||
build_sync_events(&services, &body, next_batch).await
|
debug_assert!(since <= next_batch, "next_batch is monotonic");
|
||||||
|
|
||||||
|
// Return an empty response when nothing has changed.
|
||||||
|
if since == next_batch {
|
||||||
|
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
build_sync_events(&services, &body, since, next_batch)
|
||||||
|
.await
|
||||||
|
.map_err(Into::into)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn build_sync_events(
|
#[tracing::instrument(
|
||||||
|
name = "build",
|
||||||
|
level = "debug",
|
||||||
|
ret(level = "trace"),
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
%since,
|
||||||
|
%next_batch,
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
async fn build_sync_events(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
body: &Ruma<sync_events::v3::Request>,
|
body: &Ruma<sync_events::v3::Request>,
|
||||||
|
since: u64,
|
||||||
next_batch: u64,
|
next_batch: u64,
|
||||||
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
) -> Result<sync_events::v3::Response> {
|
||||||
let (sender_user, sender_device) = body.sender();
|
let (sender_user, sender_device) = body.sender();
|
||||||
|
|
||||||
let since = body
|
|
||||||
.body
|
|
||||||
.since
|
|
||||||
.as_ref()
|
|
||||||
.and_then(|string| string.parse().ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let full_state = body.body.full_state;
|
let full_state = body.body.full_state;
|
||||||
let filter = match body.body.filter.as_ref() {
|
let filter = match body.body.filter.as_ref() {
|
||||||
| None => FilterDefinition::default(),
|
| None => FilterDefinition::default(),
|
||||||
@@ -364,7 +385,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let response = sync_events::v3::Response {
|
Ok(sync_events::v3::Response {
|
||||||
account_data: GlobalAccountData { events: account_data },
|
account_data: GlobalAccountData { events: account_data },
|
||||||
device_lists: DeviceLists {
|
device_lists: DeviceLists {
|
||||||
changed: device_list_updates.into_iter().collect(),
|
changed: device_list_updates.into_iter().collect(),
|
||||||
@@ -390,9 +411,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
knock: knocked_rooms,
|
knock: knocked_rooms,
|
||||||
},
|
},
|
||||||
to_device: ToDevice { events: to_device_events },
|
to_device: ToDevice { events: to_device_events },
|
||||||
};
|
})
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
|
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
|
||||||
@@ -670,6 +689,25 @@ async fn load_joined_room(
|
|||||||
.boxed()
|
.boxed()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// State was changed after the cutoff for this sync; similar to other handlers.
|
||||||
|
if current_shortstatehash > next_batch {
|
||||||
|
// Transfer the since_shortstatehash not the current over to the next sync.
|
||||||
|
if let Some(since_shortstatehash) = since_shortstatehash {
|
||||||
|
services
|
||||||
|
.rooms
|
||||||
|
.user
|
||||||
|
.associate_token_shortstatehash(room_id, next_batch, since_shortstatehash)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let associate_token = services
|
||||||
|
.rooms
|
||||||
|
.user
|
||||||
|
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
|
||||||
|
|
||||||
let initial = since_shortstatehash.is_none() || since == 0;
|
let initial = since_shortstatehash.is_none() || since == 0;
|
||||||
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|
||||||
|| filter
|
|| filter
|
||||||
@@ -739,9 +777,15 @@ async fn load_joined_room(
|
|||||||
.read_receipt
|
.read_receipt
|
||||||
.last_privateread_update(sender_user, room_id);
|
.last_privateread_update(sender_user, room_id);
|
||||||
|
|
||||||
let (last_privateread_update, last_notification_read, since_sender_member, witness) =
|
let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) =
|
||||||
join4(last_privateread_update, last_notification_read, since_sender_member, witness)
|
join5(
|
||||||
.await;
|
last_privateread_update,
|
||||||
|
last_notification_read,
|
||||||
|
since_sender_member,
|
||||||
|
witness,
|
||||||
|
associate_token,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
let joined_since_last_sync =
|
let joined_since_last_sync =
|
||||||
since_sender_member
|
since_sender_member
|
||||||
@@ -888,14 +932,6 @@ async fn load_joined_room(
|
|||||||
.chain(private_read_event.flatten().into_iter())
|
.chain(private_read_event.flatten().into_iter())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Save the state after this sync so we can send the correct state diff next
|
|
||||||
// sync
|
|
||||||
services
|
|
||||||
.rooms
|
|
||||||
.user
|
|
||||||
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let joined_room = JoinedRoom {
|
let joined_room = JoinedRoom {
|
||||||
account_data: RoomAccountData { events: account_data_events },
|
account_data: RoomAccountData { events: account_data_events },
|
||||||
summary: RoomSummary {
|
summary: RoomSummary {
|
||||||
|
|||||||
@@ -58,6 +58,11 @@ pub async fn update_membership(
|
|||||||
|
|
||||||
match &membership {
|
match &membership {
|
||||||
| MembershipState::Join => {
|
| MembershipState::Join => {
|
||||||
|
// Increment the counter for a unique value and hold the guard even though the
|
||||||
|
// value is not used. This will allow proper sync to clients similar to the
|
||||||
|
// other membership state changes.
|
||||||
|
let _next_count = self.services.globals.next_count();
|
||||||
|
|
||||||
// Check if the user never joined this room
|
// Check if the user never joined this room
|
||||||
if !self.once_joined(user_id, room_id).await {
|
if !self.once_joined(user_id, room_id).await {
|
||||||
// Add the user ID to the join list then
|
// Add the user ID to the join list then
|
||||||
@@ -236,7 +241,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) {
|
|||||||
/// `update_membership` instead
|
/// `update_membership` instead
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
pub(crate) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
||||||
let userroom_id = (user_id, room_id);
|
let userroom_id = (user_id, room_id);
|
||||||
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
|
let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id");
|
||||||
|
|
||||||
@@ -271,7 +276,7 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
|||||||
/// `update_membership` instead
|
/// `update_membership` instead
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
pub(crate) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
||||||
let count = self.services.globals.next_count();
|
let count = self.services.globals.next_count();
|
||||||
|
|
||||||
let userroom_id = (user_id, room_id);
|
let userroom_id = (user_id, room_id);
|
||||||
@@ -315,7 +320,7 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) {
|
|||||||
/// `update_membership` instead
|
/// `update_membership` instead
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(skip(self), level = "debug")]
|
||||||
pub fn mark_as_knocked(
|
pub(crate) fn mark_as_knocked(
|
||||||
&self,
|
&self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
@@ -372,7 +377,7 @@ fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) {
|
|||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(level = "debug", skip(self, last_state, invite_via))]
|
#[tracing::instrument(level = "debug", skip(self, last_state, invite_via))]
|
||||||
pub async fn mark_as_invited(
|
pub(crate) async fn mark_as_invited(
|
||||||
&self,
|
&self,
|
||||||
user_id: &UserId,
|
user_id: &UserId,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
|
|||||||
@@ -159,27 +159,29 @@ where
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
|
let insert_lock = self.mutex_insert.lock(pdu.room_id()).await;
|
||||||
let count1 = self.services.globals.next_count();
|
let next_count1 = self.services.globals.next_count();
|
||||||
let count2 = self.services.globals.next_count();
|
let next_count2 = self.services.globals.next_count();
|
||||||
|
|
||||||
// Mark as read first so the sending client doesn't get a notification even if
|
// Mark as read first so the sending client doesn't get a notification even if
|
||||||
// appending fails
|
// appending fails
|
||||||
self.services
|
self.services
|
||||||
.read_receipt
|
.read_receipt
|
||||||
.private_read_set(pdu.room_id(), pdu.sender(), *count1);
|
.private_read_set(pdu.room_id(), pdu.sender(), *next_count2);
|
||||||
|
|
||||||
self.services
|
self.services
|
||||||
.user
|
.user
|
||||||
.reset_notification_counts(pdu.sender(), pdu.room_id());
|
.reset_notification_counts(pdu.sender(), pdu.room_id());
|
||||||
|
|
||||||
let count2 = PduCount::Normal(*count2);
|
let count = PduCount::Normal(*next_count1);
|
||||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into();
|
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into();
|
||||||
|
|
||||||
// Insert pdu
|
// Insert pdu
|
||||||
self.db
|
self.db
|
||||||
.append_pdu(&pdu_id, pdu, &pdu_json, count2)
|
.append_pdu(&pdu_id, pdu, &pdu_json, count)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
drop(next_count1);
|
||||||
|
drop(next_count2);
|
||||||
drop(insert_lock);
|
drop(insert_lock);
|
||||||
|
|
||||||
// See if the event matches any known pushers via power level
|
// See if the event matches any known pushers via power level
|
||||||
@@ -396,7 +398,7 @@ where
|
|||||||
{
|
{
|
||||||
self.services
|
self.services
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
.add_relation(count2, related_pducount);
|
.add_relation(count, related_pducount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -408,7 +410,7 @@ where
|
|||||||
if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
|
if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await {
|
||||||
self.services
|
self.services
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
.add_relation(count2, related_pducount);
|
.add_relation(count, related_pducount);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
| Relation::Thread(thread) => {
|
| Relation::Thread(thread) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user