From 7ec9d7f5aab569d432721cb2a50c47711e9ee276 Mon Sep 17 00:00:00 2001 From: Jared L <48422312+lhjt@users.noreply.github.com> Date: Wed, 21 Jan 2026 16:50:23 +1100 Subject: [PATCH] fix: ensure aggregate presence change flushes queue --- src/service/presence/pipeline.rs | 27 +++++++++++++++++++++++++++ src/service/sending/sender.rs | 5 +++++ 2 files changed, 32 insertions(+) diff --git a/src/service/presence/pipeline.rs b/src/service/presence/pipeline.rs index a02b3ae4..8e0f8153 100644 --- a/src/service/presence/pipeline.rs +++ b/src/service/presence/pipeline.rs @@ -98,6 +98,7 @@ impl Service { refresh_window_ms: Option, ) -> Result { let now = tuwunel_core::utils::millis_since_unix_epoch(); + // 1) Capture per-device presence snapshot for aggregation. debug!( ?user_id, ?device_key, @@ -118,6 +119,7 @@ impl Service { ) .await; + // 2) Compute the aggregated presence across all devices. let aggregated = self .device_presence .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) @@ -131,17 +133,23 @@ impl Service { "Presence aggregate computed" ); + // 3) Load the last persisted presence to decide whether to skip or merge. let last_presence = self.db.get_presence(user_id).await; let (last_count, last_event) = match last_presence { | Ok((count, event)) => (Some(count), Some(event)), | Err(_) => (None, None), }; + let last_state = last_event + .as_ref() + .map(|event| event.content.presence.clone()); + let state_changed = match &last_event { | Some(event) => event.content.presence != aggregated.state, | None => true, }; + // 4) For rapid pings with no state change, skip writes and reschedule. if !state_changed && let Some((count, last_last_active_ago)) = Self::refresh_skip_decision(refresh_window_ms, last_event.as_ref(), last_count) @@ -163,6 +171,25 @@ impl Service { return Ok(()); } + // 5) If we just transitioned away from online, flush suppressed pushes. + if matches!(last_state, Some(PresenceState::Online)) + && aggregated.state != PresenceState::Online + { + debug!( + ?user_id, + from = ?PresenceState::Online, + to = ?aggregated.state, + "Presence went inactive; flushing suppressed pushes" + ); + self.services + .sending + .schedule_flush_suppressed_for_user( + user_id.to_owned(), + "presence->inactive (aggregate)", + ); + } + + // 6) Persist the aggregated presence, preserving last non-empty status. let fallback_status = last_event .and_then(|event| event.content.status_msg) .filter(|msg| !msg.is_empty()); diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index c84c280b..081015a2 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -938,6 +938,7 @@ impl Service { return; } + let mut sent = 0_usize; debug!(?user_id, pushkey, rooms = rooms.len(), "Flushing suppressed pushes ({reason})"); for (room_id, pdu_ids) in rooms { @@ -984,9 +985,13 @@ impl Service { requeued, "Failed to send suppressed push notification" ); + } else { + sent = sent.saturating_add(1); } } } + + debug!(?user_id, pushkey, sent, "Flushed suppressed push notifications"); } async fn flush_suppressed_for_pushkey(