presence: restore deferred push suppression

Jared L
2026-01-21 06:05:09 +11:00
parent 22a7f6cb33
commit aadc9ade3e
5 changed files with 509 additions and 9 deletions


@@ -70,11 +70,7 @@ impl Service {
return None;
};
let last_last_active_ago: u64 = event
.content
.last_active_ago
.unwrap_or_default()
.into();
let last_last_active_ago: u64 = event.content.last_active_ago?.into();
(last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago))
}
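
The behavioral shift in this hunk is easy to miss: previously a missing last_active_ago defaulted to 0, which always compared below refresh_ms and produced a skip; with `?` the missing field short-circuits to None and the refresh proceeds. A standalone simplification of the difference (hypothetical function names, the tuple payload reduced to a single value):

fn skip_decision_old(last_active_ago: Option<u64>, refresh_ms: u64) -> Option<u64> {
    // Old: a missing value defaulted to 0, which is always < refresh_ms,
    // so the refresh was skipped even with no data.
    let ago = last_active_ago.unwrap_or_default();
    (ago < refresh_ms).then_some(ago)
}

fn skip_decision_new(last_active_ago: Option<u64>, refresh_ms: u64) -> Option<u64> {
    // New: `?` propagates the missing value, so the refresh proceeds.
    let ago = last_active_ago?;
    (ago < refresh_ms).then_some(ago)
}

fn main() {
    assert_eq!(skip_decision_old(None, 5), Some(0)); // skipped despite no data
    assert_eq!(skip_decision_new(None, 5), None);    // refresh proceeds
}

The test added below (event_missing_ago) pins down exactly this case.
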
@@ -349,6 +345,11 @@ impl Service {
);
if let Some(new_state) = new_state {
if matches!(new_state, PresenceState::Unavailable | PresenceState::Offline) {
self.services
.sending
.schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive");
}
self.set_presence(
user_id,
&new_state,
@@ -369,6 +370,15 @@ impl Service {
return Ok(());
}
if matches!(
aggregated.state,
PresenceState::Unavailable | PresenceState::Offline
) {
self.services
.sending
.schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive");
}
let status_msg = aggregated.status_msg.or_else(|| presence.status_msg());
let last_active_ago =
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
@@ -422,6 +432,22 @@ mod tests {
let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5));
assert_eq!(decision, None);
let event_missing_ago = PresenceEvent {
sender: user_id.to_owned(),
content: ruma::events::presence::PresenceEventContent {
presence: PresenceState::Online,
status_msg: None,
currently_active: Some(true),
last_active_ago: None,
avatar_url: None,
displayname: None,
},
};
let decision =
Service::refresh_skip_decision(Some(20), Some(&event_missing_ago), Some(5));
assert_eq!(decision, None);
let decision = Service::refresh_skip_decision(Some(20), None, Some(5));
assert_eq!(decision, None);
}


@@ -2,6 +2,7 @@ mod append;
mod notification;
mod request;
mod send;
mod suppressed;
use std::sync::Arc;
@@ -32,6 +33,7 @@ pub struct Service {
notification_increment_mutex: MutexMap<(OwnedRoomId, OwnedUserId), ()>,
highlight_increment_mutex: MutexMap<(OwnedRoomId, OwnedUserId), ()>,
db: Data,
suppressed: suppressed::SuppressedQueue,
}
struct Data {
@@ -60,6 +62,7 @@ impl crate::Service for Service {
roomuserid_lastnotificationread: args.db["roomuserid_lastnotificationread"]
.clone(),
},
suppressed: suppressed::SuppressedQueue::default(),
}))
}
@@ -139,6 +142,7 @@ pub async fn delete_pusher(&self, sender: &UserId, pushkey: &str) {
let key = (sender, pushkey);
self.db.senderkey_pusher.del(key);
self.db.pushkey_deviceid.remove(pushkey);
self.clear_suppressed_pushkey(sender, pushkey);
self.services
.sending


@@ -22,6 +22,11 @@ pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) {
self.db
.roomuserid_lastnotificationread
.put(roomuser_id, *count);
let removed = self.clear_suppressed_room(user_id, room_id);
if removed > 0 {
trace!(?user_id, ?room_id, removed, "Cleared suppressed push events after read");
}
}
#[implement(super::Service)]


@@ -0,0 +1,216 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
use tuwunel_core::{debug, implement, trace, utils};
use crate::rooms::timeline::RawPduId;
const SUPPRESSED_MAX_EVENTS_PER_ROOM: usize = 512;
const SUPPRESSED_MAX_EVENTS_PER_PUSHKEY: usize = 4096;
const SUPPRESSED_MAX_ROOMS_PER_PUSHKEY: usize = 256;
#[derive(Default)]
pub(super) struct SuppressedQueue {
inner: Mutex<HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>>,
}
#[derive(Default)]
struct PushkeyQueue {
rooms: HashMap<OwnedRoomId, VecDeque<SuppressedEvent>>,
total_events: usize,
}
#[derive(Clone, Debug)]
struct SuppressedEvent {
pdu_id: RawPduId,
_inserted_at_ms: u64,
}
impl SuppressedQueue {
fn lock(
&self,
) -> std::sync::MutexGuard<'_, HashMap<OwnedUserId, HashMap<String, PushkeyQueue>>> {
self.inner
.lock()
.unwrap_or_else(|e| e.into_inner())
}
fn drain_room(queue: VecDeque<SuppressedEvent>) -> Vec<RawPduId> {
queue.into_iter().map(|event| event.pdu_id).collect()
}
fn drop_one_front(queue: &mut VecDeque<SuppressedEvent>, total_events: &mut usize) -> bool {
if queue.pop_front().is_some() {
*total_events = total_events.saturating_sub(1);
return true;
}
false
}
}
#[implement(super::Service)]
pub fn queue_suppressed_push(
&self,
user_id: &UserId,
pushkey: &str,
room_id: &RoomId,
pdu_id: RawPduId,
) -> bool {
let mut inner = self.suppressed.lock();
let user_entry = inner.entry(user_id.to_owned()).or_default();
let push_entry = user_entry
.entry(pushkey.to_owned())
.or_default();
if !push_entry.rooms.contains_key(room_id)
&& push_entry.rooms.len() >= SUPPRESSED_MAX_ROOMS_PER_PUSHKEY
{
debug!(
?user_id,
?room_id,
pushkey,
max_rooms = SUPPRESSED_MAX_ROOMS_PER_PUSHKEY,
"Suppressed push queue full (rooms); dropping event"
);
return false;
}
let queue = push_entry
.rooms
.entry(room_id.to_owned())
.or_insert_with(VecDeque::new);
if queue
.back()
.is_some_and(|event| event.pdu_id == pdu_id)
{
trace!(
?user_id,
?room_id,
pushkey,
"Suppressed push event is duplicate; skipping"
);
return false;
}
if push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY && queue.is_empty() {
debug!(
?user_id,
?room_id,
pushkey,
max_events = SUPPRESSED_MAX_EVENTS_PER_PUSHKEY,
"Suppressed push queue full (total); dropping event"
);
return false;
}
while queue.len() >= SUPPRESSED_MAX_EVENTS_PER_ROOM
|| push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY
{
if !SuppressedQueue::drop_one_front(queue, &mut push_entry.total_events) {
break;
}
}
queue.push_back(SuppressedEvent {
pdu_id,
_inserted_at_ms: utils::millis_since_unix_epoch(),
});
push_entry.total_events = push_entry.total_events.saturating_add(1);
true
}
#[implement(super::Service)]
pub fn take_suppressed_for_pushkey(
&self,
user_id: &UserId,
pushkey: &str,
) -> Vec<(OwnedRoomId, Vec<RawPduId>)> {
let mut inner = self.suppressed.lock();
let Some(user_entry) = inner.get_mut(user_id) else {
return Vec::new();
};
let Some(push_entry) = user_entry.remove(pushkey) else {
return Vec::new();
};
if user_entry.is_empty() {
inner.remove(user_id);
}
push_entry
.rooms
.into_iter()
.map(|(room_id, queue)| (room_id, SuppressedQueue::drain_room(queue)))
.collect()
}
#[implement(super::Service)]
pub fn take_suppressed_for_user(
&self,
user_id: &UserId,
) -> Vec<(String, Vec<(OwnedRoomId, Vec<RawPduId>)>)> {
let mut inner = self.suppressed.lock();
let Some(user_entry) = inner.remove(user_id) else {
return Vec::new();
};
user_entry
.into_iter()
.map(|(pushkey, queue)| {
let rooms = queue
.rooms
.into_iter()
.map(|(room_id, q)| (room_id, SuppressedQueue::drain_room(q)))
.collect();
(pushkey, rooms)
})
.collect()
}
#[implement(super::Service)]
pub fn clear_suppressed_room(&self, user_id: &UserId, room_id: &RoomId) -> usize {
let mut inner = self.suppressed.lock();
let Some(user_entry) = inner.get_mut(user_id) else {
return 0;
};
let mut removed: usize = 0;
user_entry.retain(|_, push_entry| {
if let Some(queue) = push_entry.rooms.remove(room_id) {
removed = removed.saturating_add(queue.len());
push_entry.total_events = push_entry.total_events.saturating_sub(queue.len());
}
!push_entry.rooms.is_empty()
});
if user_entry.is_empty() {
inner.remove(user_id);
}
removed
}
#[implement(super::Service)]
pub fn clear_suppressed_pushkey(&self, user_id: &UserId, pushkey: &str) -> usize {
let mut inner = self.suppressed.lock();
let Some(user_entry) = inner.get_mut(user_id) else {
return 0;
};
let removed = user_entry
.remove(pushkey)
.map(|queue| queue.total_events)
.unwrap_or(0);
if user_entry.is_empty() {
inner.remove(user_id);
}
removed
}
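
A compacted, std-only model of the new module's semantics may help: it keeps the tail-only dedupe and oldest-first eviction of queue_suppressed_push and the draining behavior of take_suppressed_for_pushkey, with plain Strings standing in for OwnedUserId/OwnedRoomId/RawPduId and the cap shrunk for the demo:

use std::collections::{HashMap, VecDeque};

const MAX_PER_ROOM: usize = 3;

#[derive(Default)]
struct PushkeyQueue {
    rooms: HashMap<String, VecDeque<String>>,
    total: usize,
}

impl PushkeyQueue {
    fn push(&mut self, room: &str, pdu: &str) -> bool {
        let q = self.rooms.entry(room.to_owned()).or_default();
        // Dedupe only against the newest entry, as queue_suppressed_push does.
        if q.back().is_some_and(|last| last.as_str() == pdu) {
            return false;
        }
        // Evict the oldest entries once the per-room cap is reached.
        while q.len() >= MAX_PER_ROOM {
            q.pop_front();
            self.total -= 1;
        }
        q.push_back(pdu.to_owned());
        self.total += 1;
        true
    }

    // Drain everything at once, like take_suppressed_for_pushkey.
    fn take(&mut self) -> Vec<(String, Vec<String>)> {
        self.total = 0;
        self.rooms
            .drain()
            .map(|(room, q)| (room, q.into_iter().collect()))
            .collect()
    }
}

fn main() {
    let mut queue = PushkeyQueue::default();
    for i in 0..5 {
        queue.push("!room:example.org", &format!("pdu{i}"));
    }
    // The duplicate tail is skipped; the two oldest entries were evicted.
    assert!(!queue.push("!room:example.org", "pdu4"));
    let drained = queue.take();
    assert_eq!(drained[0].1, ["pdu2", "pdu3", "pdu4"]);
    assert_eq!(queue.total, 0);
}

Note that eviction favors the newest events: under pressure the queue keeps the most recent PDUs per room, which is the right bias for notifications.
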


@@ -50,6 +50,8 @@ use tuwunel_core::{
warn,
};
use crate::rooms::timeline::RawPduId;
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
#[derive(Debug)]
@@ -801,11 +803,11 @@ impl Service {
let rules_for_user = self
.services
.account_data
.get_global(&user_id, GlobalAccountDataEventType::PushRules)
.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
.map(|ev| {
ev.map_or_else(
|_| push::Ruleset::server_default(&user_id),
|ev: PushRulesEvent| ev.content.global,
|ev| ev.content.global,
)
})
.map(Ok);
@@ -814,9 +816,25 @@ impl Service {
try_join3(pusher, rules_for_user, suppressed).await?;
if suppressed {
let queued = self
.enqueue_suppressed_push_events(&user_id, &pushkey, &events)
.await;
debug!(
?user_id,
pushkey,
queued,
events = events.len(),
"Push suppressed; queued events"
);
return Ok(Destination::Push(user_id, pushkey));
}
self.schedule_flush_suppressed_for_pushkey(
user_id.clone(),
pushkey.clone(),
"non-suppressed push",
);
let _sent = events
.iter()
.stream()
@@ -842,18 +860,240 @@ impl Service {
Ok(Destination::Push(user_id, pushkey))
}
pub fn schedule_flush_suppressed_for_pushkey(
&self,
user_id: OwnedUserId,
pushkey: String,
reason: &'static str,
) {
let sending = self.services.sending.clone();
let runtime = self.server.runtime();
runtime.spawn(async move {
sending
.flush_suppressed_for_pushkey(user_id, pushkey, reason)
.await;
});
}
pub fn schedule_flush_suppressed_for_user(
&self,
user_id: OwnedUserId,
reason: &'static str,
) {
let sending = self.services.sending.clone();
let runtime = self.server.runtime();
runtime.spawn(async move {
sending.flush_suppressed_for_user(user_id, reason).await;
});
}
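
Both schedulers are fire-and-forget: the flush runs on a detached runtime task, so a caller such as the presence update path returns immediately and never awaits pusher I/O. A minimal sketch of the pattern, assuming a tokio runtime and with flush as a hypothetical stand-in for flush_suppressed_for_user:

use std::sync::Arc;
use std::time::Duration;

struct Sending;

impl Sending {
    async fn flush(&self, user_id: String, reason: &'static str) {
        println!("flushing suppressed pushes for {user_id}: {reason}");
    }
}

// Detached task: the caller returns immediately, never awaiting pusher I/O.
fn schedule_flush(sending: Arc<Sending>, user_id: String, reason: &'static str) {
    tokio::spawn(async move {
        sending.flush(user_id, reason).await;
    });
}

#[tokio::main]
async fn main() {
    let sending = Arc::new(Sending);
    schedule_flush(sending, "@alice:example.org".to_owned(), "presence->inactive");
    // Give the detached task a moment to finish in this demo.
    tokio::time::sleep(Duration::from_millis(10)).await;
}
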
async fn enqueue_suppressed_push_events(
&self,
user_id: &UserId,
pushkey: &str,
events: &[SendingEvent],
) -> usize {
let mut queued = 0_usize;
for event in events {
let SendingEvent::Pdu(pdu_id) = event else {
continue;
};
let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await else {
debug!(?user_id, ?pdu_id, "Suppressing push but PDU is missing");
continue;
};
if pdu.is_redacted() {
trace!(?user_id, ?pdu_id, "Suppressing push for redacted PDU");
continue;
}
if self
.services
.pusher
.queue_suppressed_push(user_id, pushkey, pdu.room_id(), *pdu_id)
{
queued = queued.saturating_add(1);
}
}
queued
}
async fn flush_suppressed_rooms(
&self,
user_id: &UserId,
pushkey: &str,
pusher: &ruma::api::client::push::Pusher,
rules_for_user: &push::Ruleset,
rooms: Vec<(OwnedRoomId, Vec<RawPduId>)>,
reason: &'static str,
) {
if rooms.is_empty() {
return;
}
debug!(
?user_id,
pushkey,
rooms = rooms.len(),
"Flushing suppressed pushes ({reason})"
);
for (room_id, pdu_ids) in rooms {
let unread = self
.services
.pusher
.notification_count(user_id, &room_id)
.await;
if unread == 0 {
trace!(
?user_id,
?room_id,
"Skipping suppressed push flush: no unread"
);
continue;
}
for pdu_id in pdu_ids {
let Ok(pdu) = self.services.timeline.get_pdu_from_id(&pdu_id).await else {
debug!(?user_id, ?pdu_id, "Suppressed PDU missing during flush");
continue;
};
if pdu.is_redacted() {
trace!(?user_id, ?pdu_id, "Suppressed PDU redacted during flush");
continue;
}
if let Err(error) = self
.services
.pusher
.send_push_notice(user_id, pusher, rules_for_user, &pdu)
.await
{
let requeued = self.services.pusher.queue_suppressed_push(
user_id,
pushkey,
&room_id,
pdu_id,
);
warn!(
?user_id,
?room_id,
?error,
requeued,
"Failed to send suppressed push notification"
);
}
}
}
}
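
Note the failure handling above: a PDU that fails to send is pushed back into the suppressed queue rather than dropped, so it is retried on the next flush trigger. A std-only sketch of that shape, with send as a hypothetical fallible sender in place of send_push_notice:

use std::collections::VecDeque;

// Hypothetical fallible sender standing in for send_push_notice.
fn send(pdu: &str) -> Result<(), &'static str> {
    if pdu == "pdu1" { Err("gateway timeout") } else { Ok(()) }
}

fn main() {
    let mut queue: VecDeque<&str> = VecDeque::from(["pdu0", "pdu1", "pdu2"]);
    let mut requeued: VecDeque<&str> = VecDeque::new();
    while let Some(pdu) = queue.pop_front() {
        if let Err(error) = send(pdu) {
            eprintln!("failed to send {pdu}: {error}; requeueing");
            // Back into the suppressed queue; retried on the next flush trigger.
            requeued.push_back(pdu);
        }
    }
    assert_eq!(requeued.len(), 1);
    assert_eq!(requeued[0], "pdu1");
}
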
async fn flush_suppressed_for_pushkey(
&self,
user_id: OwnedUserId,
pushkey: String,
reason: &'static str,
) {
let suppressed = self
.services
.pusher
.take_suppressed_for_pushkey(&user_id, &pushkey);
if suppressed.is_empty() {
return;
}
let pusher = match self.services.pusher.get_pusher(&user_id, &pushkey).await {
| Ok(pusher) => pusher,
| Err(error) => {
warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
return;
},
};
let rules_for_user = match self
.services
.account_data
.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
.await
{
| Ok(ev) => ev.content.global,
| Err(_) => push::Ruleset::server_default(&user_id),
};
self.flush_suppressed_rooms(
&user_id,
&pushkey,
&pusher,
&rules_for_user,
suppressed,
reason,
)
.await;
}
pub async fn flush_suppressed_for_user(
&self,
user_id: OwnedUserId,
reason: &'static str,
) {
let suppressed = self.services.pusher.take_suppressed_for_user(&user_id);
if suppressed.is_empty() {
return;
}
let rules_for_user = match self
.services
.account_data
.get_global::<PushRulesEvent>(&user_id, GlobalAccountDataEventType::PushRules)
.await
{
| Ok(ev) => ev.content.global,
| Err(_) => push::Ruleset::server_default(&user_id),
};
for (pushkey, rooms) in suppressed {
let pusher = match self.services.pusher.get_pusher(&user_id, &pushkey).await {
| Ok(pusher) => pusher,
| Err(error) => {
warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush");
continue;
},
};
self.flush_suppressed_rooms(
&user_id,
&pushkey,
&pusher,
&rules_for_user,
rooms,
reason,
)
.await;
}
}
// optional suppression: heuristic combining presence age and recent sync
// activity.
async fn pushing_suppressed(&self, user_id: &UserId) -> bool {
if !self.services.config.suppress_push_when_active {
debug!(?user_id, "push not suppressed: suppress_push_when_active disabled");
return false;
}
let Ok(presence) = self.services.presence.get_presence(user_id).await else {
debug!(?user_id, "push not suppressed: presence unavailable");
return false;
};
if presence.content.presence != PresenceState::Online {
debug!(
?user_id,
presence = ?presence.content.presence,
"push not suppressed: presence not online"
);
return false;
}
@@ -864,6 +1104,11 @@ impl Service {
.unwrap_or(u64::MAX);
if presence_age_ms >= 65_000 {
debug!(
?user_id,
presence_age_ms,
"push not suppressed: presence too old"
);
return false;
}
@@ -875,8 +1120,12 @@ impl Service {
let considered_active = sync_gap_ms.is_some_and(|gap| gap < 32_000);
if considered_active {
trace!(?user_id, presence_age_ms, "suppressing push: active heuristic");
match sync_gap_ms {
| Some(gap) if gap < 32_000 =>
debug!(?user_id, presence_age_ms, sync_gap_ms = gap, "suppressing push: active heuristic"),
| Some(gap) =>
debug!(?user_id, presence_age_ms, sync_gap_ms = gap, "push not suppressed: sync gap too large"),
| None => debug!(?user_id, presence_age_ms, "push not suppressed: no recent sync"),
}
considered_active
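
Taken together, the heuristic reduces to a pure predicate over four inputs: the config flag, an online presence state, a presence age under 65 s, and a sync gap under 32 s. A standalone restatement using the thresholds from the code above (parameter names are hypothetical):

fn pushing_suppressed(
    suppress_enabled: bool,
    presence_online: bool,
    presence_age_ms: u64,
    sync_gap_ms: Option<u64>,
) -> bool {
    suppress_enabled
        && presence_online
        && presence_age_ms < 65_000
        && sync_gap_ms.is_some_and(|gap| gap < 32_000)
}

fn main() {
    // Online 10s ago, synced 5s ago: treated as active, so the push is suppressed.
    assert!(pushing_suppressed(true, true, 10_000, Some(5_000)));
    // No recent sync inside the 32s window: the push goes out immediately.
    assert!(!pushing_suppressed(true, true, 10_000, None));
    // Presence older than 65s: not considered active.
    assert!(!pushing_suppressed(true, true, 70_000, Some(5_000)));
}
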