diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index a16619d1..3383e096 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -1,25 +1,21 @@ mod aggregate; mod data; +mod pipeline; mod presence; use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; -use futures::{ - Stream, StreamExt, TryFutureExt, - future::{AbortHandle, Abortable, try_join}, - stream::FuturesUnordered, -}; +use futures::{Stream, StreamExt, TryFutureExt, future::{AbortHandle, Abortable}, stream::FuturesUnordered}; use loole::{Receiver, Sender}; use ruma::{ - DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, + OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState, }; use tokio::{sync::RwLock, time::sleep}; use tuwunel_core::{ - Error, Result, checked, debug, debug_warn, error, + Result, checked, debug, debug_warn, result::LogErr, trace, - utils::future::OptionFutureExt, }; use self::{aggregate::PresenceAggregator, data::Data, presence::Presence}; @@ -131,139 +127,6 @@ impl crate::Service for Service { } impl Service { - fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey { - if is_remote { - return aggregate::DeviceKey::Remote; - } - - match device_id { - | Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()), - | None => aggregate::DeviceKey::UnknownLocal, - } - } - - fn schedule_presence_timer( - &self, - user_id: &UserId, - presence_state: &PresenceState, - count: u64, - ) -> Result { - if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id)) - || user_id == self.services.globals.server_user - { - return Ok(()); - } - - let timeout = match presence_state { - | PresenceState::Online => self.services.server.config.presence_idle_timeout_s, - | _ => self.services.server.config.presence_offline_timeout_s, - }; - - self.timer_channel - .0 - .send((user_id.to_owned(), Duration::from_secs(timeout), count)) - .map_err(|e| { - error!("Failed to add presence timer: {}", e); - Error::bad_database("Failed to add presence timer") - }) - } - - async fn apply_device_presence_update( - &self, - user_id: &UserId, - device_key: aggregate::DeviceKey, - state: &PresenceState, - currently_active: Option, - last_active_ago: Option, - status_msg: Option, - refresh_window_ms: Option, - ) -> Result { - let now = tuwunel_core::utils::millis_since_unix_epoch(); - debug!( - ?user_id, - ?device_key, - ?state, - currently_active, - last_active_ago = last_active_ago.map(u64::from), - "Presence update received" - ); - self.device_presence - .update( - user_id, - device_key, - state, - currently_active, - last_active_ago, - status_msg, - now, - ) - .await; - - let aggregated = self - .device_presence - .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) - .await; - debug!( - ?user_id, - agg_state = ?aggregated.state, - agg_currently_active = aggregated.currently_active, - agg_last_active_ts = aggregated.last_active_ts, - agg_device_count = aggregated.device_count, - "Presence aggregate computed" - ); - - let last_presence = self.db.get_presence(user_id).await; - let (last_count, last_event) = match last_presence { - | Ok((count, event)) => (Some(count), Some(event)), - | Err(_) => (None, None), - }; - - let state_changed = match &last_event { - | Some(event) => event.content.presence != aggregated.state, - | None => true, - }; - - if !state_changed { - if let (Some(refresh_ms), Some(event), Some(count)) = - (refresh_window_ms, &last_event, last_count) - { - let last_last_active_ago: u64 = event - .content - .last_active_ago - .unwrap_or_default() - .into(); - if last_last_active_ago < refresh_ms { - self.schedule_presence_timer(user_id, &event.content.presence, count) - .log_err() - .ok(); - debug!( - ?user_id, - ?state, - last_last_active_ago, - "Skipping presence update: refresh window (timer rescheduled)" - ); - return Ok(()); - } - } - } - - let fallback_status = last_event - .and_then(|event| event.content.status_msg) - .filter(|msg| !msg.is_empty()); - let status_msg = aggregated.status_msg.or(fallback_status); - let last_active_ago = - Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); - - self.set_presence( - user_id, - &aggregated.state, - Some(aggregated.currently_active), - last_active_ago, - status_msg, - ) - .await - } - /// record that a user has just successfully completed a /sync (or /// equivalent activity) pub async fn note_sync(&self, user_id: &UserId) { @@ -296,130 +159,6 @@ impl Service { .await } - /// Pings the presence of the given user, setting the specified state. When - /// device_id is supplied. - pub async fn maybe_ping_presence( - &self, - user_id: &UserId, - device_id: Option<&DeviceId>, - new_state: &PresenceState, - ) -> Result { - const REFRESH_TIMEOUT: u64 = 30 * 1000; - - if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() { - debug!( - ?user_id, - ?new_state, - allow_local_presence = self.services.server.config.allow_local_presence, - read_only = self.services.db.is_read_only(), - "Skipping presence ping" - ); - return Ok(()); - } - - let update_device_seen = device_id.map_async(|device_id| { - self.services - .users - .update_device_last_seen(user_id, device_id, None) - }); - - let currently_active = *new_state == PresenceState::Online; - let set_presence = self.apply_device_presence_update( - user_id, - Self::device_key(device_id, false), - new_state, - Some(currently_active), - UInt::new(0), - None, - Some(REFRESH_TIMEOUT), - ); - - debug!( - ?user_id, - ?new_state, - currently_active, - "Presence ping accepted" - ); - - try_join(set_presence, update_device_seen.unwrap_or(Ok(()))) - .map_ok(|_| ()) - .await - } - - /// Applies an explicit presence update for a local device. - pub async fn set_presence_for_device( - &self, - user_id: &UserId, - device_id: Option<&DeviceId>, - state: &PresenceState, - status_msg: Option, - ) -> Result { - let currently_active = *state == PresenceState::Online; - self.apply_device_presence_update( - user_id, - Self::device_key(device_id, false), - state, - Some(currently_active), - None, - status_msg, - None, - ) - .await - } - - /// Applies a presence update received over federation. - pub async fn set_presence_from_federation( - &self, - user_id: &UserId, - state: &PresenceState, - currently_active: bool, - last_active_ago: UInt, - status_msg: Option, - ) -> Result { - self.apply_device_presence_update( - user_id, - Self::device_key(None, true), - state, - Some(currently_active), - Some(last_active_ago), - status_msg, - None, - ) - .await - } - - /// Adds a presence event which will be saved until a new event replaces it. - pub async fn set_presence( - &self, - user_id: &UserId, - state: &PresenceState, - currently_active: Option, - last_active_ago: Option, - status_msg: Option, - ) -> Result { - let presence_state = match state.as_str() { - | "" => &PresenceState::Offline, // default an empty string to 'offline' - | &_ => state, - }; - - let count = self - .db - .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg) - .await?; - - if let Some(count) = count { - let is_local = self.services.globals.user_is_local(user_id); - let is_server_user = user_id == self.services.globals.server_user; - let allow_timeout = self.timeout_remote_users || is_local; - - if allow_timeout && !is_server_user { - self.schedule_presence_timer(user_id, presence_state, count)?; - } - } - - Ok(()) - } - /// Removes the presence record for the given user from the database. /// /// TODO: Why is this not used? @@ -503,83 +242,6 @@ impl Service { Ok(event) } - - async fn process_presence_timer(&self, user_id: &OwnedUserId, expected_count: u64) -> Result { - let (current_count, presence) = match self.db.get_presence_raw(user_id).await { - | Ok(presence) => presence, - | Err(_) => return Ok(()), - }; - - if current_count != expected_count { - trace!( - ?user_id, - expected_count, - current_count, - "Skipping stale presence timer" - ); - return Ok(()); - } - - let presence_state = presence.state().clone(); - let now = tuwunel_core::utils::millis_since_unix_epoch(); - let aggregated = self - .device_presence - .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) - .await; - - if aggregated.device_count == 0 { - let last_active_ago = - Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts()))); - let status_msg = presence.status_msg(); - let new_state = match (&presence_state, last_active_ago.map(u64::from)) { - | (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout => - Some(PresenceState::Unavailable), - | (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout => - Some(PresenceState::Offline), - | _ => None, - }; - - debug!( - "Processed presence timer for user '{user_id}': Old state = {presence_state}, New \ - state = {new_state:?}" - ); - - if let Some(new_state) = new_state { - self.set_presence( - user_id, - &new_state, - Some(false), - last_active_ago, - status_msg, - ) - .await?; - } - - return Ok(()); - } - - if aggregated.state == presence_state { - self.schedule_presence_timer(user_id, &presence_state, current_count) - .log_err() - .ok(); - return Ok(()); - } - - let status_msg = aggregated.status_msg.or_else(|| presence.status_msg()); - let last_active_ago = - Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); - - self.set_presence( - user_id, - &aggregated.state, - Some(aggregated.currently_active), - last_active_ago, - status_msg, - ) - .await?; - - Ok(()) - } } async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired { diff --git a/src/service/presence/pipeline.rs b/src/service/presence/pipeline.rs new file mode 100644 index 00000000..ab341142 --- /dev/null +++ b/src/service/presence/pipeline.rs @@ -0,0 +1,358 @@ +//! Presence update pipeline. +//! +//! This module centralizes the write path for presence updates. It keeps the +//! aggregation and timer logic in one place so the public `Service` surface +//! remains small and the update flow is easy to review. + +use std::time::Duration; + +use futures::TryFutureExt; +use ruma::{DeviceId, OwnedUserId, UInt, UserId, presence::PresenceState}; +use tuwunel_core::{ + Error, Result, debug, error, trace, + result::LogErr, + utils::{future::OptionFutureExt, option::OptionExt}, +}; + +use super::{aggregate, Service}; + +impl Service { + fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey { + if is_remote { + return aggregate::DeviceKey::Remote; + } + + match device_id { + | Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()), + | None => aggregate::DeviceKey::UnknownLocal, + } + } + + fn schedule_presence_timer( + &self, + user_id: &UserId, + presence_state: &PresenceState, + count: u64, + ) -> Result { + if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id)) + || user_id == self.services.globals.server_user + { + return Ok(()); + } + + let timeout = match presence_state { + | PresenceState::Online => self.services.server.config.presence_idle_timeout_s, + | _ => self.services.server.config.presence_offline_timeout_s, + }; + + self.timer_channel + .0 + .send((user_id.to_owned(), Duration::from_secs(timeout), count)) + .map_err(|e| { + error!("Failed to add presence timer: {}", e); + Error::bad_database("Failed to add presence timer") + }) + } + + async fn apply_device_presence_update( + &self, + user_id: &UserId, + device_key: aggregate::DeviceKey, + state: &PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, + refresh_window_ms: Option, + ) -> Result { + let now = tuwunel_core::utils::millis_since_unix_epoch(); + debug!( + ?user_id, + ?device_key, + ?state, + currently_active, + last_active_ago = last_active_ago.map(u64::from), + "Presence update received" + ); + self.device_presence + .update( + user_id, + device_key, + state, + currently_active, + last_active_ago, + status_msg, + now, + ) + .await; + + let aggregated = self + .device_presence + .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) + .await; + debug!( + ?user_id, + agg_state = ?aggregated.state, + agg_currently_active = aggregated.currently_active, + agg_last_active_ts = aggregated.last_active_ts, + agg_device_count = aggregated.device_count, + "Presence aggregate computed" + ); + + let last_presence = self.db.get_presence(user_id).await; + let (last_count, last_event) = match last_presence { + | Ok((count, event)) => (Some(count), Some(event)), + | Err(_) => (None, None), + }; + + let state_changed = match &last_event { + | Some(event) => event.content.presence != aggregated.state, + | None => true, + }; + + if !state_changed { + if let (Some(refresh_ms), Some(event), Some(count)) = + (refresh_window_ms, &last_event, last_count) + { + let last_last_active_ago: u64 = event + .content + .last_active_ago + .unwrap_or_default() + .into(); + if last_last_active_ago < refresh_ms { + self.schedule_presence_timer(user_id, &event.content.presence, count) + .log_err() + .ok(); + debug!( + ?user_id, + ?state, + last_last_active_ago, + "Skipping presence update: refresh window (timer rescheduled)" + ); + return Ok(()); + } + } + } + + let fallback_status = last_event + .and_then(|event| event.content.status_msg) + .filter(|msg| !msg.is_empty()); + let status_msg = aggregated.status_msg.or(fallback_status); + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); + + self.set_presence( + user_id, + &aggregated.state, + Some(aggregated.currently_active), + last_active_ago, + status_msg, + ) + .await + } + + /// Pings the presence of the given user, setting the specified state. When + /// device_id is supplied. + pub async fn maybe_ping_presence( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + new_state: &PresenceState, + ) -> Result { + const REFRESH_TIMEOUT: u64 = 30 * 1000; + + if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() { + debug!( + ?user_id, + ?new_state, + allow_local_presence = self.services.server.config.allow_local_presence, + read_only = self.services.db.is_read_only(), + "Skipping presence ping" + ); + return Ok(()); + } + + let update_device_seen = device_id.map_async(|device_id| { + self.services + .users + .update_device_last_seen(user_id, device_id, None) + }); + + let currently_active = *new_state == PresenceState::Online; + let set_presence = self.apply_device_presence_update( + user_id, + Self::device_key(device_id, false), + new_state, + Some(currently_active), + UInt::new(0), + None, + Some(REFRESH_TIMEOUT), + ); + + debug!( + ?user_id, + ?new_state, + currently_active, + "Presence ping accepted" + ); + + futures::future::try_join(set_presence, update_device_seen.unwrap_or(Ok(()))) + .map_ok(|_| ()) + .await + } + + /// Applies an explicit presence update for a local device. + pub async fn set_presence_for_device( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + state: &PresenceState, + status_msg: Option, + ) -> Result { + let currently_active = *state == PresenceState::Online; + self.apply_device_presence_update( + user_id, + Self::device_key(device_id, false), + state, + Some(currently_active), + None, + status_msg, + None, + ) + .await + } + + /// Applies a presence update received over federation. + pub async fn set_presence_from_federation( + &self, + user_id: &UserId, + state: &PresenceState, + currently_active: bool, + last_active_ago: UInt, + status_msg: Option, + ) -> Result { + self.apply_device_presence_update( + user_id, + Self::device_key(None, true), + state, + Some(currently_active), + Some(last_active_ago), + status_msg, + None, + ) + .await + } + + /// Adds a presence event which will be saved until a new event replaces it. + pub async fn set_presence( + &self, + user_id: &UserId, + state: &PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, + ) -> Result { + let presence_state = match state.as_str() { + | "" => &PresenceState::Offline, // default an empty string to 'offline' + | &_ => state, + }; + + let count = self + .db + .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg) + .await?; + + if let Some(count) = count { + let is_local = self.services.globals.user_is_local(user_id); + let is_server_user = user_id == self.services.globals.server_user; + let allow_timeout = self.timeout_remote_users || is_local; + + if allow_timeout && !is_server_user { + self.schedule_presence_timer(user_id, presence_state, count)?; + } + } + + Ok(()) + } + + pub(super) async fn process_presence_timer( + &self, + user_id: &OwnedUserId, + expected_count: u64, + ) -> Result { + let (current_count, presence) = match self.db.get_presence_raw(user_id).await { + | Ok(presence) => presence, + | Err(_) => return Ok(()), + }; + + if current_count != expected_count { + trace!( + ?user_id, + expected_count, + current_count, + "Skipping stale presence timer" + ); + return Ok(()); + } + + let presence_state = presence.state().clone(); + let now = tuwunel_core::utils::millis_since_unix_epoch(); + let aggregated = self + .device_presence + .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) + .await; + + if aggregated.device_count == 0 { + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts()))); + let status_msg = presence.status_msg(); + + let new_state = match (&presence_state, last_active_ago.map(u64::from)) { + | (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout => + Some(PresenceState::Unavailable), + | (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout => + Some(PresenceState::Offline), + | _ => None, + }; + + debug!( + "Processed presence timer for user '{user_id}': Old state = {presence_state}, New \ + state = {new_state:?}" + ); + + if let Some(new_state) = new_state { + self.set_presence( + user_id, + &new_state, + Some(false), + last_active_ago, + status_msg, + ) + .await?; + } + + return Ok(()); + } + + if aggregated.state == presence_state { + self.schedule_presence_timer(user_id, &presence_state, current_count) + .log_err() + .ok(); + return Ok(()); + } + + let status_msg = aggregated.status_msg.or_else(|| presence.status_msg()); + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); + + self.set_presence( + user_id, + &aggregated.state, + Some(aggregated.currently_active), + last_active_ago, + status_msg, + ) + .await?; + + Ok(()) + } +}