From 63dfe8f7e37a7f70e8f8dbf23f5001701b7218eb Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 26 Jul 2025 04:59:00 +0000 Subject: [PATCH] Add upper-bound for presence_since(). Signed-off-by: Jason Volk --- src/admin/query/presence.rs | 7 +++++-- src/api/client/sync/v3.rs | 5 +++-- src/service/presence/data.rs | 4 +++- src/service/presence/mod.rs | 3 ++- src/service/sending/sender.rs | 9 +++++---- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/admin/query/presence.rs b/src/admin/query/presence.rs index d21795d7..b7bf1f79 100644 --- a/src/admin/query/presence.rs +++ b/src/admin/query/presence.rs @@ -19,6 +19,9 @@ pub(crate) enum PresenceCommand { PresenceSince { /// UNIX timestamp since (u64) since: u64, + + /// Upper-bound of since + to: Option, }, } @@ -34,11 +37,11 @@ pub(super) async fn process(subcommand: PresenceCommand, context: &Context<'_>) write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```") }, - | PresenceCommand::PresenceSince { since } => { + | PresenceCommand::PresenceSince { since, to } => { let timer = tokio::time::Instant::now(); let results: Vec<(_, _, _)> = services .presence - .presence_since(since) + .presence_since(since, to) .map(|(user_id, count, bytes)| (user_id.to_owned(), count, bytes.to_vec())) .collect() .await; diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 706dbb96..2b738f02 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -288,7 +288,7 @@ pub(crate) async fn build_sync_events( let presence_updates: OptionFuture<_> = services .config .allow_local_presence - .then(|| process_presence_updates(services, since, sender_user)) + .then(|| process_presence_updates(services, since, next_batch, sender_user)) .into(); let account_data = services @@ -391,11 +391,12 @@ pub(crate) async fn build_sync_events( async fn process_presence_updates( services: &Services, since: u64, + next_batch: u64, syncing_user: &UserId, ) -> PresenceUpdates { services .presence - .presence_since(since) + .presence_since(since, Some(next_batch)) .filter(|(user_id, ..)| { services .rooms diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index 53dee45c..de6b07cd 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -154,13 +154,15 @@ impl Data { pub(super) fn presence_since( &self, since: u64, + to: Option, ) -> impl Stream + Send + '_ { self.presenceid_presence .raw_stream() .ignore_err() .ready_filter_map(move |(key, presence)| { let (count, user_id) = presenceid_parse(key).ok()?; - (count > since).then_some((user_id, count, presence)) + (count > since && to.is_none_or(|to| count <= to)) + .then_some((user_id, count, presence)) }) } } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 0b958206..79541b66 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -239,8 +239,9 @@ impl Service { pub fn presence_since( &self, since: u64, + to: Option, ) -> impl Stream + Send + '_ { - self.db.presence_since(since) + self.db.presence_since(since, to) } #[inline] diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 6672d6e0..162b070e 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -605,14 +605,15 @@ impl Service { since: (u64, u64), max_edu_count: &AtomicU64, ) -> Option { - let presence_since = self.services.presence.presence_since(since.0); + let presence_since = self + .services + .presence + .presence_since(since.0, Some(since.1)); pin_mut!(presence_since); let mut presence_updates = HashMap::::new(); while let Some((user_id, count, presence_bytes)) = presence_since.next().await { - if count > since.1 { - break; - } + debug_assert!(count <= since.1, "exceeded upper-bound"); max_edu_count.fetch_max(count, Ordering::Relaxed); if !self.services.globals.user_is_local(user_id) {