Add upper-bound for presence_since().
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -19,6 +19,9 @@ pub(crate) enum PresenceCommand {
|
|||||||
PresenceSince {
|
PresenceSince {
|
||||||
/// UNIX timestamp since (u64)
|
/// UNIX timestamp since (u64)
|
||||||
since: u64,
|
since: u64,
|
||||||
|
|
||||||
|
/// Upper-bound of since
|
||||||
|
to: Option<u64>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,11 +37,11 @@ pub(super) async fn process(subcommand: PresenceCommand, context: &Context<'_>)
|
|||||||
|
|
||||||
write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```")
|
write!(context, "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```")
|
||||||
},
|
},
|
||||||
| PresenceCommand::PresenceSince { since } => {
|
| PresenceCommand::PresenceSince { since, to } => {
|
||||||
let timer = tokio::time::Instant::now();
|
let timer = tokio::time::Instant::now();
|
||||||
let results: Vec<(_, _, _)> = services
|
let results: Vec<(_, _, _)> = services
|
||||||
.presence
|
.presence
|
||||||
.presence_since(since)
|
.presence_since(since, to)
|
||||||
.map(|(user_id, count, bytes)| (user_id.to_owned(), count, bytes.to_vec()))
|
.map(|(user_id, count, bytes)| (user_id.to_owned(), count, bytes.to_vec()))
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -288,7 +288,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
let presence_updates: OptionFuture<_> = services
|
let presence_updates: OptionFuture<_> = services
|
||||||
.config
|
.config
|
||||||
.allow_local_presence
|
.allow_local_presence
|
||||||
.then(|| process_presence_updates(services, since, sender_user))
|
.then(|| process_presence_updates(services, since, next_batch, sender_user))
|
||||||
.into();
|
.into();
|
||||||
|
|
||||||
let account_data = services
|
let account_data = services
|
||||||
@@ -391,11 +391,12 @@ pub(crate) async fn build_sync_events(
|
|||||||
async fn process_presence_updates(
|
async fn process_presence_updates(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
since: u64,
|
since: u64,
|
||||||
|
next_batch: u64,
|
||||||
syncing_user: &UserId,
|
syncing_user: &UserId,
|
||||||
) -> PresenceUpdates {
|
) -> PresenceUpdates {
|
||||||
services
|
services
|
||||||
.presence
|
.presence
|
||||||
.presence_since(since)
|
.presence_since(since, Some(next_batch))
|
||||||
.filter(|(user_id, ..)| {
|
.filter(|(user_id, ..)| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
|
|||||||
@@ -154,13 +154,15 @@ impl Data {
|
|||||||
pub(super) fn presence_since(
|
pub(super) fn presence_since(
|
||||||
&self,
|
&self,
|
||||||
since: u64,
|
since: u64,
|
||||||
|
to: Option<u64>,
|
||||||
) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
|
) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
|
||||||
self.presenceid_presence
|
self.presenceid_presence
|
||||||
.raw_stream()
|
.raw_stream()
|
||||||
.ignore_err()
|
.ignore_err()
|
||||||
.ready_filter_map(move |(key, presence)| {
|
.ready_filter_map(move |(key, presence)| {
|
||||||
let (count, user_id) = presenceid_parse(key).ok()?;
|
let (count, user_id) = presenceid_parse(key).ok()?;
|
||||||
(count > since).then_some((user_id, count, presence))
|
(count > since && to.is_none_or(|to| count <= to))
|
||||||
|
.then_some((user_id, count, presence))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -239,8 +239,9 @@ impl Service {
|
|||||||
pub fn presence_since(
|
pub fn presence_since(
|
||||||
&self,
|
&self,
|
||||||
since: u64,
|
since: u64,
|
||||||
|
to: Option<u64>,
|
||||||
) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
|
) -> impl Stream<Item = (&UserId, u64, &[u8])> + Send + '_ {
|
||||||
self.db.presence_since(since)
|
self.db.presence_since(since, to)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|||||||
@@ -605,14 +605,15 @@ impl Service {
|
|||||||
since: (u64, u64),
|
since: (u64, u64),
|
||||||
max_edu_count: &AtomicU64,
|
max_edu_count: &AtomicU64,
|
||||||
) -> Option<EduBuf> {
|
) -> Option<EduBuf> {
|
||||||
let presence_since = self.services.presence.presence_since(since.0);
|
let presence_since = self
|
||||||
|
.services
|
||||||
|
.presence
|
||||||
|
.presence_since(since.0, Some(since.1));
|
||||||
|
|
||||||
pin_mut!(presence_since);
|
pin_mut!(presence_since);
|
||||||
let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
|
let mut presence_updates = HashMap::<OwnedUserId, PresenceUpdate>::new();
|
||||||
while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
|
while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
|
||||||
if count > since.1 {
|
debug_assert!(count <= since.1, "exceeded upper-bound");
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
max_edu_count.fetch_max(count, Ordering::Relaxed);
|
max_edu_count.fetch_max(count, Ordering::Relaxed);
|
||||||
if !self.services.globals.user_is_local(user_id) {
|
if !self.services.globals.user_is_local(user_id) {
|
||||||
|
|||||||
Reference in New Issue
Block a user