presence refactor
This commit is contained in:
@@ -46,13 +46,11 @@ pub(crate) async fn set_displayname_route(
|
|||||||
.update_displayname(&body.user_id, body.displayname.clone(), &all_joined_rooms)
|
.update_displayname(&body.user_id, body.displayname.clone(), &all_joined_rooms)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(set_display_name::v3::Response {})
|
Ok(set_display_name::v3::Response {})
|
||||||
}
|
}
|
||||||
@@ -148,14 +146,12 @@ pub(crate) async fn set_avatar_url_route(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await
|
||||||
.await
|
.ok();
|
||||||
.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(set_avatar_url::v3::Response {})
|
Ok(set_avatar_url::v3::Response {})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,12 +49,10 @@ pub(crate) async fn set_read_marker_route(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(event) = &body.read_receipt {
|
if let Some(event) = &body.read_receipt {
|
||||||
if services.config.allow_local_presence {
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(sender_user, &ruma::presence::PresenceState::Online)
|
||||||
.ping_presence(sender_user, &ruma::presence::PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let receipt_content = BTreeMap::from_iter([(
|
let receipt_content = BTreeMap::from_iter([(
|
||||||
event.to_owned(),
|
event.to_owned(),
|
||||||
@@ -137,12 +135,10 @@ pub(crate) async fn create_receipt_route(
|
|||||||
.await?;
|
.await?;
|
||||||
},
|
},
|
||||||
| create_receipt::v3::ReceiptType::Read => {
|
| create_receipt::v3::ReceiptType::Read => {
|
||||||
if services.config.allow_local_presence {
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(sender_user, &ruma::presence::PresenceState::Online)
|
||||||
.ping_presence(sender_user, &ruma::presence::PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let receipt_content = BTreeMap::from_iter([(
|
let receipt_content = BTreeMap::from_iter([(
|
||||||
body.event_id.clone(),
|
body.event_id.clone(),
|
||||||
|
|||||||
@@ -125,20 +125,15 @@ pub(crate) async fn sync_events_route(
|
|||||||
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
|
||||||
let (sender_user, sender_device) = body.sender();
|
let (sender_user, sender_device) = body.sender();
|
||||||
|
|
||||||
// Presence update
|
services
|
||||||
if services.config.allow_local_presence {
|
.presence
|
||||||
services
|
.maybe_ping_presence(sender_user, &body.body.set_presence)
|
||||||
.presence
|
.await
|
||||||
.ping_presence(sender_user, &body.body.set_presence)
|
.log_err()
|
||||||
.await
|
.ok();
|
||||||
.log_err()
|
|
||||||
.ok();
|
|
||||||
|
|
||||||
// Record user as actively syncing for push suppression heuristic.
|
// Record user as actively syncing for push suppression heuristic.
|
||||||
if services.config.suppress_push_when_active {
|
services.presence.note_sync(sender_user).await;
|
||||||
services.presence.note_sync(sender_user).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut since = body
|
let mut since = body
|
||||||
.body
|
.body
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ pub(crate) async fn create_typing_event_route(
|
|||||||
|
|
||||||
match body.state {
|
match body.state {
|
||||||
| Typing::Yes(duration) => {
|
| Typing::Yes(duration) => {
|
||||||
let duration = utils::clamp(
|
let duration = Ord::clamp(
|
||||||
duration
|
duration
|
||||||
.as_millis()
|
.as_millis()
|
||||||
.try_into()
|
.try_into()
|
||||||
@@ -64,12 +64,10 @@ pub(crate) async fn create_typing_event_route(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ping presence
|
// ping presence
|
||||||
if services.config.allow_local_presence {
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &ruma::presence::PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &ruma::presence::PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(create_typing_event::v3::Response {})
|
Ok(create_typing_event::v3::Response {})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,13 +73,11 @@ pub(crate) async fn delete_timezone_key_route(
|
|||||||
|
|
||||||
services.users.set_timezone(&body.user_id, None);
|
services.users.set_timezone(&body.user_id, None);
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(delete_timezone_key::unstable::Response {})
|
Ok(delete_timezone_key::unstable::Response {})
|
||||||
}
|
}
|
||||||
@@ -103,13 +101,11 @@ pub(crate) async fn set_timezone_key_route(
|
|||||||
.users
|
.users
|
||||||
.set_timezone(&body.user_id, body.tz.clone());
|
.set_timezone(&body.user_id, body.tz.clone());
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(set_timezone_key::unstable::Response {})
|
Ok(set_timezone_key::unstable::Response {})
|
||||||
}
|
}
|
||||||
@@ -171,13 +167,11 @@ pub(crate) async fn set_profile_field_route(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(set_profile_field::v3::Response {})
|
Ok(set_profile_field::v3::Response {})
|
||||||
}
|
}
|
||||||
@@ -227,13 +221,11 @@ pub(crate) async fn delete_profile_field_route(
|
|||||||
.set_profile_key(&body.user_id, body.field.as_str(), None);
|
.set_profile_key(&body.user_id, body.field.as_str(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
if services.config.allow_local_presence {
|
// Presence update
|
||||||
// Presence update
|
services
|
||||||
services
|
.presence
|
||||||
.presence
|
.maybe_ping_presence(&body.user_id, &PresenceState::Online)
|
||||||
.ping_presence(&body.user_id, &PresenceState::Online)
|
.await?;
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(delete_profile_field::v3::Response {})
|
Ok(delete_profile_field::v3::Response {})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,12 +44,10 @@ impl crate::Service for Service {
|
|||||||
async fn worker(self: Arc<Self>) -> Result {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
// reset dormant online/away statuses to offline, and set the server user as
|
// reset dormant online/away statuses to offline, and set the server user as
|
||||||
// online
|
// online
|
||||||
if self.services.server.config.allow_local_presence && !self.services.db.is_read_only() {
|
self.unset_all_presence().await;
|
||||||
self.unset_all_presence().await;
|
_ = self
|
||||||
_ = self
|
.maybe_ping_presence(&self.services.globals.server_user, &PresenceState::Online)
|
||||||
.ping_presence(&self.services.globals.server_user, &PresenceState::Online)
|
.await;
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
let receiver = self.timer_channel.1.clone();
|
let receiver = self.timer_channel.1.clone();
|
||||||
|
|
||||||
@@ -74,11 +72,9 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
async fn interrupt(&self) {
|
async fn interrupt(&self) {
|
||||||
// set the server user as offline
|
// set the server user as offline
|
||||||
if self.services.server.config.allow_local_presence && !self.services.db.is_read_only() {
|
_ = self
|
||||||
_ = self
|
.maybe_ping_presence(&self.services.globals.server_user, &PresenceState::Offline)
|
||||||
.ping_presence(&self.services.globals.server_user, &PresenceState::Offline)
|
.await;
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (timer_sender, _) = &self.timer_channel;
|
let (timer_sender, _) = &self.timer_channel;
|
||||||
if !timer_sender.is_closed() {
|
if !timer_sender.is_closed() {
|
||||||
@@ -93,6 +89,10 @@ impl Service {
|
|||||||
/// record that a user has just successfully completed a /sync (or
|
/// record that a user has just successfully completed a /sync (or
|
||||||
/// equivalent activity)
|
/// equivalent activity)
|
||||||
pub async fn note_sync(&self, user_id: &UserId) {
|
pub async fn note_sync(&self, user_id: &UserId) {
|
||||||
|
if !self.services.config.suppress_push_when_active {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
let now = tuwunel_core::utils::millis_since_unix_epoch();
|
||||||
self.last_sync_seen
|
self.last_sync_seen
|
||||||
.write()
|
.write()
|
||||||
@@ -120,9 +120,17 @@ impl Service {
|
|||||||
|
|
||||||
/// Pings the presence of the given user in the given room, setting the
|
/// Pings the presence of the given user in the given room, setting the
|
||||||
/// specified state.
|
/// specified state.
|
||||||
pub async fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result {
|
pub async fn maybe_ping_presence(
|
||||||
|
&self,
|
||||||
|
user_id: &UserId,
|
||||||
|
new_state: &PresenceState,
|
||||||
|
) -> Result {
|
||||||
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
const REFRESH_TIMEOUT: u64 = 60 * 1000;
|
||||||
|
|
||||||
|
if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let last_presence = self.db.get_presence(user_id).await;
|
let last_presence = self.db.get_presence(user_id).await;
|
||||||
let state_changed = match last_presence {
|
let state_changed = match last_presence {
|
||||||
| Err(_) => true,
|
| Err(_) => true,
|
||||||
@@ -208,7 +216,11 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Unset online/unavailable presence to offline on startup
|
// Unset online/unavailable presence to offline on startup
|
||||||
pub async fn unset_all_presence(&self) {
|
async fn unset_all_presence(&self) {
|
||||||
|
if !self.services.server.config.allow_local_presence || self.services.db.is_read_only() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let _cork = self.services.db.cork();
|
let _cork = self.services.db.cork();
|
||||||
|
|
||||||
for user_id in &self
|
for user_id in &self
|
||||||
|
|||||||
Reference in New Issue
Block a user