presence: extract update pipeline
This commit is contained in:
@@ -1,25 +1,21 @@
|
||||
mod aggregate;
|
||||
mod data;
|
||||
mod pipeline;
|
||||
mod presence;
|
||||
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{
|
||||
Stream, StreamExt, TryFutureExt,
|
||||
future::{AbortHandle, Abortable, try_join},
|
||||
stream::FuturesUnordered,
|
||||
};
|
||||
use futures::{Stream, StreamExt, TryFutureExt, future::{AbortHandle, Abortable}, stream::FuturesUnordered};
|
||||
use loole::{Receiver, Sender};
|
||||
use ruma::{
|
||||
DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState,
|
||||
OwnedUserId, UserId, events::presence::PresenceEvent, presence::PresenceState,
|
||||
};
|
||||
use tokio::{sync::RwLock, time::sleep};
|
||||
use tuwunel_core::{
|
||||
Error, Result, checked, debug, debug_warn, error,
|
||||
Result, checked, debug, debug_warn,
|
||||
result::LogErr,
|
||||
trace,
|
||||
utils::future::OptionFutureExt,
|
||||
};
|
||||
|
||||
use self::{aggregate::PresenceAggregator, data::Data, presence::Presence};
|
||||
@@ -131,139 +127,6 @@ impl crate::Service for Service {
|
||||
}
|
||||
|
||||
impl Service {
|
||||
fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
|
||||
if is_remote {
|
||||
return aggregate::DeviceKey::Remote;
|
||||
}
|
||||
|
||||
match device_id {
|
||||
| Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()),
|
||||
| None => aggregate::DeviceKey::UnknownLocal,
|
||||
}
|
||||
}
|
||||
|
||||
fn schedule_presence_timer(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
presence_state: &PresenceState,
|
||||
count: u64,
|
||||
) -> Result {
|
||||
if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id))
|
||||
|| user_id == self.services.globals.server_user
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timeout = match presence_state {
|
||||
| PresenceState::Online => self.services.server.config.presence_idle_timeout_s,
|
||||
| _ => self.services.server.config.presence_offline_timeout_s,
|
||||
};
|
||||
|
||||
self.timer_channel
|
||||
.0
|
||||
.send((user_id.to_owned(), Duration::from_secs(timeout), count))
|
||||
.map_err(|e| {
|
||||
error!("Failed to add presence timer: {}", e);
|
||||
Error::bad_database("Failed to add presence timer")
|
||||
})
|
||||
}
|
||||
|
||||
async fn apply_device_presence_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_key: aggregate::DeviceKey,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
refresh_window_ms: Option<u64>,
|
||||
) -> Result {
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
debug!(
|
||||
?user_id,
|
||||
?device_key,
|
||||
?state,
|
||||
currently_active,
|
||||
last_active_ago = last_active_ago.map(u64::from),
|
||||
"Presence update received"
|
||||
);
|
||||
self.device_presence
|
||||
.update(
|
||||
user_id,
|
||||
device_key,
|
||||
state,
|
||||
currently_active,
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
now,
|
||||
)
|
||||
.await;
|
||||
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
debug!(
|
||||
?user_id,
|
||||
agg_state = ?aggregated.state,
|
||||
agg_currently_active = aggregated.currently_active,
|
||||
agg_last_active_ts = aggregated.last_active_ts,
|
||||
agg_device_count = aggregated.device_count,
|
||||
"Presence aggregate computed"
|
||||
);
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let (last_count, last_event) = match last_presence {
|
||||
| Ok((count, event)) => (Some(count), Some(event)),
|
||||
| Err(_) => (None, None),
|
||||
};
|
||||
|
||||
let state_changed = match &last_event {
|
||||
| Some(event) => event.content.presence != aggregated.state,
|
||||
| None => true,
|
||||
};
|
||||
|
||||
if !state_changed {
|
||||
if let (Some(refresh_ms), Some(event), Some(count)) =
|
||||
(refresh_window_ms, &last_event, last_count)
|
||||
{
|
||||
let last_last_active_ago: u64 = event
|
||||
.content
|
||||
.last_active_ago
|
||||
.unwrap_or_default()
|
||||
.into();
|
||||
if last_last_active_ago < refresh_ms {
|
||||
self.schedule_presence_timer(user_id, &event.content.presence, count)
|
||||
.log_err()
|
||||
.ok();
|
||||
debug!(
|
||||
?user_id,
|
||||
?state,
|
||||
last_last_active_ago,
|
||||
"Skipping presence update: refresh window (timer rescheduled)"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let fallback_status = last_event
|
||||
.and_then(|event| event.content.status_msg)
|
||||
.filter(|msg| !msg.is_empty());
|
||||
let status_msg = aggregated.status_msg.or(fallback_status);
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// record that a user has just successfully completed a /sync (or
|
||||
/// equivalent activity)
|
||||
pub async fn note_sync(&self, user_id: &UserId) {
|
||||
@@ -296,130 +159,6 @@ impl Service {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Pings the presence of the given user, setting the specified state. When
|
||||
/// device_id is supplied.
|
||||
pub async fn maybe_ping_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
new_state: &PresenceState,
|
||||
) -> Result {
|
||||
const REFRESH_TIMEOUT: u64 = 30 * 1000;
|
||||
|
||||
if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
allow_local_presence = self.services.server.config.allow_local_presence,
|
||||
read_only = self.services.db.is_read_only(),
|
||||
"Skipping presence ping"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let update_device_seen = device_id.map_async(|device_id| {
|
||||
self.services
|
||||
.users
|
||||
.update_device_last_seen(user_id, device_id, None)
|
||||
});
|
||||
|
||||
let currently_active = *new_state == PresenceState::Online;
|
||||
let set_presence = self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
new_state,
|
||||
Some(currently_active),
|
||||
UInt::new(0),
|
||||
None,
|
||||
Some(REFRESH_TIMEOUT),
|
||||
);
|
||||
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
currently_active,
|
||||
"Presence ping accepted"
|
||||
);
|
||||
|
||||
try_join(set_presence, update_device_seen.unwrap_or(Ok(())))
|
||||
.map_ok(|_| ())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies an explicit presence update for a local device.
|
||||
pub async fn set_presence_for_device(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
state: &PresenceState,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
let currently_active = *state == PresenceState::Online;
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
state,
|
||||
Some(currently_active),
|
||||
None,
|
||||
status_msg,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies a presence update received over federation.
|
||||
pub async fn set_presence_from_federation(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
state: &PresenceState,
|
||||
currently_active: bool,
|
||||
last_active_ago: UInt,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(None, true),
|
||||
state,
|
||||
Some(currently_active),
|
||||
Some(last_active_ago),
|
||||
status_msg,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Adds a presence event which will be saved until a new event replaces it.
|
||||
pub async fn set_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
let presence_state = match state.as_str() {
|
||||
| "" => &PresenceState::Offline, // default an empty string to 'offline'
|
||||
| &_ => state,
|
||||
};
|
||||
|
||||
let count = self
|
||||
.db
|
||||
.set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
|
||||
.await?;
|
||||
|
||||
if let Some(count) = count {
|
||||
let is_local = self.services.globals.user_is_local(user_id);
|
||||
let is_server_user = user_id == self.services.globals.server_user;
|
||||
let allow_timeout = self.timeout_remote_users || is_local;
|
||||
|
||||
if allow_timeout && !is_server_user {
|
||||
self.schedule_presence_timer(user_id, presence_state, count)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes the presence record for the given user from the database.
|
||||
///
|
||||
/// TODO: Why is this not used?
|
||||
@@ -503,83 +242,6 @@ impl Service {
|
||||
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
async fn process_presence_timer(&self, user_id: &OwnedUserId, expected_count: u64) -> Result {
|
||||
let (current_count, presence) = match self.db.get_presence_raw(user_id).await {
|
||||
| Ok(presence) => presence,
|
||||
| Err(_) => return Ok(()),
|
||||
};
|
||||
|
||||
if current_count != expected_count {
|
||||
trace!(
|
||||
?user_id,
|
||||
expected_count,
|
||||
current_count,
|
||||
"Skipping stale presence timer"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let presence_state = presence.state().clone();
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
|
||||
if aggregated.device_count == 0 {
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts())));
|
||||
let status_msg = presence.status_msg();
|
||||
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
||||
| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
|
||||
Some(PresenceState::Unavailable),
|
||||
| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
|
||||
Some(PresenceState::Offline),
|
||||
| _ => None,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New \
|
||||
state = {new_state:?}"
|
||||
);
|
||||
|
||||
if let Some(new_state) = new_state {
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&new_state,
|
||||
Some(false),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if aggregated.state == presence_state {
|
||||
self.schedule_presence_timer(user_id, &presence_state, current_count)
|
||||
.log_err()
|
||||
.ok();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let status_msg = aggregated.status_msg.or_else(|| presence.status_msg());
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired {
|
||||
|
||||
358
src/service/presence/pipeline.rs
Normal file
358
src/service/presence/pipeline.rs
Normal file
@@ -0,0 +1,358 @@
|
||||
//! Presence update pipeline.
|
||||
//!
|
||||
//! This module centralizes the write path for presence updates. It keeps the
|
||||
//! aggregation and timer logic in one place so the public `Service` surface
|
||||
//! remains small and the update flow is easy to review.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use ruma::{DeviceId, OwnedUserId, UInt, UserId, presence::PresenceState};
|
||||
use tuwunel_core::{
|
||||
Error, Result, debug, error, trace,
|
||||
result::LogErr,
|
||||
utils::{future::OptionFutureExt, option::OptionExt},
|
||||
};
|
||||
|
||||
use super::{aggregate, Service};
|
||||
|
||||
impl Service {
|
||||
fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
|
||||
if is_remote {
|
||||
return aggregate::DeviceKey::Remote;
|
||||
}
|
||||
|
||||
match device_id {
|
||||
| Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()),
|
||||
| None => aggregate::DeviceKey::UnknownLocal,
|
||||
}
|
||||
}
|
||||
|
||||
fn schedule_presence_timer(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
presence_state: &PresenceState,
|
||||
count: u64,
|
||||
) -> Result {
|
||||
if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id))
|
||||
|| user_id == self.services.globals.server_user
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timeout = match presence_state {
|
||||
| PresenceState::Online => self.services.server.config.presence_idle_timeout_s,
|
||||
| _ => self.services.server.config.presence_offline_timeout_s,
|
||||
};
|
||||
|
||||
self.timer_channel
|
||||
.0
|
||||
.send((user_id.to_owned(), Duration::from_secs(timeout), count))
|
||||
.map_err(|e| {
|
||||
error!("Failed to add presence timer: {}", e);
|
||||
Error::bad_database("Failed to add presence timer")
|
||||
})
|
||||
}
|
||||
|
||||
async fn apply_device_presence_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_key: aggregate::DeviceKey,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
refresh_window_ms: Option<u64>,
|
||||
) -> Result {
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
debug!(
|
||||
?user_id,
|
||||
?device_key,
|
||||
?state,
|
||||
currently_active,
|
||||
last_active_ago = last_active_ago.map(u64::from),
|
||||
"Presence update received"
|
||||
);
|
||||
self.device_presence
|
||||
.update(
|
||||
user_id,
|
||||
device_key,
|
||||
state,
|
||||
currently_active,
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
now,
|
||||
)
|
||||
.await;
|
||||
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
debug!(
|
||||
?user_id,
|
||||
agg_state = ?aggregated.state,
|
||||
agg_currently_active = aggregated.currently_active,
|
||||
agg_last_active_ts = aggregated.last_active_ts,
|
||||
agg_device_count = aggregated.device_count,
|
||||
"Presence aggregate computed"
|
||||
);
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let (last_count, last_event) = match last_presence {
|
||||
| Ok((count, event)) => (Some(count), Some(event)),
|
||||
| Err(_) => (None, None),
|
||||
};
|
||||
|
||||
let state_changed = match &last_event {
|
||||
| Some(event) => event.content.presence != aggregated.state,
|
||||
| None => true,
|
||||
};
|
||||
|
||||
if !state_changed {
|
||||
if let (Some(refresh_ms), Some(event), Some(count)) =
|
||||
(refresh_window_ms, &last_event, last_count)
|
||||
{
|
||||
let last_last_active_ago: u64 = event
|
||||
.content
|
||||
.last_active_ago
|
||||
.unwrap_or_default()
|
||||
.into();
|
||||
if last_last_active_ago < refresh_ms {
|
||||
self.schedule_presence_timer(user_id, &event.content.presence, count)
|
||||
.log_err()
|
||||
.ok();
|
||||
debug!(
|
||||
?user_id,
|
||||
?state,
|
||||
last_last_active_ago,
|
||||
"Skipping presence update: refresh window (timer rescheduled)"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let fallback_status = last_event
|
||||
.and_then(|event| event.content.status_msg)
|
||||
.filter(|msg| !msg.is_empty());
|
||||
let status_msg = aggregated.status_msg.or(fallback_status);
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Pings the presence of the given user, setting the specified state. When
|
||||
/// device_id is supplied.
|
||||
pub async fn maybe_ping_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
new_state: &PresenceState,
|
||||
) -> Result {
|
||||
const REFRESH_TIMEOUT: u64 = 30 * 1000;
|
||||
|
||||
if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
allow_local_presence = self.services.server.config.allow_local_presence,
|
||||
read_only = self.services.db.is_read_only(),
|
||||
"Skipping presence ping"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let update_device_seen = device_id.map_async(|device_id| {
|
||||
self.services
|
||||
.users
|
||||
.update_device_last_seen(user_id, device_id, None)
|
||||
});
|
||||
|
||||
let currently_active = *new_state == PresenceState::Online;
|
||||
let set_presence = self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
new_state,
|
||||
Some(currently_active),
|
||||
UInt::new(0),
|
||||
None,
|
||||
Some(REFRESH_TIMEOUT),
|
||||
);
|
||||
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
currently_active,
|
||||
"Presence ping accepted"
|
||||
);
|
||||
|
||||
futures::future::try_join(set_presence, update_device_seen.unwrap_or(Ok(())))
|
||||
.map_ok(|_| ())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies an explicit presence update for a local device.
|
||||
pub async fn set_presence_for_device(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
state: &PresenceState,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
let currently_active = *state == PresenceState::Online;
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
state,
|
||||
Some(currently_active),
|
||||
None,
|
||||
status_msg,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies a presence update received over federation.
|
||||
pub async fn set_presence_from_federation(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
state: &PresenceState,
|
||||
currently_active: bool,
|
||||
last_active_ago: UInt,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(None, true),
|
||||
state,
|
||||
Some(currently_active),
|
||||
Some(last_active_ago),
|
||||
status_msg,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Adds a presence event which will be saved until a new event replaces it.
|
||||
pub async fn set_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
let presence_state = match state.as_str() {
|
||||
| "" => &PresenceState::Offline, // default an empty string to 'offline'
|
||||
| &_ => state,
|
||||
};
|
||||
|
||||
let count = self
|
||||
.db
|
||||
.set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
|
||||
.await?;
|
||||
|
||||
if let Some(count) = count {
|
||||
let is_local = self.services.globals.user_is_local(user_id);
|
||||
let is_server_user = user_id == self.services.globals.server_user;
|
||||
let allow_timeout = self.timeout_remote_users || is_local;
|
||||
|
||||
if allow_timeout && !is_server_user {
|
||||
self.schedule_presence_timer(user_id, presence_state, count)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn process_presence_timer(
|
||||
&self,
|
||||
user_id: &OwnedUserId,
|
||||
expected_count: u64,
|
||||
) -> Result {
|
||||
let (current_count, presence) = match self.db.get_presence_raw(user_id).await {
|
||||
| Ok(presence) => presence,
|
||||
| Err(_) => return Ok(()),
|
||||
};
|
||||
|
||||
if current_count != expected_count {
|
||||
trace!(
|
||||
?user_id,
|
||||
expected_count,
|
||||
current_count,
|
||||
"Skipping stale presence timer"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let presence_state = presence.state().clone();
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
|
||||
if aggregated.device_count == 0 {
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts())));
|
||||
let status_msg = presence.status_msg();
|
||||
|
||||
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
||||
| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
|
||||
Some(PresenceState::Unavailable),
|
||||
| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
|
||||
Some(PresenceState::Offline),
|
||||
| _ => None,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New \
|
||||
state = {new_state:?}"
|
||||
);
|
||||
|
||||
if let Some(new_state) = new_state {
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&new_state,
|
||||
Some(false),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if aggregated.state == presence_state {
|
||||
self.schedule_presence_timer(user_id, &presence_state, current_count)
|
||||
.log_err()
|
||||
.ok();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let status_msg = aggregated.status_msg.or_else(|| presence.status_msg());
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user