diff --git a/src/api/client/room/create.rs b/src/api/client/room/create.rs index 877d3317..5fe7735f 100644 --- a/src/api/client/room/create.rs +++ b/src/api/client/room/create.rs @@ -111,6 +111,7 @@ pub(crate) async fn create_room_route( .short .get_or_create_shortroomid(&room_id) .await; + let state_lock = services.rooms.state.mutex.lock(&room_id).await; let alias: Option = match body.room_alias_name.as_ref() { @@ -187,6 +188,10 @@ pub(crate) async fn create_room_route( }, }; + // Increment and hold the counter; the room will sync atomically to clients + // which is preferable. + let next_count = services.globals.next_count(); + // 1. The room create event services .rooms @@ -425,8 +430,10 @@ pub(crate) async fn create_room_route( .await?; } - // 8. Events implied by invite (and TODO: invite_3pid) + drop(next_count); drop(state_lock); + + // 8. Events implied by invite (and TODO: invite_3pid) for user_id in &body.invite { if services .users diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 44d80ea7..21c954ab 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -115,7 +115,6 @@ type PresenceUpdates = HashMap; skip_all, fields( user_id = %body.sender_user(), - since = %body.body.since.as_deref().unwrap_or_default(), ) )] pub(crate) async fn sync_events_route( @@ -134,21 +133,29 @@ pub(crate) async fn sync_events_route( // Setup watchers, so if there's no response, we can wait for them let watcher = services.sync.watch(sender_user, sender_device); - let next_batch = services.globals.current_count(); - let response = build_sync_events(&services, &body, next_batch).await?; - if body.body.full_state - || !(response.rooms.is_empty() - && response.presence.is_empty() - && response.account_data.is_empty() - && response.device_lists.is_empty() - && response.to_device.is_empty()) - { - return Ok(response); + let since = body + .body + .since + .as_deref() + .map(str::parse) + .flat_ok() + .unwrap_or(0); + + // Make an initial pass if there's a possibility of something to find. + if since < next_batch { + let response = build_sync_events(&services, &body, since, next_batch).await?; + if body.body.full_state + || !(response.rooms.is_empty() + && response.presence.is_empty() + && response.account_data.is_empty() + && response.device_lists.is_empty() + && response.to_device.is_empty()) + { + return Ok(response); + } } - // Hang a few seconds so requests are not spammed - // Stop hanging if new info arrives let timeout_default = services.config.client_sync_timeout_default; let timeout_min = services.config.client_sync_timeout_min; let timeout_max = services.config.client_sync_timeout_max; @@ -158,27 +165,41 @@ pub(crate) async fn sync_events_route( .unwrap_or_else(|| Duration::from_millis(timeout_default)) .clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max)); + // Wait for activity notification. _ = tokio::time::timeout(duration, watcher).await; - // Retry returning data - let next_batch = services.globals.current_count(); - build_sync_events(&services, &body, next_batch).await + // Wait for synchronization of activity. + let next_batch = services.globals.wait_pending().await?; + debug_assert!(since <= next_batch, "next_batch is monotonic"); + + // Return an empty response when nothing has changed. + if since == next_batch { + return Ok(sync_events::v3::Response::new(next_batch.to_string())); + } + + build_sync_events(&services, &body, since, next_batch) + .await + .map_err(Into::into) } -pub(crate) async fn build_sync_events( +#[tracing::instrument( + name = "build", + level = "debug", + ret(level = "trace"), + skip_all, + fields( + %since, + %next_batch, + ) +)] +async fn build_sync_events( services: &Services, body: &Ruma, + since: u64, next_batch: u64, -) -> Result> { +) -> Result { let (sender_user, sender_device) = body.sender(); - let since = body - .body - .since - .as_ref() - .and_then(|string| string.parse().ok()) - .unwrap_or(0); - let full_state = body.body.full_state; let filter = match body.body.filter.as_ref() { | None => FilterDefinition::default(), @@ -364,7 +385,7 @@ pub(crate) async fn build_sync_events( .collect() .await; - let response = sync_events::v3::Response { + Ok(sync_events::v3::Response { account_data: GlobalAccountData { events: account_data }, device_lists: DeviceLists { changed: device_list_updates.into_iter().collect(), @@ -390,9 +411,7 @@ pub(crate) async fn build_sync_events( knock: knocked_rooms, }, to_device: ToDevice { events: to_device_events }, - }; - - Ok(response) + }) } #[tracing::instrument(name = "presence", level = "debug", skip_all)] @@ -670,6 +689,25 @@ async fn load_joined_room( .boxed() .await?; + // State was changed after the cutoff for this sync; similar to other handlers. + if current_shortstatehash > next_batch { + // Transfer the since_shortstatehash not the current over to the next sync. + if let Some(since_shortstatehash) = since_shortstatehash { + services + .rooms + .user + .associate_token_shortstatehash(room_id, next_batch, since_shortstatehash) + .await; + } + + return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new())); + } + + let associate_token = services + .rooms + .user + .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash); + let initial = since_shortstatehash.is_none() || since == 0; let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled() || filter @@ -739,9 +777,15 @@ async fn load_joined_room( .read_receipt .last_privateread_update(sender_user, room_id); - let (last_privateread_update, last_notification_read, since_sender_member, witness) = - join4(last_privateread_update, last_notification_read, since_sender_member, witness) - .await; + let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) = + join5( + last_privateread_update, + last_notification_read, + since_sender_member, + witness, + associate_token, + ) + .await; let joined_since_last_sync = since_sender_member @@ -888,14 +932,6 @@ async fn load_joined_room( .chain(private_read_event.flatten().into_iter()) .collect(); - // Save the state after this sync so we can send the correct state diff next - // sync - services - .rooms - .user - .associate_token_shortstatehash(room_id, next_batch, current_shortstatehash) - .await; - let joined_room = JoinedRoom { account_data: RoomAccountData { events: account_data_events }, summary: RoomSummary { diff --git a/src/service/rooms/state_cache/update.rs b/src/service/rooms/state_cache/update.rs index 2c52af42..cb0d5e83 100644 --- a/src/service/rooms/state_cache/update.rs +++ b/src/service/rooms/state_cache/update.rs @@ -58,6 +58,11 @@ pub async fn update_membership( match &membership { | MembershipState::Join => { + // Increment the counter for a unique value and hold the guard even though the + // value is not used. This will allow proper sync to clients similar to the + // other membership state changes. + let _next_count = self.services.globals.next_count(); + // Check if the user never joined this room if !self.once_joined(user_id, room_id).await { // Add the user ID to the join list then @@ -236,7 +241,7 @@ pub async fn update_joined_count(&self, room_id: &RoomId) { /// `update_membership` instead #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] -pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { +pub(crate) fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { let userroom_id = (user_id, room_id); let userroom_id = serialize_key(userroom_id).expect("failed to serialize userroom_id"); @@ -271,7 +276,7 @@ pub fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) { /// `update_membership` instead #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] -pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { +pub(crate) fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { let count = self.services.globals.next_count(); let userroom_id = (user_id, room_id); @@ -315,7 +320,7 @@ pub fn mark_as_left(&self, user_id: &UserId, room_id: &RoomId) { /// `update_membership` instead #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] -pub fn mark_as_knocked( +pub(crate) fn mark_as_knocked( &self, user_id: &UserId, room_id: &RoomId, @@ -372,7 +377,7 @@ fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) { #[implement(super::Service)] #[tracing::instrument(level = "debug", skip(self, last_state, invite_via))] -pub async fn mark_as_invited( +pub(crate) async fn mark_as_invited( &self, user_id: &UserId, room_id: &RoomId, diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index dcdeb09e..e2335151 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -159,27 +159,29 @@ where .await; let insert_lock = self.mutex_insert.lock(pdu.room_id()).await; - let count1 = self.services.globals.next_count(); - let count2 = self.services.globals.next_count(); + let next_count1 = self.services.globals.next_count(); + let next_count2 = self.services.globals.next_count(); // Mark as read first so the sending client doesn't get a notification even if // appending fails self.services .read_receipt - .private_read_set(pdu.room_id(), pdu.sender(), *count1); + .private_read_set(pdu.room_id(), pdu.sender(), *next_count2); self.services .user .reset_notification_counts(pdu.sender(), pdu.room_id()); - let count2 = PduCount::Normal(*count2); - let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count2 }.into(); + let count = PduCount::Normal(*next_count1); + let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into(); // Insert pdu self.db - .append_pdu(&pdu_id, pdu, &pdu_json, count2) + .append_pdu(&pdu_id, pdu, &pdu_json, count) .await; + drop(next_count1); + drop(next_count2); drop(insert_lock); // See if the event matches any known pushers via power level @@ -396,7 +398,7 @@ where { self.services .pdu_metadata - .add_relation(count2, related_pducount); + .add_relation(count, related_pducount); } } @@ -408,7 +410,7 @@ where if let Ok(related_pducount) = self.get_pdu_count(&in_reply_to.event_id).await { self.services .pdu_metadata - .add_relation(count2, related_pducount); + .add_relation(count, related_pducount); } }, | Relation::Thread(thread) => {