diff --git a/src/service/pusher/append.rs b/src/service/pusher/append.rs new file mode 100644 index 00000000..7689965b --- /dev/null +++ b/src/service/pusher/append.rs @@ -0,0 +1,154 @@ +use std::{collections::HashSet, sync::Arc}; + +use futures::StreamExt; +use ruma::{ + OwnedUserId, RoomId, UserId, + events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent}, + push::{Action, Ruleset, Tweak}, +}; +use tuwunel_core::{ + Result, implement, + matrix::{ + event::Event, + pdu::{Pdu, RawPduId}, + }, + utils::{self, ReadyExt}, +}; +use tuwunel_database::Map; + +/// Called by timeline append_pdu. +#[implement(super::Service)] +#[tracing::instrument(name = "append", level = "debug", skip_all)] +pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { + // Don't notify the sender of their own events, and dont send from ignored users + let mut push_target: HashSet<_> = self + .services + .state_cache + .active_local_users_in_room(pdu.room_id()) + .map(ToOwned::to_owned) + .ready_filter(|user| *user != pdu.sender()) + .filter_map(async |recipient_user| { + self.services + .users + .user_is_ignored(pdu.sender(), &recipient_user) + .await + .eq(&false) + .then_some(recipient_user) + }) + .collect() + .await; + + let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1)); + let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1)); + + if *pdu.kind() == TimelineEventType::RoomMember { + if let Some(state_key) = pdu.state_key() { + let target_user_id = UserId::parse(state_key)?; + + if self + .services + .users + .is_active_local(target_user_id) + .await + { + push_target.insert(target_user_id.to_owned()); + } + } + } + + let serialized = pdu.to_format(); + for user in &push_target { + let rules_for_user = self + .services + .account_data + .get_global(user, GlobalAccountDataEventType::PushRules) + .await + .map_or_else( + |_| Ruleset::server_default(user), + |ev: PushRulesEvent| ev.content.global, + ); + + let mut highlight = false; + let mut notify = false; + + let power_levels = self + .services + .state_accessor + .get_power_levels(pdu.room_id()) + .await?; + + for action in self + .services + .pusher + .get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id()) + .await + { + match action { + | Action::Notify => notify = true, + | Action::SetTweak(Tweak::Highlight(true)) => { + highlight = true; + }, + | _ => {}, + } + + // Break early if both conditions are true + if notify && highlight { + break; + } + } + + if notify { + notifies.push(user.clone()); + } + + if highlight { + highlights.push(user.clone()); + } + + self.services + .pusher + .get_pushkeys(user) + .ready_for_each(|push_key| { + self.services + .sending + .send_pdu_push(&pdu_id, user, push_key.to_owned()) + .expect("TODO: replace with future"); + }) + .await; + } + + self.increment_notification_counts(pdu.room_id(), notifies, highlights); + + Ok(()) +} + +#[implement(super::Service)] +fn increment_notification_counts( + &self, + room_id: &RoomId, + notifies: Vec, + highlights: Vec, +) { + let _cork = self.db.db.cork(); + + for user in notifies { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xFF); + userroom_id.extend_from_slice(room_id.as_bytes()); + increment(&self.db.userroomid_notificationcount, &userroom_id); + } + + for user in highlights { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xFF); + userroom_id.extend_from_slice(room_id.as_bytes()); + increment(&self.db.userroomid_highlightcount, &userroom_id); + } +} + +//TODO: this is an ABA +fn increment(db: &Arc, key: &[u8]) { + let old = db.get_blocking(key); + let new = utils::increment(old.ok().as_deref()); + db.insert(key, new); +} diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 8fc8dba8..a0946ba3 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -1,3 +1,5 @@ +mod append; + use std::{fmt::Debug, mem, sync::Arc}; use bytes::BytesMut; @@ -30,7 +32,7 @@ use tuwunel_core::{ }, warn, }; -use tuwunel_database::{Deserialized, Ignore, Interfix, Json, Map}; +use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map}; pub struct Service { db: Data, @@ -40,6 +42,9 @@ pub struct Service { struct Data { senderkey_pusher: Arc, pushkey_deviceid: Arc, + userroomid_highlightcount: Arc, + userroomid_notificationcount: Arc, + db: Arc, } impl crate::Service for Service { @@ -48,6 +53,9 @@ impl crate::Service for Service { db: Data { senderkey_pusher: args.db["senderkey_pusher"].clone(), pushkey_deviceid: args.db["pushkey_deviceid"].clone(), + userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), + userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), + db: args.db.clone(), }, services: args.services.clone(), })) diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index bbe5a870..cba6a3ac 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -1,21 +1,15 @@ -use std::{ - collections::{BTreeMap, HashSet}, - sync::Arc, -}; +use std::{collections::BTreeMap, sync::Arc}; -use futures::StreamExt; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedUserId, RoomId, RoomVersionId, UserId, + CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId, events::{ - GlobalAccountDataEventType, TimelineEventType, - push_rules::PushRulesEvent, + TimelineEventType, room::{ encrypted::Relation, member::{MembershipState, RoomMemberEventContent}, redaction::RoomRedactionEventContent, }, }, - push::{Action, Ruleset, Tweak}, }; use tuwunel_core::{ Result, err, error, implement, @@ -23,12 +17,12 @@ use tuwunel_core::{ event::Event, pdu::{PduCount, PduEvent, PduId, RawPduId}, }, - utils::{self, ReadyExt, result::LogErr}, + utils::{self, result::LogErr}, }; -use tuwunel_database::{Json, Map}; +use tuwunel_database::Json; use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard}; -use crate::rooms::state_compressor::CompressedState; +use crate::rooms::{short::ShortRoomId, state_compressor::CompressedState}; /// Append the incoming event setting the state snapshot to the state from /// the server that sent the event. @@ -185,105 +179,37 @@ where drop(insert_lock); - // Don't notify the sender of their own events, and dont send from ignored users - let mut push_target: HashSet<_> = self - .services - .state_cache - .active_local_users_in_room(pdu.room_id()) - .map(ToOwned::to_owned) - .ready_filter(|user| *user != pdu.sender()) - .filter_map(async |recipient_user| { - self.services - .users - .user_is_ignored(pdu.sender(), &recipient_user) - .await - .eq(&false) - .then_some(recipient_user) - }) - .collect() - .await; + self.services + .pusher + .append_pdu(pdu_id, pdu) + .await + .log_err() + .ok(); - let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1)); - let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1)); + self.append_pdu_effects(pdu_id, pdu, shortroomid, count) + .await?; - if *pdu.kind() == TimelineEventType::RoomMember { - if let Some(state_key) = pdu.state_key() { - let target_user_id = UserId::parse(state_key)?; + drop(next_count1); + drop(next_count2); - if self - .services - .users - .is_active_local(target_user_id) - .await - { - push_target.insert(target_user_id.to_owned()); - } - } - } + self.services + .appservice + .append_pdu(pdu_id, pdu) + .await + .log_err() + .ok(); - let serialized = pdu.to_format(); - for user in &push_target { - let rules_for_user = self - .services - .account_data - .get_global(user, GlobalAccountDataEventType::PushRules) - .await - .map_or_else( - |_| Ruleset::server_default(user), - |ev: PushRulesEvent| ev.content.global, - ); - - let mut highlight = false; - let mut notify = false; - - let power_levels = self - .services - .state_accessor - .get_power_levels(pdu.room_id()) - .await?; - - for action in self - .services - .pusher - .get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id()) - .await - { - match action { - | Action::Notify => notify = true, - | Action::SetTweak(Tweak::Highlight(true)) => { - highlight = true; - }, - | _ => {}, - } - - // Break early if both conditions are true - if notify && highlight { - break; - } - } - - if notify { - notifies.push(user.clone()); - } - - if highlight { - highlights.push(user.clone()); - } - - self.services - .pusher - .get_pushkeys(user) - .ready_for_each(|push_key| { - self.services - .sending - .send_pdu_push(&pdu_id, user, push_key.to_owned()) - .expect("TODO: replace with future"); - }) - .await; - } - - self.increment_notification_counts(pdu.room_id(), notifies, highlights); + Ok(pdu_id) +} +#[implement(super::Service)] +async fn append_pdu_effects( + &self, + pdu_id: RawPduId, + pdu: &PduEvent, + shortroomid: ShortRoomId, + count: PduCount, +) -> Result { match *pdu.kind() { | TimelineEventType::RoomRedaction => { use RoomVersionId::*; @@ -423,17 +349,7 @@ where } } - drop(next_count1); - drop(next_count2); - - self.services - .appservice - .append_pdu(pdu_id, pdu) - .await - .log_err() - .ok(); - - Ok(pdu_id) + Ok(()) } #[implement(super::Service)] @@ -456,34 +372,3 @@ fn append_pdu_json( .eventid_outlierpdu .remove(pdu.event_id.as_bytes()); } - -#[implement(super::Service)] -fn increment_notification_counts( - &self, - room_id: &RoomId, - notifies: Vec, - highlights: Vec, -) { - let _cork = self.db.db.cork(); - - for user in notifies { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.db.userroomid_notificationcount, &userroom_id); - } - - for user in highlights { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.db.userroomid_highlightcount, &userroom_id); - } -} - -//TODO: this is an ABA -fn increment(db: &Arc, key: &[u8]) { - let old = db.get_blocking(key); - let new = utils::increment(old.ok().as_deref()); - db.insert(key, new); -} diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index b7644296..959f50bf 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -49,8 +49,6 @@ struct Data { eventid_outlierpdu: Arc, eventid_pduid: Arc, pduid_pdu: Arc, - userroomid_highlightcount: Arc, - userroomid_notificationcount: Arc, db: Arc, } @@ -89,8 +87,6 @@ impl crate::Service for Service { eventid_outlierpdu: args.db["eventid_outlierpdu"].clone(), eventid_pduid: args.db["eventid_pduid"].clone(), pduid_pdu: args.db["pduid_pdu"].clone(), - userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), - userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), db: args.db.clone(), }, mutex_insert: RoomMutexMap::new(),