From aadc9ade3eca997fdcef97613981fd02f7cf5d69 Mon Sep 17 00:00:00 2001 From: Jared L <48422312+lhjt@users.noreply.github.com> Date: Wed, 21 Jan 2026 06:05:09 +1100 Subject: [PATCH] presence: restore deferred push suppression --- src/service/presence/pipeline.rs | 36 +++- src/service/pusher/mod.rs | 4 + src/service/pusher/notification.rs | 5 + src/service/pusher/suppressed.rs | 216 ++++++++++++++++++++++++ src/service/sending/sender.rs | 257 ++++++++++++++++++++++++++++- 5 files changed, 509 insertions(+), 9 deletions(-) create mode 100644 src/service/pusher/suppressed.rs diff --git a/src/service/presence/pipeline.rs b/src/service/presence/pipeline.rs index 64e8547f..ee67920b 100644 --- a/src/service/presence/pipeline.rs +++ b/src/service/presence/pipeline.rs @@ -70,11 +70,7 @@ impl Service { return None; }; - let last_last_active_ago: u64 = event - .content - .last_active_ago - .unwrap_or_default() - .into(); + let last_last_active_ago: u64 = event.content.last_active_ago?.into(); (last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago)) } @@ -349,6 +345,11 @@ impl Service { ); if let Some(new_state) = new_state { + if matches!(new_state, PresenceState::Unavailable | PresenceState::Offline) { + self.services + .sending + .schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive"); + } self.set_presence( user_id, &new_state, @@ -369,6 +370,15 @@ impl Service { return Ok(()); } + if matches!( + aggregated.state, + PresenceState::Unavailable | PresenceState::Offline + ) { + self.services + .sending + .schedule_flush_suppressed_for_user(user_id.to_owned(), "presence->inactive"); + } + let status_msg = aggregated.status_msg.or_else(|| presence.status_msg()); let last_active_ago = Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); @@ -422,6 +432,22 @@ mod tests { let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5)); assert_eq!(decision, None); + let event_missing_ago = PresenceEvent { + sender: user_id.to_owned(), + content: ruma::events::presence::PresenceEventContent { + presence: PresenceState::Online, + status_msg: None, + currently_active: Some(true), + last_active_ago: None, + avatar_url: None, + displayname: None, + }, + }; + + let decision = + Service::refresh_skip_decision(Some(20), Some(&event_missing_ago), Some(5)); + assert_eq!(decision, None); + let decision = Service::refresh_skip_decision(Some(20), None, Some(5)); assert_eq!(decision, None); } diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 643d7e94..46b6a39f 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -2,6 +2,7 @@ mod append; mod notification; mod request; mod send; +mod suppressed; use std::sync::Arc; @@ -32,6 +33,7 @@ pub struct Service { notification_increment_mutex: MutexMap<(OwnedRoomId, OwnedUserId), ()>, highlight_increment_mutex: MutexMap<(OwnedRoomId, OwnedUserId), ()>, db: Data, + suppressed: suppressed::SuppressedQueue, } struct Data { @@ -60,6 +62,7 @@ impl crate::Service for Service { roomuserid_lastnotificationread: args.db["roomuserid_lastnotificationread"] .clone(), }, + suppressed: suppressed::SuppressedQueue::default(), })) } @@ -139,6 +142,7 @@ pub async fn delete_pusher(&self, sender: &UserId, pushkey: &str) { let key = (sender, pushkey); self.db.senderkey_pusher.del(key); self.db.pushkey_deviceid.remove(pushkey); + self.clear_suppressed_pushkey(sender, pushkey); self.services .sending diff --git a/src/service/pusher/notification.rs b/src/service/pusher/notification.rs index be16bc07..c65db51b 100644 --- a/src/service/pusher/notification.rs +++ b/src/service/pusher/notification.rs @@ -22,6 +22,11 @@ pub fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) { self.db .roomuserid_lastnotificationread .put(roomuser_id, *count); + + let removed = self.clear_suppressed_room(user_id, room_id); + if removed > 0 { + trace!(?user_id, ?room_id, removed, "Cleared suppressed push events after read"); + } } #[implement(super::Service)] diff --git a/src/service/pusher/suppressed.rs b/src/service/pusher/suppressed.rs new file mode 100644 index 00000000..e7c467d1 --- /dev/null +++ b/src/service/pusher/suppressed.rs @@ -0,0 +1,216 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::Mutex; + +use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId}; +use tuwunel_core::{debug, implement, trace, utils}; + +use crate::rooms::timeline::RawPduId; + +const SUPPRESSED_MAX_EVENTS_PER_ROOM: usize = 512; +const SUPPRESSED_MAX_EVENTS_PER_PUSHKEY: usize = 4096; +const SUPPRESSED_MAX_ROOMS_PER_PUSHKEY: usize = 256; + +#[derive(Default)] +pub(super) struct SuppressedQueue { + inner: Mutex>>, +} + +#[derive(Default)] +struct PushkeyQueue { + rooms: HashMap>, + total_events: usize, +} + +#[derive(Clone, Debug)] +struct SuppressedEvent { + pdu_id: RawPduId, + _inserted_at_ms: u64, +} + +impl SuppressedQueue { + fn lock( + &self, + ) -> std::sync::MutexGuard<'_, HashMap>> { + self.inner + .lock() + .unwrap_or_else(|e| e.into_inner()) + } + + fn drain_room(queue: VecDeque) -> Vec { + queue.into_iter().map(|event| event.pdu_id).collect() + } + + fn drop_one_front(queue: &mut VecDeque, total_events: &mut usize) -> bool { + if queue.pop_front().is_some() { + *total_events = total_events.saturating_sub(1); + return true; + } + + false + } +} + +#[implement(super::Service)] +pub fn queue_suppressed_push( + &self, + user_id: &UserId, + pushkey: &str, + room_id: &RoomId, + pdu_id: RawPduId, +) -> bool { + let mut inner = self.suppressed.lock(); + let user_entry = inner.entry(user_id.to_owned()).or_default(); + let push_entry = user_entry + .entry(pushkey.to_owned()) + .or_default(); + + if !push_entry.rooms.contains_key(room_id) + && push_entry.rooms.len() >= SUPPRESSED_MAX_ROOMS_PER_PUSHKEY + { + debug!( + ?user_id, + ?room_id, + pushkey, + max_rooms = SUPPRESSED_MAX_ROOMS_PER_PUSHKEY, + "Suppressed push queue full (rooms); dropping event" + ); + return false; + } + + let queue = push_entry + .rooms + .entry(room_id.to_owned()) + .or_insert_with(VecDeque::new); + + if queue + .back() + .is_some_and(|event| event.pdu_id == pdu_id) + { + trace!( + ?user_id, + ?room_id, + pushkey, + "Suppressed push event is duplicate; skipping" + ); + return false; + } + + if push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY && queue.is_empty() { + debug!( + ?user_id, + ?room_id, + pushkey, + max_events = SUPPRESSED_MAX_EVENTS_PER_PUSHKEY, + "Suppressed push queue full (total); dropping event" + ); + return false; + } + + while queue.len() >= SUPPRESSED_MAX_EVENTS_PER_ROOM + || push_entry.total_events >= SUPPRESSED_MAX_EVENTS_PER_PUSHKEY + { + if !SuppressedQueue::drop_one_front(queue, &mut push_entry.total_events) { + break; + } + } + + queue.push_back(SuppressedEvent { + pdu_id, + _inserted_at_ms: utils::millis_since_unix_epoch(), + }); + push_entry.total_events = push_entry.total_events.saturating_add(1); + + true +} + +#[implement(super::Service)] +pub fn take_suppressed_for_pushkey( + &self, + user_id: &UserId, + pushkey: &str, +) -> Vec<(OwnedRoomId, Vec)> { + let mut inner = self.suppressed.lock(); + let Some(user_entry) = inner.get_mut(user_id) else { + return Vec::new(); + }; + + let Some(push_entry) = user_entry.remove(pushkey) else { + return Vec::new(); + }; + + if user_entry.is_empty() { + inner.remove(user_id); + } + + push_entry + .rooms + .into_iter() + .map(|(room_id, queue)| (room_id, SuppressedQueue::drain_room(queue))) + .collect() +} + +#[implement(super::Service)] +pub fn take_suppressed_for_user( + &self, + user_id: &UserId, +) -> Vec<(String, Vec<(OwnedRoomId, Vec)>)> { + let mut inner = self.suppressed.lock(); + let Some(user_entry) = inner.remove(user_id) else { + return Vec::new(); + }; + + user_entry + .into_iter() + .map(|(pushkey, queue)| { + let rooms = queue + .rooms + .into_iter() + .map(|(room_id, q)| (room_id, SuppressedQueue::drain_room(q))) + .collect(); + (pushkey, rooms) + }) + .collect() +} + +#[implement(super::Service)] +pub fn clear_suppressed_room(&self, user_id: &UserId, room_id: &RoomId) -> usize { + let mut inner = self.suppressed.lock(); + let Some(user_entry) = inner.get_mut(user_id) else { + return 0; + }; + + let mut removed: usize = 0; + user_entry.retain(|_, push_entry| { + if let Some(queue) = push_entry.rooms.remove(room_id) { + removed = removed.saturating_add(queue.len()); + push_entry.total_events = push_entry.total_events.saturating_sub(queue.len()); + } + + !push_entry.rooms.is_empty() + }); + + if user_entry.is_empty() { + inner.remove(user_id); + } + + removed +} + +#[implement(super::Service)] +pub fn clear_suppressed_pushkey(&self, user_id: &UserId, pushkey: &str) -> usize { + let mut inner = self.suppressed.lock(); + let Some(user_entry) = inner.get_mut(user_id) else { + return 0; + }; + + let removed = user_entry + .remove(pushkey) + .map(|queue| queue.total_events) + .unwrap_or(0); + + if user_entry.is_empty() { + inner.remove(user_id); + } + + removed +} diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index d5689501..c1d2cb24 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -50,6 +50,8 @@ use tuwunel_core::{ warn, }; +use crate::rooms::timeline::RawPduId; + use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem}; #[derive(Debug)] @@ -801,11 +803,11 @@ impl Service { let rules_for_user = self .services .account_data - .get_global(&user_id, GlobalAccountDataEventType::PushRules) + .get_global::(&user_id, GlobalAccountDataEventType::PushRules) .map(|ev| { ev.map_or_else( |_| push::Ruleset::server_default(&user_id), - |ev: PushRulesEvent| ev.content.global, + |ev| ev.content.global, ) }) .map(Ok); @@ -814,9 +816,25 @@ impl Service { try_join3(pusher, rules_for_user, suppressed).await?; if suppressed { + let queued = self + .enqueue_suppressed_push_events(&user_id, &pushkey, &events) + .await; + debug!( + ?user_id, + pushkey, + queued, + events = events.len(), + "Push suppressed; queued events" + ); return Ok(Destination::Push(user_id, pushkey)); } + self.schedule_flush_suppressed_for_pushkey( + user_id.clone(), + pushkey.clone(), + "non-suppressed push", + ); + let _sent = events .iter() .stream() @@ -842,18 +860,240 @@ impl Service { Ok(Destination::Push(user_id, pushkey)) } + pub fn schedule_flush_suppressed_for_pushkey( + &self, + user_id: OwnedUserId, + pushkey: String, + reason: &'static str, + ) { + let sending = self.services.sending.clone(); + let runtime = self.server.runtime(); + runtime.spawn(async move { + sending + .flush_suppressed_for_pushkey(user_id, pushkey, reason) + .await; + }); + } + + pub fn schedule_flush_suppressed_for_user( + &self, + user_id: OwnedUserId, + reason: &'static str, + ) { + let sending = self.services.sending.clone(); + let runtime = self.server.runtime(); + runtime.spawn(async move { + sending.flush_suppressed_for_user(user_id, reason).await; + }); + } + + async fn enqueue_suppressed_push_events( + &self, + user_id: &UserId, + pushkey: &str, + events: &[SendingEvent], + ) -> usize { + let mut queued = 0_usize; + for event in events { + let SendingEvent::Pdu(pdu_id) = event else { + continue; + }; + + let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await else { + debug!(?user_id, ?pdu_id, "Suppressing push but PDU is missing"); + continue; + }; + + if pdu.is_redacted() { + trace!(?user_id, ?pdu_id, "Suppressing push for redacted PDU"); + continue; + } + + if self + .services + .pusher + .queue_suppressed_push(user_id, pushkey, pdu.room_id(), *pdu_id) + { + queued = queued.saturating_add(1); + } + } + + queued + } + + async fn flush_suppressed_rooms( + &self, + user_id: &UserId, + pushkey: &str, + pusher: &ruma::api::client::push::Pusher, + rules_for_user: &push::Ruleset, + rooms: Vec<(OwnedRoomId, Vec)>, + reason: &'static str, + ) { + if rooms.is_empty() { + return; + } + + debug!( + ?user_id, + pushkey, + rooms = rooms.len(), + "Flushing suppressed pushes ({reason})" + ); + + for (room_id, pdu_ids) in rooms { + let unread = self + .services + .pusher + .notification_count(user_id, &room_id) + .await; + if unread == 0 { + trace!( + ?user_id, + ?room_id, + "Skipping suppressed push flush: no unread" + ); + continue; + } + + for pdu_id in pdu_ids { + let Ok(pdu) = self.services.timeline.get_pdu_from_id(&pdu_id).await else { + debug!(?user_id, ?pdu_id, "Suppressed PDU missing during flush"); + continue; + }; + + if pdu.is_redacted() { + trace!(?user_id, ?pdu_id, "Suppressed PDU redacted during flush"); + continue; + } + + if let Err(error) = self + .services + .pusher + .send_push_notice(user_id, pusher, rules_for_user, &pdu) + .await + { + let requeued = self.services.pusher.queue_suppressed_push( + user_id, + pushkey, + &room_id, + pdu_id, + ); + warn!( + ?user_id, + ?room_id, + ?error, + requeued, + "Failed to send suppressed push notification" + ); + } + } + } + } + + async fn flush_suppressed_for_pushkey( + &self, + user_id: OwnedUserId, + pushkey: String, + reason: &'static str, + ) { + let suppressed = self + .services + .pusher + .take_suppressed_for_pushkey(&user_id, &pushkey); + if suppressed.is_empty() { + return; + } + + let pusher = match self.services.pusher.get_pusher(&user_id, &pushkey).await { + | Ok(pusher) => pusher, + | Err(error) => { + warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush"); + return; + }, + }; + + let rules_for_user = match self + .services + .account_data + .get_global::(&user_id, GlobalAccountDataEventType::PushRules) + .await + { + | Ok(ev) => ev.content.global, + | Err(_) => push::Ruleset::server_default(&user_id), + }; + + self.flush_suppressed_rooms( + &user_id, + &pushkey, + &pusher, + &rules_for_user, + suppressed, + reason, + ) + .await; + } + + pub async fn flush_suppressed_for_user( + &self, + user_id: OwnedUserId, + reason: &'static str, + ) { + let suppressed = self.services.pusher.take_suppressed_for_user(&user_id); + if suppressed.is_empty() { + return; + } + + let rules_for_user = match self + .services + .account_data + .get_global::(&user_id, GlobalAccountDataEventType::PushRules) + .await + { + | Ok(ev) => ev.content.global, + | Err(_) => push::Ruleset::server_default(&user_id), + }; + + for (pushkey, rooms) in suppressed { + let pusher = match self.services.pusher.get_pusher(&user_id, &pushkey).await { + | Ok(pusher) => pusher, + | Err(error) => { + warn!(?user_id, pushkey, ?error, "Missing pusher for suppressed flush"); + continue; + }, + }; + + self.flush_suppressed_rooms( + &user_id, + &pushkey, + &pusher, + &rules_for_user, + rooms, + reason, + ) + .await; + } + } + // optional suppression: heuristic combining presence age and recent sync // activity. async fn pushing_suppressed(&self, user_id: &UserId) -> bool { if !self.services.config.suppress_push_when_active { + debug!(?user_id, "push not suppressed: suppress_push_when_active disabled"); return false; } let Ok(presence) = self.services.presence.get_presence(user_id).await else { + debug!(?user_id, "push not suppressed: presence unavailable"); return false; }; if presence.content.presence != PresenceState::Online { + debug!( + ?user_id, + presence = ?presence.content.presence, + "push not suppressed: presence not online" + ); return false; } @@ -864,6 +1104,11 @@ impl Service { .unwrap_or(u64::MAX); if presence_age_ms >= 65_000 { + debug!( + ?user_id, + presence_age_ms, + "push not suppressed: presence too old" + ); return false; } @@ -875,8 +1120,12 @@ impl Service { let considered_active = sync_gap_ms.is_some_and(|gap| gap < 32_000); - if considered_active { - trace!(?user_id, presence_age_ms, "suppressing push: active heuristic"); + match sync_gap_ms { + | Some(gap) if gap < 32_000 => + debug!(?user_id, presence_age_ms, sync_gap_ms = gap, "suppressing push: active heuristic"), + | Some(gap) => + debug!(?user_id, presence_age_ms, sync_gap_ms = gap, "push not suppressed: sync gap too large"), + | None => debug!(?user_id, presence_age_ms, "push not suppressed: no recent sync"), } considered_active