feature marked as experimental and executed rustfmt

This commit is contained in:
tototomate123
2025-09-02 17:16:41 +02:00
committed by Jason Volk
parent b5a9884194
commit 6295ca135a
4 changed files with 41 additions and 17 deletions

View File

@@ -134,7 +134,8 @@ pub(crate) async fn sync_events_route(
.log_err() .log_err()
.ok(); .ok();
// Record that this user was actively syncing now (for push suppression heuristic) // Record that this user was actively syncing now (for push suppression
// heuristic)
services.presence.note_sync(sender_user).await; services.presence.note_sync(sender_user).await;
} }

View File

@@ -1288,13 +1288,12 @@ pub struct Config {
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub presence_timeout_remote_users: bool, pub presence_timeout_remote_users: bool,
/// Suppresses push notifications for users marked as active. (Experimental)
/// Suppresses push notifications for users marked as active.
/// ///
/// when enabled, users with `Online` presence and recent activity /// When enabled, users with `Online` presence and recent activity
/// (based on presence state and sync activity) wont receive push /// (based on presence state and sync activity) wont receive push
/// notifications, reducing duplicate alerts while they're active /// notifications, reducing duplicate alerts while they're active
/// elsewhere. /// on another client.
/// ///
/// Disabled by default to preserve legacy behavior. /// Disabled by default to preserve legacy behavior.
#[serde(default)] #[serde(default)]

View File

@@ -2,13 +2,12 @@ mod data;
mod presence; mod presence;
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Stream, StreamExt, TryFutureExt, stream::FuturesUnordered}; use futures::{Stream, StreamExt, TryFutureExt, stream::FuturesUnordered};
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState}; use ruma::{OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState};
use tokio::time::sleep; use tokio::{sync::RwLock, time::sleep};
use tuwunel_core::{Error, Result, checked, debug, debug_warn, error, result::LogErr, trace}; use tuwunel_core::{Error, Result, checked, debug, debug_warn, error, result::LogErr, trace};
use self::{data::Data, presence::Presence}; use self::{data::Data, presence::Presence};
@@ -91,10 +90,14 @@ impl crate::Service for Service {
} }
impl Service { impl Service {
/// record that a user has just successfully completed a /sync (or equivalent activity) /// record that a user has just successfully completed a /sync (or
/// equivalent activity)
pub async fn note_sync(&self, user_id: &UserId) { pub async fn note_sync(&self, user_id: &UserId) {
let now = tuwunel_core::utils::millis_since_unix_epoch(); let now = tuwunel_core::utils::millis_since_unix_epoch();
self.last_sync_seen.write().await.insert(user_id.to_owned(), now); self.last_sync_seen
.write()
.await
.insert(user_id.to_owned(), now);
} }
/// Returns milliseconds since last observed sync for user (if any) /// Returns milliseconds since last observed sync for user (if any)
@@ -106,6 +109,7 @@ impl Service {
.get(user_id) .get(user_id)
.map(|ts| now.saturating_sub(*ts)) .map(|ts| now.saturating_sub(*ts))
} }
/// Returns the latest presence event for the given user. /// Returns the latest presence event for the given user.
pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> { pub async fn get_presence(&self, user_id: &UserId) -> Result<PresenceEvent> {
self.db self.db

View File

@@ -19,13 +19,14 @@ use ruma::{
MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
UInt, UInt,
api::{ api::{
appservice::event::push_events::v1::EphemeralData, federation::transactions::{ appservice::event::push_events::v1::EphemeralData,
federation::transactions::{
edu::{ edu::{
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
ReceiptData, ReceiptMap, ReceiptData, ReceiptMap,
}, },
send_transaction_message, send_transaction_message,
} },
}, },
device_id, device_id,
events::{ events::{
@@ -833,21 +834,40 @@ impl Service {
continue; continue;
} }
// optional suppression: heuristic combining presence age and recent sync activity. // optional suppression: heuristic combining presence age and recent sync
if self.services.server.config.suppress_push_when_active { // activity.
if let Ok(presence) = self.services.presence.get_presence(&user_id).await { if self
let is_online = presence.content.presence == ruma::presence::PresenceState::Online; .services
.server
.config
.suppress_push_when_active
{
if let Ok(presence) = self
.services
.presence
.get_presence(&user_id)
.await
{
let is_online =
presence.content.presence == ruma::presence::PresenceState::Online;
let presence_age_ms = presence let presence_age_ms = presence
.content .content
.last_active_ago .last_active_ago
.map(u64::from) .map(u64::from)
.unwrap_or(u64::MAX); .unwrap_or(u64::MAX);
let sync_gap_ms = self.services.presence.last_sync_gap_ms(&user_id).await; let sync_gap_ms = self
.services
.presence
.last_sync_gap_ms(&user_id)
.await;
let considered_active = is_online let considered_active = is_online
&& presence_age_ms < 65_000 && presence_age_ms < 65_000
&& sync_gap_ms.is_some_and(|gap| gap < 32_000); && sync_gap_ms.is_some_and(|gap| gap < 32_000);
if considered_active { if considered_active {
trace!(?user_id, presence_age_ms, sync_gap_ms, "suppressing push: active heuristic"); trace!(
?user_id,
presence_age_ms, sync_gap_ms, "suppressing push: active heuristic"
);
continue; continue;
} }
} }