presence: add pipeline tests and move timer

This commit is contained in:
Jared L
2026-01-21 05:54:47 +11:00
parent a73b4eba8b
commit 22a7f6cb33
3 changed files with 206 additions and 30 deletions

View File

@@ -148,6 +148,13 @@ impl PresenceAggregator {
if device_count == 0 {
guard.remove(user_id);
return AggregatedPresence {
state: PresenceState::Offline,
currently_active: false,
last_active_ts: now_ms,
status_msg: None,
device_count: 0,
};
}
debug!(
@@ -205,3 +212,99 @@ fn state_rank(state: &PresenceState) -> u8 {
| _ => 0,
}
}
#[cfg(test)]
mod tests {
use super::*;
use ruma::{device_id, uint, user_id};
#[tokio::test]
async fn aggregates_rank_and_status_msg() {
let aggregator = PresenceAggregator::new();
let user = user_id!("@alice:example.com");
let now = 1_000_u64;
aggregator
.update(
user,
DeviceKey::Device(device_id!("DEVICE_A").to_owned()),
&PresenceState::Unavailable,
Some(false),
Some(uint!(50)),
Some("away".to_owned()),
now,
)
.await;
aggregator
.update(
user,
DeviceKey::Device(device_id!("DEVICE_B").to_owned()),
&PresenceState::Online,
Some(true),
Some(uint!(10)),
Some("online".to_owned()),
now + 10,
)
.await;
let aggregated = aggregator
.aggregate(user, now + 10, 100, 300)
.await;
assert_eq!(aggregated.state, PresenceState::Online);
assert!(aggregated.currently_active);
assert_eq!(aggregated.status_msg.as_deref(), Some("online"));
assert_eq!(aggregated.device_count, 2);
}
#[tokio::test]
async fn degrades_online_to_unavailable_after_idle() {
let aggregator = PresenceAggregator::new();
let user = user_id!("@bob:example.com");
let now = 10_000_u64;
aggregator
.update(
user,
DeviceKey::Device(device_id!("DEVICE_IDLE").to_owned()),
&PresenceState::Online,
Some(true),
Some(uint!(500)),
None,
now,
)
.await;
let aggregated = aggregator
.aggregate(user, now + 500, 100, 1_000)
.await;
assert_eq!(aggregated.state, PresenceState::Unavailable);
}
#[tokio::test]
async fn drops_stale_devices_on_aggregate() {
let aggregator = PresenceAggregator::new();
let user = user_id!("@carol:example.com");
aggregator
.update(
user,
DeviceKey::Device(device_id!("DEVICE_STALE").to_owned()),
&PresenceState::Online,
Some(true),
Some(uint!(10)),
None,
0,
)
.await;
let aggregated = aggregator
.aggregate(user, 1_000, 100, 100)
.await;
assert_eq!(aggregated.device_count, 0);
assert_eq!(aggregated.state, PresenceState::Offline);
}
}

View File

@@ -1,5 +1,6 @@
mod aggregate;
mod data;
// Write/update pipeline lives in pipeline.rs.
mod pipeline;
mod presence;
@@ -11,7 +12,7 @@ use loole::{Receiver, Sender};
use ruma::{
OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState,
};
use tokio::{sync::RwLock, time::sleep};
use tokio::sync::RwLock;
use tuwunel_core::{
Result, checked, debug, debug_warn,
result::LogErr,
@@ -95,7 +96,7 @@ impl crate::Service for Service {
let (handle, reg) = AbortHandle::new_pair();
presence_timers.push(Abortable::new(
presence_timer(user_id.clone(), timeout, count),
pipeline::presence_timer(user_id.clone(), timeout, count),
reg,
));
timer_handles.insert(user_id, (count, handle));
@@ -244,8 +245,4 @@ impl Service {
}
}
async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired {
sleep(timeout).await;
(user_id, count)
}
// presence_timer lives in pipeline.rs alongside the timer handling logic.

View File

@@ -7,14 +7,19 @@
use std::time::Duration;
use futures::TryFutureExt;
use ruma::{DeviceId, OwnedUserId, UInt, UserId, presence::PresenceState};
use ruma::{
DeviceId, OwnedUserId, UInt, UserId,
events::presence::PresenceEvent,
presence::PresenceState,
};
use tokio::time::sleep;
use tuwunel_core::{
Error, Result, debug, error, trace,
result::LogErr,
utils::{future::OptionFutureExt, option::OptionExt},
};
use super::{aggregate, Service};
use super::{TimerFired, aggregate, Service};
impl Service {
fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
@@ -54,6 +59,30 @@ impl Service {
})
}
fn refresh_skip_decision(
refresh_window_ms: Option<u64>,
last_event: Option<&PresenceEvent>,
last_count: Option<u64>,
) -> Option<(u64, u64)> {
let (Some(refresh_ms), Some(event), Some(count)) =
(refresh_window_ms, last_event, last_count)
else {
return None;
};
let last_last_active_ago: u64 = event
.content
.last_active_ago
.unwrap_or_default()
.into();
(last_last_active_ago < refresh_ms).then_some((count, last_last_active_ago))
}
fn timer_is_stale(expected_count: u64, current_count: u64) -> bool {
expected_count != current_count
}
async fn apply_device_presence_update(
&self,
user_id: &UserId,
@@ -110,26 +139,26 @@ impl Service {
};
if !state_changed {
if let (Some(refresh_ms), Some(event), Some(count)) =
(refresh_window_ms, &last_event, last_count)
{
let last_last_active_ago: u64 = event
.content
.last_active_ago
.unwrap_or_default()
.into();
if last_last_active_ago < refresh_ms {
self.schedule_presence_timer(user_id, &event.content.presence, count)
.log_err()
.ok();
debug!(
?user_id,
?state,
last_last_active_ago,
"Skipping presence update: refresh window (timer rescheduled)"
);
return Ok(());
}
if let Some((count, last_last_active_ago)) = Self::refresh_skip_decision(
refresh_window_ms,
last_event.as_ref(),
last_count,
) {
let presence = last_event
.as_ref()
.map(|event| &event.content.presence)
.unwrap_or(state);
self.schedule_presence_timer(user_id, presence, count)
.log_err()
.ok();
debug!(
?user_id,
?state,
last_last_active_ago,
"Skipping presence update: refresh window (timer rescheduled)"
);
return Ok(());
}
}
@@ -284,7 +313,7 @@ impl Service {
| Err(_) => return Ok(()),
};
if current_count != expected_count {
if Self::timer_is_stale(expected_count, current_count) {
trace!(
?user_id,
expected_count,
@@ -356,3 +385,50 @@ impl Service {
Ok(())
}
}
pub(super) async fn presence_timer(
user_id: OwnedUserId,
timeout: Duration,
count: u64,
) -> TimerFired {
sleep(timeout).await;
(user_id, count)
}
#[cfg(test)]
mod tests {
use super::*;
use ruma::{presence::PresenceState, uint, user_id};
#[test]
fn refresh_window_skip_decision() {
let user_id = user_id!("@alice:example.com");
let event = PresenceEvent {
sender: user_id.to_owned(),
content: ruma::events::presence::PresenceEventContent {
presence: PresenceState::Online,
status_msg: None,
currently_active: Some(true),
last_active_ago: Some(uint!(10)),
avatar_url: None,
displayname: None,
},
};
let decision = Service::refresh_skip_decision(Some(20), Some(&event), Some(5));
assert_eq!(decision, Some((5, 10)));
let decision = Service::refresh_skip_decision(Some(5), Some(&event), Some(5));
assert_eq!(decision, None);
let decision = Service::refresh_skip_decision(Some(20), None, Some(5));
assert_eq!(decision, None);
}
#[test]
fn timer_stale_detection() {
assert!(Service::timer_is_stale(2, 3));
assert!(!Service::timer_is_stale(2, 2));
}
}