From 22a7f6cb334212dac3aef1deeab0d49a6b1f6bba Mon Sep 17 00:00:00 2001 From: Jared L <48422312+lhjt@users.noreply.github.com> Date: Wed, 21 Jan 2026 05:54:47 +1100 Subject: [PATCH] presence: add pipeline tests and move timer --- src/service/presence/aggregate.rs | 103 +++++++++++++++++++++++++ src/service/presence/mod.rs | 11 +-- src/service/presence/pipeline.rs | 122 ++++++++++++++++++++++++------ 3 files changed, 206 insertions(+), 30 deletions(-) diff --git a/src/service/presence/aggregate.rs b/src/service/presence/aggregate.rs index 9051cc77..208af1b4 100644 --- a/src/service/presence/aggregate.rs +++ b/src/service/presence/aggregate.rs @@ -148,6 +148,13 @@ impl PresenceAggregator { if device_count == 0 { guard.remove(user_id); + return AggregatedPresence { + state: PresenceState::Offline, + currently_active: false, + last_active_ts: now_ms, + status_msg: None, + device_count: 0, + }; } debug!( @@ -205,3 +212,99 @@ fn state_rank(state: &PresenceState) -> u8 { | _ => 0, } } + +#[cfg(test)] +mod tests { + use super::*; + use ruma::{device_id, uint, user_id}; + + #[tokio::test] + async fn aggregates_rank_and_status_msg() { + let aggregator = PresenceAggregator::new(); + let user = user_id!("@alice:example.com"); + let now = 1_000_u64; + + aggregator + .update( + user, + DeviceKey::Device(device_id!("DEVICE_A").to_owned()), + &PresenceState::Unavailable, + Some(false), + Some(uint!(50)), + Some("away".to_owned()), + now, + ) + .await; + + aggregator + .update( + user, + DeviceKey::Device(device_id!("DEVICE_B").to_owned()), + &PresenceState::Online, + Some(true), + Some(uint!(10)), + Some("online".to_owned()), + now + 10, + ) + .await; + + let aggregated = aggregator + .aggregate(user, now + 10, 100, 300) + .await; + + assert_eq!(aggregated.state, PresenceState::Online); + assert!(aggregated.currently_active); + assert_eq!(aggregated.status_msg.as_deref(), Some("online")); + assert_eq!(aggregated.device_count, 2); + } + + #[tokio::test] + async fn degrades_online_to_unavailable_after_idle() { + let aggregator = PresenceAggregator::new(); + let user = user_id!("@bob:example.com"); + let now = 10_000_u64; + + aggregator + .update( + user, + DeviceKey::Device(device_id!("DEVICE_IDLE").to_owned()), + &PresenceState::Online, + Some(true), + Some(uint!(500)), + None, + now, + ) + .await; + + let aggregated = aggregator + .aggregate(user, now + 500, 100, 1_000) + .await; + + assert_eq!(aggregated.state, PresenceState::Unavailable); + } + + #[tokio::test] + async fn drops_stale_devices_on_aggregate() { + let aggregator = PresenceAggregator::new(); + let user = user_id!("@carol:example.com"); + + aggregator + .update( + user, + DeviceKey::Device(device_id!("DEVICE_STALE").to_owned()), + &PresenceState::Online, + Some(true), + Some(uint!(10)), + None, + 0, + ) + .await; + + let aggregated = aggregator + .aggregate(user, 1_000, 100, 100) + .await; + + assert_eq!(aggregated.device_count, 0); + assert_eq!(aggregated.state, PresenceState::Offline); + } +} diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 3383e096..a88b5f33 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -1,5 +1,6 @@ mod aggregate; mod data; +// Write/update pipeline lives in pipeline.rs. mod pipeline; mod presence; @@ -11,7 +12,7 @@ use loole::{Receiver, Sender}; use ruma::{ OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState, }; -use tokio::{sync::RwLock, time::sleep}; +use tokio::sync::RwLock; use tuwunel_core::{ Result, checked, debug, debug_warn, result::LogErr, @@ -95,7 +96,7 @@ impl crate::Service for Service { let (handle, reg) = AbortHandle::new_pair(); presence_timers.push(Abortable::new( - presence_timer(user_id.clone(), timeout, count), + pipeline::presence_timer(user_id.clone(), timeout, count), reg, )); timer_handles.insert(user_id, (count, handle)); @@ -244,8 +245,4 @@ impl Service { } } -async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired { - sleep(timeout).await; - - (user_id, count) -} +// presence_timer lives in pipeline.rs alongside the timer handling logic. diff --git a/src/service/presence/pipeline.rs b/src/service/presence/pipeline.rs index ab341142..64e8547f 100644 --- a/src/service/presence/pipeline.rs +++ b/src/service/presence/pipeline.rs @@ -7,14 +7,19 @@ use std::time::Duration; use futures::TryFutureExt; -use ruma::{DeviceId, OwnedUserId, UInt, UserId, presence::PresenceState}; +use ruma::{ + DeviceId, OwnedUserId, UInt, UserId, + events::presence::PresenceEvent, + presence::PresenceState, +}; +use tokio::time::sleep; use tuwunel_core::{ Error, Result, debug, error, trace, result::LogErr, utils::{future::OptionFutureExt, option::OptionExt}, }; -use super::{aggregate, Service}; +use super::{TimerFired, aggregate, Service}; impl Service { fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey { @@ -54,6 +59,30 @@ impl Service { }) } + fn refresh_skip_decision( + refresh_window_ms: Option, + last_event: Option<&PresenceEvent>, + last_count: Option, + ) -> Option<(u64, u64)> { + let (Some(refresh_ms), Some(event), Some(count)) = + (refresh_window_ms, last_event, last_count) + else { + return None; + }; + + let last_last_active_ago: u64 = event + .content + .last_active_ago + .unwrap_or_default() + .into(); + + (last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago)) + } + + fn timer_is_stale(expected_count: u64, current_count: u64) -> bool { + expected_count != current_count + } + async fn apply_device_presence_update( &self, user_id: &UserId, @@ -110,26 +139,26 @@ impl Service { }; if !state_changed { - if let (Some(refresh_ms), Some(event), Some(count)) = - (refresh_window_ms, &last_event, last_count) - { - let last_last_active_ago: u64 = event - .content - .last_active_ago - .unwrap_or_default() - .into(); - if last_last_active_ago < refresh_ms { - self.schedule_presence_timer(user_id, &event.content.presence, count) - .log_err() - .ok(); - debug!( - ?user_id, - ?state, - last_last_active_ago, - "Skipping presence update: refresh window (timer rescheduled)" - ); - return Ok(()); - } + if let Some((count, last_last_active_ago)) = Self::refresh_skip_decision( + refresh_window_ms, + last_event.as_ref(), + last_count, + ) { + let presence = last_event + .as_ref() + .map(|event| &event.content.presence) + .unwrap_or(state); + + self.schedule_presence_timer(user_id, presence, count) + .log_err() + .ok(); + debug!( + ?user_id, + ?state, + last_last_active_ago, + "Skipping presence update: refresh window (timer rescheduled)" + ); + return Ok(()); } } @@ -284,7 +313,7 @@ impl Service { | Err(_) => return Ok(()), }; - if current_count != expected_count { + if Self::timer_is_stale(expected_count, current_count) { trace!( ?user_id, expected_count, @@ -356,3 +385,50 @@ impl Service { Ok(()) } } + +pub(super) async fn presence_timer( + user_id: OwnedUserId, + timeout: Duration, + count: u64, +) -> TimerFired { + sleep(timeout).await; + + (user_id, count) +} + +#[cfg(test)] +mod tests { + use super::*; + use ruma::{presence::PresenceState, uint, user_id}; + + #[test] + fn refresh_window_skip_decision() { + let user_id = user_id!("@alice:example.com"); + let event = PresenceEvent { + sender: user_id.to_owned(), + content: ruma::events::presence::PresenceEventContent { + presence: PresenceState::Online, + status_msg: None, + currently_active: Some(true), + last_active_ago: Some(uint!(10)), + avatar_url: None, + displayname: None, + }, + }; + + let decision = Service::refresh_skip_decision(Some(20), Some(&event), Some(5)); + assert_eq!(decision, Some((5, 10))); + + let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5)); + assert_eq!(decision, None); + + let decision = Service::refresh_skip_decision(Some(20), None, Some(5)); + assert_eq!(decision, None); + } + + #[test] + fn timer_stale_detection() { + assert!(Service::timer_is_stale(2, 3)); + assert!(!Service::timer_is_stale(2, 2)); + } +}