From 2b327696490fa3c0e5d4219f8998b096fc0f9c3f Mon Sep 17 00:00:00 2001 From: Jared L <48422312+lhjt@users.noreply.github.com> Date: Sun, 18 Jan 2026 16:31:36 +1100 Subject: [PATCH] presence: aggregate device updates --- src/api/client/presence.rs | 8 +- src/api/server/send.rs | 6 +- src/service/presence/aggregate.rs | 207 +++++++++++++ src/service/presence/data.rs | 20 +- src/service/presence/mod.rs | 475 ++++++++++++++++++++++++------ src/service/presence/presence.rs | 9 + 6 files changed, 628 insertions(+), 97 deletions(-) create mode 100644 src/service/presence/aggregate.rs diff --git a/src/api/client/presence.rs b/src/api/client/presence.rs index 3a02c98d..438f5672 100644 --- a/src/api/client/presence.rs +++ b/src/api/client/presence.rs @@ -23,7 +23,13 @@ pub(crate) async fn set_presence_route( services .presence - .set_presence(body.sender_user(), &body.presence, None, None, body.status_msg.clone()) + .set_presence_for_device( + body.sender_user(), + body.sender_device.as_deref(), + &body.presence, + body.status_msg.clone(), + PresenceUpdateReason::ClientRequest, + ) .await?; Ok(set_presence::v3::Response {}) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 6751dc19..7a8cf153 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -268,11 +268,11 @@ async fn handle_edu_presence_update( services .presence - .set_presence( + .set_presence_from_federation( &update.user_id, &update.presence, - Some(update.currently_active), - Some(update.last_active_ago), + update.currently_active, + update.last_active_ago, update.status_msg.clone(), ) .await diff --git a/src/service/presence/aggregate.rs b/src/service/presence/aggregate.rs new file mode 100644 index 00000000..9051cc77 --- /dev/null +++ b/src/service/presence/aggregate.rs @@ -0,0 +1,207 @@ +use std::collections::HashMap; + +use ruma::{OwnedDeviceId, OwnedUserId, UInt, UserId, presence::PresenceState}; +use tuwunel_core::debug; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum DeviceKey { + Device(OwnedDeviceId), + Remote, + UnknownLocal, +} + +#[derive(Debug, Clone)] +struct DevicePresence { + state: PresenceState, + currently_active: bool, + last_active_ts: u64, + last_update_ts: u64, + status_msg: Option, +} + +#[derive(Debug, Clone)] +pub(crate) struct AggregatedPresence { + pub(crate) state: PresenceState, + pub(crate) currently_active: bool, + pub(crate) last_active_ts: u64, + pub(crate) status_msg: Option, + pub(crate) device_count: usize, +} + +#[derive(Debug, Default)] +pub(crate) struct PresenceAggregator { + inner: RwLock>>, +} + +impl PresenceAggregator { + pub(crate) fn new() -> Self { Self::default() } + + pub(crate) async fn clear(&self) { + self.inner.write().await.clear(); + } + + pub(crate) async fn update( + &self, + user_id: &UserId, + device_key: DeviceKey, + state: &PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, + now_ms: u64, + ) { + let mut guard = self.inner.write().await; + let devices = guard.entry(user_id.to_owned()).or_default(); + + let last_active_ts = match last_active_ago { + | None => now_ms, + | Some(ago) => now_ms.saturating_sub(ago.into()), + }; + + let entry = devices.entry(device_key).or_insert(DevicePresence { + state: state.clone(), + currently_active: currently_active.unwrap_or(false), + last_active_ts, + last_update_ts: now_ms, + status_msg: status_msg.clone(), + }); + + entry.state = state.clone(); + entry.currently_active = currently_active.unwrap_or(false); + entry.last_active_ts = last_active_ts; + entry.last_update_ts = now_ms; + if status_msg.is_some() { + entry.status_msg = status_msg; + } + } + + pub(crate) async fn aggregate( + &self, + user_id: &UserId, + now_ms: u64, + idle_timeout_ms: u64, + offline_timeout_ms: u64, + ) -> AggregatedPresence { + let mut guard = self.inner.write().await; + let Some(devices) = guard.get_mut(user_id) else { + return AggregatedPresence { + state: PresenceState::Offline, + currently_active: false, + last_active_ts: now_ms, + status_msg: None, + device_count: 0, + }; + }; + + let mut best_state = PresenceState::Offline; + let mut best_rank = state_rank(&best_state); + let mut any_currently_active = false; + let mut last_active_ts = 0_u64; + let mut latest_status: Option<(u64, String)> = None; + + devices.retain(|_, device| { + let last_active_age = now_ms.saturating_sub(device.last_active_ts); + let last_update_age = now_ms.saturating_sub(device.last_update_ts); + + let effective_state = effective_device_state( + &device.state, + last_active_age, + idle_timeout_ms, + offline_timeout_ms, + ); + + let rank = state_rank(&effective_state); + if rank > best_rank { + best_rank = rank; + best_state = effective_state.clone(); + } + + if effective_state == PresenceState::Online || effective_state == PresenceState::Busy { + if device.currently_active && last_active_age < idle_timeout_ms { + any_currently_active = true; + } + } + + if let Some(msg) = device.status_msg.as_ref().filter(|msg| !msg.is_empty()) { + match latest_status { + | None => { + latest_status = Some((device.last_update_ts, msg.clone())); + }, + | Some((ts, _)) if device.last_update_ts > ts => { + latest_status = Some((device.last_update_ts, msg.clone())); + }, + | _ => {}, + } + } + + if device.last_active_ts > last_active_ts { + last_active_ts = device.last_active_ts; + } + + // Drop devices that haven't updated in a long time to keep the map small. + last_update_age < offline_timeout_ms + }); + + let device_count = devices.len(); + let status_msg = latest_status.map(|(_, msg)| msg); + + if device_count == 0 { + guard.remove(user_id); + } + + debug!( + ?user_id, + device_count, + state = ?best_state, + currently_active = any_currently_active, + last_active_ts, + status_msg = status_msg.as_deref(), + "Aggregated presence" + ); + + AggregatedPresence { + state: best_state, + currently_active: any_currently_active, + last_active_ts: if last_active_ts == 0 { now_ms } else { last_active_ts }, + status_msg, + device_count, + } + } +} + +fn effective_device_state( + state: &PresenceState, + last_active_age: u64, + idle_timeout_ms: u64, + offline_timeout_ms: u64, +) -> PresenceState { + match state { + | PresenceState::Busy | PresenceState::Online => { + if last_active_age >= idle_timeout_ms { + PresenceState::Unavailable + } else { + state.clone() + } + }, + | PresenceState::Unavailable => { + if last_active_age >= offline_timeout_ms { + PresenceState::Offline + } else { + PresenceState::Unavailable + } + }, + | PresenceState::Offline => PresenceState::Offline, + | _ => state.clone(), + } +} + +fn state_rank(state: &PresenceState) -> u8 { + match state { + | PresenceState::Busy => 3, + | PresenceState::Online => 2, + | PresenceState::Unavailable => 1, + | PresenceState::Offline => 0, + | _ => 0, + } +} diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index ae2b17a2..4b508575 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -43,6 +43,20 @@ impl Data { Ok((count, event)) } + pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> { + let count = self + .userid_presenceid + .get(user_id) + .await + .deserialized::()?; + + let key = presenceid_key(count, user_id); + let bytes = self.presenceid_presence.get(&key).await?; + let presence = Presence::from_json_bytes(&bytes)?; + + Ok((count, presence)) + } + pub(super) async fn set_presence( &self, user_id: &UserId, @@ -50,7 +64,7 @@ impl Data { currently_active: Option, last_active_ago: Option, status_msg: Option, - ) -> Result { + ) -> Result> { let last_presence = self.get_presence(user_id).await; let state_changed = match last_presence { | Err(_) => true, @@ -96,7 +110,7 @@ impl Data { "presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \ {last_last_active_ts:?}", ); - return Ok(()); + return Ok(None); } let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) { @@ -124,7 +138,7 @@ impl Data { self.presenceid_presence.remove(&key); } - Ok(()) + Ok(Some(*count)) } #[inline] diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index c11e052c..2b223bef 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -1,10 +1,15 @@ +mod aggregate; mod data; mod presence; use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; -use futures::{Stream, StreamExt, TryFutureExt, future::try_join, stream::FuturesUnordered}; +use futures::{ + Stream, StreamExt, TryFutureExt, + future::{AbortHandle, Abortable, try_join}, + stream::FuturesUnordered, +}; use loole::{Receiver, Sender}; use ruma::{ DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, @@ -17,7 +22,7 @@ use tuwunel_core::{ utils::{future::OptionFutureExt, option::OptionExt}, }; -use self::{data::Data, presence::Presence}; +use self::{aggregate::PresenceAggregator, data::Data, presence::Presence}; pub struct Service { timer_channel: (Sender, Receiver), @@ -27,9 +32,11 @@ pub struct Service { db: Data, services: Arc, last_sync_seen: RwLock>, + device_presence: PresenceAggregator, } -type TimerType = (OwnedUserId, Duration); +type TimerType = (OwnedUserId, Duration, u64); +type TimerFired = (OwnedUserId, u64); #[async_trait] impl crate::Service for Service { @@ -45,6 +52,7 @@ impl crate::Service for Service { db: Data::new(args), services: args.services.clone(), last_sync_seen: RwLock::new(HashMap::new()), + device_presence: PresenceAggregator::new(), })) } @@ -52,23 +60,49 @@ impl crate::Service for Service { // reset dormant online/away statuses to offline, and set the server user as // online self.unset_all_presence().await; + self.device_presence.clear().await; _ = self .maybe_ping_presence(&self.services.globals.server_user, None, &PresenceState::Online) .await; let receiver = self.timer_channel.1.clone(); - let mut presence_timers = FuturesUnordered::new(); + let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new(); + let mut timer_handles: HashMap = HashMap::new(); while !receiver.is_closed() { tokio::select! { - Some(user_id) = presence_timers.next() => { - self.process_presence_timer(&user_id).await.log_err().ok(); + Some(result) = presence_timers.next() => { + let Ok((user_id, count)) = result else { + continue; + }; + + if let Some((current_count, _)) = timer_handles.get(&user_id) { + if *current_count != count { + trace!(?user_id, count, current_count, "Skipping stale presence timer"); + continue; + } + } + + timer_handles.remove(&user_id); + self.process_presence_timer(&user_id, count).await.log_err().ok(); }, event = receiver.recv_async() => match event { Err(_) => break, - Ok((user_id, timeout)) => { - debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); - presence_timers.push(presence_timer(user_id, timeout)); + Ok((user_id, timeout, count)) => { + debug!( + "Adding timer {}: {user_id} timeout:{timeout:?} count:{count}", + presence_timers.len() + ); + if let Some((_, handle)) = timer_handles.remove(&user_id) { + handle.abort(); + } + + let (handle, reg) = AbortHandle::new_pair(); + presence_timers.push(Abortable::new( + presence_timer(user_id.clone(), timeout, count), + reg, + )); + timer_handles.insert(user_id, (count, handle)); }, }, } @@ -97,6 +131,142 @@ impl crate::Service for Service { } impl Service { + fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey { + if is_remote { + return aggregate::DeviceKey::Remote; + } + + match device_id { + | Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()), + | None => aggregate::DeviceKey::UnknownLocal, + } + } + + fn schedule_presence_timer( + &self, + user_id: &UserId, + presence_state: &PresenceState, + count: u64, + ) -> Result { + if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id)) + || user_id == self.services.globals.server_user + { + return Ok(()); + } + + let timeout = match presence_state { + | PresenceState::Online => self.services.server.config.presence_idle_timeout_s, + | _ => self.services.server.config.presence_offline_timeout_s, + }; + + self.timer_channel + .0 + .send((user_id.to_owned(), Duration::from_secs(timeout), count)) + .map_err(|e| { + error!("Failed to add presence timer: {}", e); + Error::bad_database("Failed to add presence timer") + }) + } + + async fn apply_device_presence_update( + &self, + user_id: &UserId, + device_key: aggregate::DeviceKey, + state: &PresenceState, + currently_active: Option, + last_active_ago: Option, + status_msg: Option, + reason: PresenceUpdateReason, + refresh_window_ms: Option, + ) -> Result { + let now = tuwunel_core::utils::millis_since_unix_epoch(); + debug!( + ?user_id, + ?device_key, + ?state, + currently_active, + last_active_ago = last_active_ago.map(u64::from), + ?reason, + "Presence update received" + ); + self.device_presence + .update( + user_id, + device_key, + state, + currently_active, + last_active_ago, + status_msg, + now, + ) + .await; + + let aggregated = self + .device_presence + .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) + .await; + debug!( + ?user_id, + agg_state = ?aggregated.state, + agg_currently_active = aggregated.currently_active, + agg_last_active_ts = aggregated.last_active_ts, + agg_device_count = aggregated.device_count, + "Presence aggregate computed" + ); + + let last_presence = self.db.get_presence(user_id).await; + let (last_count, last_event) = match last_presence { + | Ok((count, event)) => (Some(count), Some(event)), + | Err(_) => (None, None), + }; + + let state_changed = match &last_event { + | Some(event) => event.content.presence != aggregated.state, + | None => true, + }; + + if !state_changed { + if let (Some(refresh_ms), Some(event), Some(count)) = + (refresh_window_ms, &last_event, last_count) + { + let last_last_active_ago: u64 = event + .content + .last_active_ago + .unwrap_or_default() + .into(); + if last_last_active_ago < refresh_ms { + self.schedule_presence_timer(user_id, &event.content.presence, count) + .log_err() + .ok(); + debug!( + ?user_id, + ?state, + last_last_active_ago, + "Skipping presence update: refresh window (timer rescheduled)" + ); + return Ok(()); + } + } + } + + let fallback_status = last_event + .and_then(|event| event.content.status_msg) + .filter(|msg| !msg.is_empty()); + let status_msg = aggregated.status_msg.or(fallback_status); + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); + + self.set_presence( + user_id, + &aggregated.state, + Some(aggregated.currently_active), + last_active_ago, + status_msg, + reason, + ) + .await + } + /// record that a user has just successfully completed a /sync (or /// equivalent activity) pub async fn note_sync(&self, user_id: &UserId) { @@ -137,28 +307,16 @@ impl Service { device_id: Option<&DeviceId>, new_state: &PresenceState, ) -> Result { - const REFRESH_TIMEOUT: u64 = 60 * 1000; + const REFRESH_TIMEOUT: u64 = 30 * 1000; if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() { - return Ok(()); - } - - let last_presence = self.db.get_presence(user_id).await; - let state_changed = match last_presence { - | Err(_) => true, - | Ok((_, ref presence)) => presence.content.presence != *new_state, - }; - - let last_last_active_ago = match last_presence { - | Err(_) => 0_u64, - | Ok((_, ref presence)) => presence - .content - .last_active_ago - .unwrap_or_default() - .into(), - }; - - if !state_changed && last_last_active_ago < REFRESH_TIMEOUT { + debug!( + ?user_id, + ?new_state, + allow_local_presence = self.services.server.config.allow_local_presence, + read_only = self.services.db.is_read_only(), + "Skipping presence ping" + ); return Ok(()); } @@ -168,19 +326,23 @@ impl Service { .update_device_last_seen(user_id, device_id, None) }); - let status_msg = match last_presence { - | Ok((_, ref presence)) => presence.content.status_msg.clone(), - | Err(_) => Some(String::new()), - }; - - let last_active_ago = UInt::new(0); let currently_active = *new_state == PresenceState::Online; - let set_presence = self.set_presence( + let set_presence = self.apply_device_presence_update( user_id, + Self::device_key(device_id, false), new_state, Some(currently_active), - last_active_ago, - status_msg, + UInt::new(0), + None, + PresenceUpdateReason::Ping, + Some(REFRESH_TIMEOUT), + ); + + debug!( + ?user_id, + ?new_state, + currently_active, + "Presence ping accepted" ); try_join(set_presence, update_device_seen.unwrap_or(Ok(()))) @@ -188,6 +350,52 @@ impl Service { .await } + /// Applies an explicit presence update for a local device. + pub async fn set_presence_for_device( + &self, + user_id: &UserId, + device_id: Option<&DeviceId>, + state: &PresenceState, + status_msg: Option, + reason: PresenceUpdateReason, + ) -> Result { + let currently_active = *state == PresenceState::Online; + self.apply_device_presence_update( + user_id, + Self::device_key(device_id, false), + state, + Some(currently_active), + None, + status_msg, + reason, + None, + ) + .await + } + + /// Applies a presence update received over federation. + pub async fn set_presence_from_federation( + &self, + user_id: &UserId, + state: &PresenceState, + currently_active: bool, + last_active_ago: UInt, + status_msg: Option, + reason: PresenceUpdateReason, + ) -> Result { + self.apply_device_presence_update( + user_id, + Self::device_key(None, true), + state, + Some(currently_active), + Some(last_active_ago), + status_msg, + reason, + None, + ) + .await + } + /// Adds a presence event which will be saved until a new event replaces it. pub async fn set_presence( &self, @@ -202,33 +410,63 @@ impl Service { | &_ => state, }; - self.db + let count = self + .db .set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg) .await?; - if (self.timeout_remote_users || self.services.globals.user_is_local(user_id)) - && user_id != self.services.globals.server_user - { - let timeout = match presence_state { - | PresenceState::Online => - self.services - .server - .config - .presence_idle_timeout_s, - | _ => - self.services - .server - .config - .presence_offline_timeout_s, - }; + if let Some(count) = count { + if (self.timeout_remote_users || self.services.globals.user_is_local(user_id)) + && user_id != self.services.globals.server_user + { + let timeout = match presence_state { + | PresenceState::Online => + self.services + .server + .config + .presence_idle_timeout_s, + | _ => + self.services + .server + .config + .presence_offline_timeout_s, + }; - self.timer_channel - .0 - .send((user_id.to_owned(), Duration::from_secs(timeout))) - .map_err(|e| { - error!("Failed to add presence timer: {}", e); - Error::bad_database("Failed to add presence timer") - })?; + let timeout_kind = match presence_state { + | PresenceState::Online => "idle_timeout_s", + | _ => "offline_timeout_s", + }; + + debug!( + ?user_id, + ?presence_state, + currently_active, + last_active_ago = last_active_ago.map(u64::from), + status_msg = status_msg_log.as_deref(), + count, + timeout_s = timeout, + timeout_kind, + timeout_remote_users = self.timeout_remote_users, + is_local, + is_server_user, + "Scheduling presence timer" + ); + + self.schedule_presence_timer(user_id, presence_state, count)?; + } else { + debug!( + ?user_id, + ?presence_state, + currently_active, + last_active_ago = last_active_ago.map(u64::from), + status_msg = status_msg_log.as_deref(), + count, + timeout_remote_users = self.timeout_remote_users, + is_local, + is_server_user, + "Presence timer not scheduled" + ); + } } Ok(()) @@ -318,43 +556,100 @@ impl Service { Ok(event) } - async fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result { - let mut presence_state = PresenceState::Offline; - let mut last_active_ago = None; - let mut status_msg = None; - - let presence_event = self.get_presence(user_id).await; - - if let Ok(presence_event) = presence_event { - presence_state = presence_event.content.presence; - last_active_ago = presence_event.content.last_active_ago; - status_msg = presence_event.content.status_msg; - } - - let new_state = match (&presence_state, last_active_ago.map(u64::from)) { - | (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout => - Some(PresenceState::Unavailable), - | (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout => - Some(PresenceState::Offline), - | _ => None, + async fn process_presence_timer(&self, user_id: &OwnedUserId, expected_count: u64) -> Result { + let (current_count, presence) = match self.db.get_presence_raw(user_id).await { + | Ok(presence) => presence, + | Err(_) => return Ok(()), }; - debug!( - "Processed presence timer for user '{user_id}': Old state = {presence_state}, New \ - state = {new_state:?}" - ); - - if let Some(new_state) = new_state { - self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg) - .await?; + if current_count != expected_count { + trace!( + ?user_id, + expected_count, + current_count, + "Skipping stale presence timer" + ); + return Ok(()); } + let presence_state = presence.state().clone(); + let now = tuwunel_core::utils::millis_since_unix_epoch(); + let aggregated = self + .device_presence + .aggregate(user_id, now, self.idle_timeout, self.offline_timeout) + .await; + + if aggregated.device_count == 0 { + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts()))); + let status_msg = presence.status_msg(); + let new_state = match (&presence_state, last_active_ago.map(u64::from)) { + | (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout => + Some(PresenceState::Unavailable), + | (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout => + Some(PresenceState::Offline), + | _ => None, + }; + + debug!( + "Processed presence timer for user '{user_id}': Old state = {presence_state}, New \ + state = {new_state:?}" + ); + + if let Some(new_state) = new_state { + let reason = match new_state { + | PresenceState::Unavailable => PresenceUpdateReason::TimerIdle, + | PresenceState::Offline => PresenceUpdateReason::TimerOffline, + | _ => PresenceUpdateReason::Ping, + }; + + self.set_presence( + user_id, + &new_state, + Some(false), + last_active_ago, + status_msg, + reason, + ) + .await?; + } + + return Ok(()); + } + + if aggregated.state == presence_state { + self.schedule_presence_timer(user_id, &presence_state, current_count) + .log_err() + .ok(); + return Ok(()); + } + + let reason = match aggregated.state { + | PresenceState::Unavailable => PresenceUpdateReason::TimerIdle, + | PresenceState::Offline => PresenceUpdateReason::TimerOffline, + | _ => PresenceUpdateReason::Ping, + }; + + let status_msg = aggregated.status_msg.or_else(|| presence.status_msg()); + let last_active_ago = + Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts))); + + self.set_presence( + user_id, + &aggregated.state, + Some(aggregated.currently_active), + last_active_ago, + status_msg, + reason, + ) + .await?; + Ok(()) } } -async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId { +async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired { sleep(timeout).await; - user_id + (user_id, count) } diff --git a/src/service/presence/presence.rs b/src/service/presence/presence.rs index d3c53d0a..887d5284 100644 --- a/src/service/presence/presence.rs +++ b/src/service/presence/presence.rs @@ -40,6 +40,15 @@ impl Presence { .map_err(|_| Error::bad_database("Invalid presence data in database")) } + #[inline] + pub(super) fn state(&self) -> &PresenceState { &self.state } + + #[inline] + pub(super) fn last_active_ts(&self) -> u64 { self.last_active_ts } + + #[inline] + pub(super) fn status_msg(&self) -> Option { self.status_msg.clone() } + /// Creates a PresenceEvent from available data. pub(super) async fn to_presence_event( &self,