presence: aggregate device updates
This commit is contained in:
@@ -23,7 +23,13 @@ pub(crate) async fn set_presence_route(
|
||||
|
||||
services
|
||||
.presence
|
||||
.set_presence(body.sender_user(), &body.presence, None, None, body.status_msg.clone())
|
||||
.set_presence_for_device(
|
||||
body.sender_user(),
|
||||
body.sender_device.as_deref(),
|
||||
&body.presence,
|
||||
body.status_msg.clone(),
|
||||
PresenceUpdateReason::ClientRequest,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(set_presence::v3::Response {})
|
||||
|
||||
@@ -268,11 +268,11 @@ async fn handle_edu_presence_update(
|
||||
|
||||
services
|
||||
.presence
|
||||
.set_presence(
|
||||
.set_presence_from_federation(
|
||||
&update.user_id,
|
||||
&update.presence,
|
||||
Some(update.currently_active),
|
||||
Some(update.last_active_ago),
|
||||
update.currently_active,
|
||||
update.last_active_ago,
|
||||
update.status_msg.clone(),
|
||||
)
|
||||
.await
|
||||
|
||||
207
src/service/presence/aggregate.rs
Normal file
207
src/service/presence/aggregate.rs
Normal file
@@ -0,0 +1,207 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use ruma::{OwnedDeviceId, OwnedUserId, UInt, UserId, presence::PresenceState};
|
||||
use tuwunel_core::debug;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Identifies the origin of a presence update within a user's device map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum DeviceKey {
	/// A known local device, keyed by its device ID.
	Device(OwnedDeviceId),
	/// Presence received over federation; all remote updates for a user
	/// share this single slot.
	Remote,
	/// A local update whose originating device could not be determined.
	UnknownLocal,
}
|
||||
|
||||
/// Last-known presence as reported by a single device.
#[derive(Debug, Clone)]
struct DevicePresence {
	/// Presence state as reported by the device.
	state: PresenceState,
	/// Whether the device claimed to be currently active.
	currently_active: bool,
	/// Unix timestamp (ms) the device was last active.
	last_active_ts: u64,
	/// Unix timestamp (ms) this entry was last written; used to expire
	/// stale entries and to pick the freshest status message.
	last_update_ts: u64,
	/// Most recent non-empty status message, if any was ever supplied.
	status_msg: Option<String>,
}
|
||||
|
||||
/// The per-user presence computed by folding all device entries together.
#[derive(Debug, Clone)]
pub(crate) struct AggregatedPresence {
	/// Highest-ranked effective state across all devices.
	pub(crate) state: PresenceState,
	/// True when any online/busy device is inside the idle window.
	pub(crate) currently_active: bool,
	/// Most recent last-active timestamp (ms) across all devices.
	pub(crate) last_active_ts: u64,
	/// Status message from the most recently updated device, if any.
	pub(crate) status_msg: Option<String>,
	/// Number of device entries that remain tracked after expiry.
	pub(crate) device_count: usize,
}
|
||||
|
||||
/// Tracks per-device presence and folds it into a single per-user state.
#[derive(Debug, Default)]
pub(crate) struct PresenceAggregator {
	/// Per-user map of device presence entries, guarded for concurrent use.
	inner: RwLock<HashMap<OwnedUserId, HashMap<DeviceKey, DevicePresence>>>,
}
|
||||
|
||||
impl PresenceAggregator {
|
||||
pub(crate) fn new() -> Self { Self::default() }
|
||||
|
||||
pub(crate) async fn clear(&self) {
|
||||
self.inner.write().await.clear();
|
||||
}
|
||||
|
||||
pub(crate) async fn update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_key: DeviceKey,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
now_ms: u64,
|
||||
) {
|
||||
let mut guard = self.inner.write().await;
|
||||
let devices = guard.entry(user_id.to_owned()).or_default();
|
||||
|
||||
let last_active_ts = match last_active_ago {
|
||||
| None => now_ms,
|
||||
| Some(ago) => now_ms.saturating_sub(ago.into()),
|
||||
};
|
||||
|
||||
let entry = devices.entry(device_key).or_insert(DevicePresence {
|
||||
state: state.clone(),
|
||||
currently_active: currently_active.unwrap_or(false),
|
||||
last_active_ts,
|
||||
last_update_ts: now_ms,
|
||||
status_msg: status_msg.clone(),
|
||||
});
|
||||
|
||||
entry.state = state.clone();
|
||||
entry.currently_active = currently_active.unwrap_or(false);
|
||||
entry.last_active_ts = last_active_ts;
|
||||
entry.last_update_ts = now_ms;
|
||||
if status_msg.is_some() {
|
||||
entry.status_msg = status_msg;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn aggregate(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
now_ms: u64,
|
||||
idle_timeout_ms: u64,
|
||||
offline_timeout_ms: u64,
|
||||
) -> AggregatedPresence {
|
||||
let mut guard = self.inner.write().await;
|
||||
let Some(devices) = guard.get_mut(user_id) else {
|
||||
return AggregatedPresence {
|
||||
state: PresenceState::Offline,
|
||||
currently_active: false,
|
||||
last_active_ts: now_ms,
|
||||
status_msg: None,
|
||||
device_count: 0,
|
||||
};
|
||||
};
|
||||
|
||||
let mut best_state = PresenceState::Offline;
|
||||
let mut best_rank = state_rank(&best_state);
|
||||
let mut any_currently_active = false;
|
||||
let mut last_active_ts = 0_u64;
|
||||
let mut latest_status: Option<(u64, String)> = None;
|
||||
|
||||
devices.retain(|_, device| {
|
||||
let last_active_age = now_ms.saturating_sub(device.last_active_ts);
|
||||
let last_update_age = now_ms.saturating_sub(device.last_update_ts);
|
||||
|
||||
let effective_state = effective_device_state(
|
||||
&device.state,
|
||||
last_active_age,
|
||||
idle_timeout_ms,
|
||||
offline_timeout_ms,
|
||||
);
|
||||
|
||||
let rank = state_rank(&effective_state);
|
||||
if rank > best_rank {
|
||||
best_rank = rank;
|
||||
best_state = effective_state.clone();
|
||||
}
|
||||
|
||||
if effective_state == PresenceState::Online || effective_state == PresenceState::Busy {
|
||||
if device.currently_active && last_active_age < idle_timeout_ms {
|
||||
any_currently_active = true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(msg) = device.status_msg.as_ref().filter(|msg| !msg.is_empty()) {
|
||||
match latest_status {
|
||||
| None => {
|
||||
latest_status = Some((device.last_update_ts, msg.clone()));
|
||||
},
|
||||
| Some((ts, _)) if device.last_update_ts > ts => {
|
||||
latest_status = Some((device.last_update_ts, msg.clone()));
|
||||
},
|
||||
| _ => {},
|
||||
}
|
||||
}
|
||||
|
||||
if device.last_active_ts > last_active_ts {
|
||||
last_active_ts = device.last_active_ts;
|
||||
}
|
||||
|
||||
// Drop devices that haven't updated in a long time to keep the map small.
|
||||
last_update_age < offline_timeout_ms
|
||||
});
|
||||
|
||||
let device_count = devices.len();
|
||||
let status_msg = latest_status.map(|(_, msg)| msg);
|
||||
|
||||
if device_count == 0 {
|
||||
guard.remove(user_id);
|
||||
}
|
||||
|
||||
debug!(
|
||||
?user_id,
|
||||
device_count,
|
||||
state = ?best_state,
|
||||
currently_active = any_currently_active,
|
||||
last_active_ts,
|
||||
status_msg = status_msg.as_deref(),
|
||||
"Aggregated presence"
|
||||
);
|
||||
|
||||
AggregatedPresence {
|
||||
state: best_state,
|
||||
currently_active: any_currently_active,
|
||||
last_active_ts: if last_active_ts == 0 { now_ms } else { last_active_ts },
|
||||
status_msg,
|
||||
device_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn effective_device_state(
|
||||
state: &PresenceState,
|
||||
last_active_age: u64,
|
||||
idle_timeout_ms: u64,
|
||||
offline_timeout_ms: u64,
|
||||
) -> PresenceState {
|
||||
match state {
|
||||
| PresenceState::Busy | PresenceState::Online => {
|
||||
if last_active_age >= idle_timeout_ms {
|
||||
PresenceState::Unavailable
|
||||
} else {
|
||||
state.clone()
|
||||
}
|
||||
},
|
||||
| PresenceState::Unavailable => {
|
||||
if last_active_age >= offline_timeout_ms {
|
||||
PresenceState::Offline
|
||||
} else {
|
||||
PresenceState::Unavailable
|
||||
}
|
||||
},
|
||||
| PresenceState::Offline => PresenceState::Offline,
|
||||
| _ => state.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn state_rank(state: &PresenceState) -> u8 {
|
||||
match state {
|
||||
| PresenceState::Busy => 3,
|
||||
| PresenceState::Online => 2,
|
||||
| PresenceState::Unavailable => 1,
|
||||
| PresenceState::Offline => 0,
|
||||
| _ => 0,
|
||||
}
|
||||
}
|
||||
@@ -43,6 +43,20 @@ impl Data {
|
||||
Ok((count, event))
|
||||
}
|
||||
|
||||
pub(super) async fn get_presence_raw(&self, user_id: &UserId) -> Result<(u64, Presence)> {
|
||||
let count = self
|
||||
.userid_presenceid
|
||||
.get(user_id)
|
||||
.await
|
||||
.deserialized::<u64>()?;
|
||||
|
||||
let key = presenceid_key(count, user_id);
|
||||
let bytes = self.presenceid_presence.get(&key).await?;
|
||||
let presence = Presence::from_json_bytes(&bytes)?;
|
||||
|
||||
Ok((count, presence))
|
||||
}
|
||||
|
||||
pub(super) async fn set_presence(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
@@ -50,7 +64,7 @@ impl Data {
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
) -> Result {
|
||||
) -> Result<Option<u64>> {
|
||||
let last_presence = self.get_presence(user_id).await;
|
||||
let state_changed = match last_presence {
|
||||
| Err(_) => true,
|
||||
@@ -96,7 +110,7 @@ impl Data {
|
||||
"presence spam {user_id:?} last_active_ts:{last_active_ts:?} < \
|
||||
{last_last_active_ts:?}",
|
||||
);
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let status_msg = if status_msg.as_ref().is_some_and(String::is_empty) {
|
||||
@@ -124,7 +138,7 @@ impl Data {
|
||||
self.presenceid_presence.remove(&key);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(Some(*count))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
mod aggregate;
|
||||
mod data;
|
||||
mod presence;
|
||||
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{Stream, StreamExt, TryFutureExt, future::try_join, stream::FuturesUnordered};
|
||||
use futures::{
|
||||
Stream, StreamExt, TryFutureExt,
|
||||
future::{AbortHandle, Abortable, try_join},
|
||||
stream::FuturesUnordered,
|
||||
};
|
||||
use loole::{Receiver, Sender};
|
||||
use ruma::{
|
||||
DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState,
|
||||
@@ -17,7 +22,7 @@ use tuwunel_core::{
|
||||
utils::{future::OptionFutureExt, option::OptionExt},
|
||||
};
|
||||
|
||||
use self::{data::Data, presence::Presence};
|
||||
use self::{aggregate::PresenceAggregator, data::Data, presence::Presence};
|
||||
|
||||
pub struct Service {
|
||||
timer_channel: (Sender<TimerType>, Receiver<TimerType>),
|
||||
@@ -27,9 +32,11 @@ pub struct Service {
|
||||
db: Data,
|
||||
services: Arc<crate::services::OnceServices>,
|
||||
last_sync_seen: RwLock<HashMap<OwnedUserId, u64>>,
|
||||
device_presence: PresenceAggregator,
|
||||
}
|
||||
|
||||
type TimerType = (OwnedUserId, Duration);
|
||||
type TimerType = (OwnedUserId, Duration, u64);
|
||||
type TimerFired = (OwnedUserId, u64);
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
@@ -45,6 +52,7 @@ impl crate::Service for Service {
|
||||
db: Data::new(args),
|
||||
services: args.services.clone(),
|
||||
last_sync_seen: RwLock::new(HashMap::new()),
|
||||
device_presence: PresenceAggregator::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -52,23 +60,49 @@ impl crate::Service for Service {
|
||||
// reset dormant online/away statuses to offline, and set the server user as
|
||||
// online
|
||||
self.unset_all_presence().await;
|
||||
self.device_presence.clear().await;
|
||||
_ = self
|
||||
.maybe_ping_presence(&self.services.globals.server_user, None, &PresenceState::Online)
|
||||
.await;
|
||||
|
||||
let receiver = self.timer_channel.1.clone();
|
||||
|
||||
let mut presence_timers = FuturesUnordered::new();
|
||||
let mut presence_timers: FuturesUnordered<_> = FuturesUnordered::new();
|
||||
let mut timer_handles: HashMap<OwnedUserId, (u64, AbortHandle)> = HashMap::new();
|
||||
while !receiver.is_closed() {
|
||||
tokio::select! {
|
||||
Some(user_id) = presence_timers.next() => {
|
||||
self.process_presence_timer(&user_id).await.log_err().ok();
|
||||
Some(result) = presence_timers.next() => {
|
||||
let Ok((user_id, count)) = result else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Some((current_count, _)) = timer_handles.get(&user_id) {
|
||||
if *current_count != count {
|
||||
trace!(?user_id, count, current_count, "Skipping stale presence timer");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
timer_handles.remove(&user_id);
|
||||
self.process_presence_timer(&user_id, count).await.log_err().ok();
|
||||
},
|
||||
event = receiver.recv_async() => match event {
|
||||
Err(_) => break,
|
||||
Ok((user_id, timeout)) => {
|
||||
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
|
||||
presence_timers.push(presence_timer(user_id, timeout));
|
||||
Ok((user_id, timeout, count)) => {
|
||||
debug!(
|
||||
"Adding timer {}: {user_id} timeout:{timeout:?} count:{count}",
|
||||
presence_timers.len()
|
||||
);
|
||||
if let Some((_, handle)) = timer_handles.remove(&user_id) {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
let (handle, reg) = AbortHandle::new_pair();
|
||||
presence_timers.push(Abortable::new(
|
||||
presence_timer(user_id.clone(), timeout, count),
|
||||
reg,
|
||||
));
|
||||
timer_handles.insert(user_id, (count, handle));
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -97,6 +131,142 @@ impl crate::Service for Service {
|
||||
}
|
||||
|
||||
impl Service {
|
||||
fn device_key(device_id: Option<&DeviceId>, is_remote: bool) -> aggregate::DeviceKey {
|
||||
if is_remote {
|
||||
return aggregate::DeviceKey::Remote;
|
||||
}
|
||||
|
||||
match device_id {
|
||||
| Some(device_id) => aggregate::DeviceKey::Device(device_id.to_owned()),
|
||||
| None => aggregate::DeviceKey::UnknownLocal,
|
||||
}
|
||||
}
|
||||
|
||||
fn schedule_presence_timer(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
presence_state: &PresenceState,
|
||||
count: u64,
|
||||
) -> Result {
|
||||
if !(self.timeout_remote_users || self.services.globals.user_is_local(user_id))
|
||||
|| user_id == self.services.globals.server_user
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timeout = match presence_state {
|
||||
| PresenceState::Online => self.services.server.config.presence_idle_timeout_s,
|
||||
| _ => self.services.server.config.presence_offline_timeout_s,
|
||||
};
|
||||
|
||||
self.timer_channel
|
||||
.0
|
||||
.send((user_id.to_owned(), Duration::from_secs(timeout), count))
|
||||
.map_err(|e| {
|
||||
error!("Failed to add presence timer: {}", e);
|
||||
Error::bad_database("Failed to add presence timer")
|
||||
})
|
||||
}
|
||||
|
||||
async fn apply_device_presence_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_key: aggregate::DeviceKey,
|
||||
state: &PresenceState,
|
||||
currently_active: Option<bool>,
|
||||
last_active_ago: Option<UInt>,
|
||||
status_msg: Option<String>,
|
||||
reason: PresenceUpdateReason,
|
||||
refresh_window_ms: Option<u64>,
|
||||
) -> Result {
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
debug!(
|
||||
?user_id,
|
||||
?device_key,
|
||||
?state,
|
||||
currently_active,
|
||||
last_active_ago = last_active_ago.map(u64::from),
|
||||
?reason,
|
||||
"Presence update received"
|
||||
);
|
||||
self.device_presence
|
||||
.update(
|
||||
user_id,
|
||||
device_key,
|
||||
state,
|
||||
currently_active,
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
now,
|
||||
)
|
||||
.await;
|
||||
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
debug!(
|
||||
?user_id,
|
||||
agg_state = ?aggregated.state,
|
||||
agg_currently_active = aggregated.currently_active,
|
||||
agg_last_active_ts = aggregated.last_active_ts,
|
||||
agg_device_count = aggregated.device_count,
|
||||
"Presence aggregate computed"
|
||||
);
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let (last_count, last_event) = match last_presence {
|
||||
| Ok((count, event)) => (Some(count), Some(event)),
|
||||
| Err(_) => (None, None),
|
||||
};
|
||||
|
||||
let state_changed = match &last_event {
|
||||
| Some(event) => event.content.presence != aggregated.state,
|
||||
| None => true,
|
||||
};
|
||||
|
||||
if !state_changed {
|
||||
if let (Some(refresh_ms), Some(event), Some(count)) =
|
||||
(refresh_window_ms, &last_event, last_count)
|
||||
{
|
||||
let last_last_active_ago: u64 = event
|
||||
.content
|
||||
.last_active_ago
|
||||
.unwrap_or_default()
|
||||
.into();
|
||||
if last_last_active_ago < refresh_ms {
|
||||
self.schedule_presence_timer(user_id, &event.content.presence, count)
|
||||
.log_err()
|
||||
.ok();
|
||||
debug!(
|
||||
?user_id,
|
||||
?state,
|
||||
last_last_active_ago,
|
||||
"Skipping presence update: refresh window (timer rescheduled)"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let fallback_status = last_event
|
||||
.and_then(|event| event.content.status_msg)
|
||||
.filter(|msg| !msg.is_empty());
|
||||
let status_msg = aggregated.status_msg.or(fallback_status);
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
reason,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// record that a user has just successfully completed a /sync (or
|
||||
/// equivalent activity)
|
||||
pub async fn note_sync(&self, user_id: &UserId) {
|
||||
@@ -137,28 +307,16 @@ impl Service {
|
||||
device_id: Option<&DeviceId>,
|
||||
new_state: &PresenceState,
|
||||
) -> Result {
|
||||
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
||||
const REFRESH_TIMEOUT: u64 = 30 * 1000;
|
||||
|
||||
if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let last_presence = self.db.get_presence(user_id).await;
|
||||
let state_changed = match last_presence {
|
||||
| Err(_) => true,
|
||||
| Ok((_, ref presence)) => presence.content.presence != *new_state,
|
||||
};
|
||||
|
||||
let last_last_active_ago = match last_presence {
|
||||
| Err(_) => 0_u64,
|
||||
| Ok((_, ref presence)) => presence
|
||||
.content
|
||||
.last_active_ago
|
||||
.unwrap_or_default()
|
||||
.into(),
|
||||
};
|
||||
|
||||
if !state_changed && last_last_active_ago < REFRESH_TIMEOUT {
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
allow_local_presence = self.services.server.config.allow_local_presence,
|
||||
read_only = self.services.db.is_read_only(),
|
||||
"Skipping presence ping"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -168,19 +326,23 @@ impl Service {
|
||||
.update_device_last_seen(user_id, device_id, None)
|
||||
});
|
||||
|
||||
let status_msg = match last_presence {
|
||||
| Ok((_, ref presence)) => presence.content.status_msg.clone(),
|
||||
| Err(_) => Some(String::new()),
|
||||
};
|
||||
|
||||
let last_active_ago = UInt::new(0);
|
||||
let currently_active = *new_state == PresenceState::Online;
|
||||
let set_presence = self.set_presence(
|
||||
let set_presence = self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
new_state,
|
||||
Some(currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
UInt::new(0),
|
||||
None,
|
||||
PresenceUpdateReason::Ping,
|
||||
Some(REFRESH_TIMEOUT),
|
||||
);
|
||||
|
||||
debug!(
|
||||
?user_id,
|
||||
?new_state,
|
||||
currently_active,
|
||||
"Presence ping accepted"
|
||||
);
|
||||
|
||||
try_join(set_presence, update_device_seen.unwrap_or(Ok(())))
|
||||
@@ -188,6 +350,52 @@ impl Service {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies an explicit presence update for a local device.
|
||||
pub async fn set_presence_for_device(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
device_id: Option<&DeviceId>,
|
||||
state: &PresenceState,
|
||||
status_msg: Option<String>,
|
||||
reason: PresenceUpdateReason,
|
||||
) -> Result {
|
||||
let currently_active = *state == PresenceState::Online;
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(device_id, false),
|
||||
state,
|
||||
Some(currently_active),
|
||||
None,
|
||||
status_msg,
|
||||
reason,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Applies a presence update received over federation.
|
||||
pub async fn set_presence_from_federation(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
state: &PresenceState,
|
||||
currently_active: bool,
|
||||
last_active_ago: UInt,
|
||||
status_msg: Option<String>,
|
||||
reason: PresenceUpdateReason,
|
||||
) -> Result {
|
||||
self.apply_device_presence_update(
|
||||
user_id,
|
||||
Self::device_key(None, true),
|
||||
state,
|
||||
Some(currently_active),
|
||||
Some(last_active_ago),
|
||||
status_msg,
|
||||
reason,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Adds a presence event which will be saved until a new event replaces it.
|
||||
pub async fn set_presence(
|
||||
&self,
|
||||
@@ -202,33 +410,63 @@ impl Service {
|
||||
| &_ => state,
|
||||
};
|
||||
|
||||
self.db
|
||||
let count = self
|
||||
.db
|
||||
.set_presence(user_id, presence_state, currently_active, last_active_ago, status_msg)
|
||||
.await?;
|
||||
|
||||
if (self.timeout_remote_users || self.services.globals.user_is_local(user_id))
|
||||
&& user_id != self.services.globals.server_user
|
||||
{
|
||||
let timeout = match presence_state {
|
||||
| PresenceState::Online =>
|
||||
self.services
|
||||
.server
|
||||
.config
|
||||
.presence_idle_timeout_s,
|
||||
| _ =>
|
||||
self.services
|
||||
.server
|
||||
.config
|
||||
.presence_offline_timeout_s,
|
||||
};
|
||||
if let Some(count) = count {
|
||||
if (self.timeout_remote_users || self.services.globals.user_is_local(user_id))
|
||||
&& user_id != self.services.globals.server_user
|
||||
{
|
||||
let timeout = match presence_state {
|
||||
| PresenceState::Online =>
|
||||
self.services
|
||||
.server
|
||||
.config
|
||||
.presence_idle_timeout_s,
|
||||
| _ =>
|
||||
self.services
|
||||
.server
|
||||
.config
|
||||
.presence_offline_timeout_s,
|
||||
};
|
||||
|
||||
self.timer_channel
|
||||
.0
|
||||
.send((user_id.to_owned(), Duration::from_secs(timeout)))
|
||||
.map_err(|e| {
|
||||
error!("Failed to add presence timer: {}", e);
|
||||
Error::bad_database("Failed to add presence timer")
|
||||
})?;
|
||||
let timeout_kind = match presence_state {
|
||||
| PresenceState::Online => "idle_timeout_s",
|
||||
| _ => "offline_timeout_s",
|
||||
};
|
||||
|
||||
debug!(
|
||||
?user_id,
|
||||
?presence_state,
|
||||
currently_active,
|
||||
last_active_ago = last_active_ago.map(u64::from),
|
||||
status_msg = status_msg_log.as_deref(),
|
||||
count,
|
||||
timeout_s = timeout,
|
||||
timeout_kind,
|
||||
timeout_remote_users = self.timeout_remote_users,
|
||||
is_local,
|
||||
is_server_user,
|
||||
"Scheduling presence timer"
|
||||
);
|
||||
|
||||
self.schedule_presence_timer(user_id, presence_state, count)?;
|
||||
} else {
|
||||
debug!(
|
||||
?user_id,
|
||||
?presence_state,
|
||||
currently_active,
|
||||
last_active_ago = last_active_ago.map(u64::from),
|
||||
status_msg = status_msg_log.as_deref(),
|
||||
count,
|
||||
timeout_remote_users = self.timeout_remote_users,
|
||||
is_local,
|
||||
is_server_user,
|
||||
"Presence timer not scheduled"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -318,43 +556,100 @@ impl Service {
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
async fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result {
|
||||
let mut presence_state = PresenceState::Offline;
|
||||
let mut last_active_ago = None;
|
||||
let mut status_msg = None;
|
||||
|
||||
let presence_event = self.get_presence(user_id).await;
|
||||
|
||||
if let Ok(presence_event) = presence_event {
|
||||
presence_state = presence_event.content.presence;
|
||||
last_active_ago = presence_event.content.last_active_ago;
|
||||
status_msg = presence_event.content.status_msg;
|
||||
}
|
||||
|
||||
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
||||
| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
|
||||
Some(PresenceState::Unavailable),
|
||||
| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
|
||||
Some(PresenceState::Offline),
|
||||
| _ => None,
|
||||
async fn process_presence_timer(&self, user_id: &OwnedUserId, expected_count: u64) -> Result {
|
||||
let (current_count, presence) = match self.db.get_presence_raw(user_id).await {
|
||||
| Ok(presence) => presence,
|
||||
| Err(_) => return Ok(()),
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New \
|
||||
state = {new_state:?}"
|
||||
);
|
||||
|
||||
if let Some(new_state) = new_state {
|
||||
self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)
|
||||
.await?;
|
||||
if current_count != expected_count {
|
||||
trace!(
|
||||
?user_id,
|
||||
expected_count,
|
||||
current_count,
|
||||
"Skipping stale presence timer"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let presence_state = presence.state().clone();
|
||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||
let aggregated = self
|
||||
.device_presence
|
||||
.aggregate(user_id, now, self.idle_timeout, self.offline_timeout)
|
||||
.await;
|
||||
|
||||
if aggregated.device_count == 0 {
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(presence.last_active_ts())));
|
||||
let status_msg = presence.status_msg();
|
||||
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
||||
| (PresenceState::Online, Some(ago)) if ago >= self.idle_timeout =>
|
||||
Some(PresenceState::Unavailable),
|
||||
| (PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout =>
|
||||
Some(PresenceState::Offline),
|
||||
| _ => None,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New \
|
||||
state = {new_state:?}"
|
||||
);
|
||||
|
||||
if let Some(new_state) = new_state {
|
||||
let reason = match new_state {
|
||||
| PresenceState::Unavailable => PresenceUpdateReason::TimerIdle,
|
||||
| PresenceState::Offline => PresenceUpdateReason::TimerOffline,
|
||||
| _ => PresenceUpdateReason::Ping,
|
||||
};
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&new_state,
|
||||
Some(false),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
reason,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if aggregated.state == presence_state {
|
||||
self.schedule_presence_timer(user_id, &presence_state, current_count)
|
||||
.log_err()
|
||||
.ok();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let reason = match aggregated.state {
|
||||
| PresenceState::Unavailable => PresenceUpdateReason::TimerIdle,
|
||||
| PresenceState::Offline => PresenceUpdateReason::TimerOffline,
|
||||
| _ => PresenceUpdateReason::Ping,
|
||||
};
|
||||
|
||||
let status_msg = aggregated.status_msg.or_else(|| presence.status_msg());
|
||||
let last_active_ago =
|
||||
Some(UInt::new_saturating(now.saturating_sub(aggregated.last_active_ts)));
|
||||
|
||||
self.set_presence(
|
||||
user_id,
|
||||
&aggregated.state,
|
||||
Some(aggregated.currently_active),
|
||||
last_active_ago,
|
||||
status_msg,
|
||||
reason,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
|
||||
async fn presence_timer(user_id: OwnedUserId, timeout: Duration, count: u64) -> TimerFired {
|
||||
sleep(timeout).await;
|
||||
|
||||
user_id
|
||||
(user_id, count)
|
||||
}
|
||||
|
||||
@@ -40,6 +40,15 @@ impl Presence {
|
||||
.map_err(|_| Error::bad_database("Invalid presence data in database"))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(super) fn state(&self) -> &PresenceState { &self.state }
|
||||
|
||||
#[inline]
|
||||
pub(super) fn last_active_ts(&self) -> u64 { self.last_active_ts }
|
||||
|
||||
#[inline]
|
||||
pub(super) fn status_msg(&self) -> Option<String> { self.status_msg.clone() }
|
||||
|
||||
/// Creates a PresenceEvent from available data.
|
||||
pub(super) async fn to_presence_event(
|
||||
&self,
|
||||
|
||||
Reference in New Issue
Block a user