Split pusher/push-rule from timeline append.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-26 18:32:25 +00:00
parent 4b74c01895
commit 9abe9becd6
4 changed files with 196 additions and 153 deletions

View File

@@ -0,0 +1,154 @@
use std::{collections::HashSet, sync::Arc};
use futures::StreamExt;
use ruma::{
OwnedUserId, RoomId, UserId,
events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent},
push::{Action, Ruleset, Tweak},
};
use tuwunel_core::{
Result, implement,
matrix::{
event::Event,
pdu::{Pdu, RawPduId},
},
utils::{self, ReadyExt},
};
use tuwunel_database::Map;
/// Called by timeline append_pdu.
#[implement(super::Service)]
#[tracing::instrument(name = "append", level = "debug", skip_all)]
pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
// Don't notify the sender of their own events, and dont send from ignored users
let mut push_target: HashSet<_> = self
.services
.state_cache
.active_local_users_in_room(pdu.room_id())
.map(ToOwned::to_owned)
.ready_filter(|user| *user != pdu.sender())
.filter_map(async |recipient_user| {
self.services
.users
.user_is_ignored(pdu.sender(), &recipient_user)
.await
.eq(&false)
.then_some(recipient_user)
})
.collect()
.await;
let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1));
let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1));
if *pdu.kind() == TimelineEventType::RoomMember {
if let Some(state_key) = pdu.state_key() {
let target_user_id = UserId::parse(state_key)?;
if self
.services
.users
.is_active_local(target_user_id)
.await
{
push_target.insert(target_user_id.to_owned());
}
}
}
let serialized = pdu.to_format();
for user in &push_target {
let rules_for_user = self
.services
.account_data
.get_global(user, GlobalAccountDataEventType::PushRules)
.await
.map_or_else(
|_| Ruleset::server_default(user),
|ev: PushRulesEvent| ev.content.global,
);
let mut highlight = false;
let mut notify = false;
let power_levels = self
.services
.state_accessor
.get_power_levels(pdu.room_id())
.await?;
for action in self
.services
.pusher
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
.await
{
match action {
| Action::Notify => notify = true,
| Action::SetTweak(Tweak::Highlight(true)) => {
highlight = true;
},
| _ => {},
}
// Break early if both conditions are true
if notify && highlight {
break;
}
}
if notify {
notifies.push(user.clone());
}
if highlight {
highlights.push(user.clone());
}
self.services
.pusher
.get_pushkeys(user)
.ready_for_each(|push_key| {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key.to_owned())
.expect("TODO: replace with future");
})
.await;
}
self.increment_notification_counts(pdu.room_id(), notifies, highlights);
Ok(())
}
#[implement(super::Service)]
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) {
let _cork = self.db.db.cork();
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_notificationcount, &userroom_id);
}
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_highlightcount, &userroom_id);
}
}
//TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref());
db.insert(key, new);
}

View File

@@ -1,3 +1,5 @@
mod append;
use std::{fmt::Debug, mem, sync::Arc};
use bytes::BytesMut;
@@ -30,7 +32,7 @@ use tuwunel_core::{
},
warn,
};
use tuwunel_database::{Deserialized, Ignore, Interfix, Json, Map};
use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map};
pub struct Service {
db: Data,
@@ -40,6 +42,9 @@ pub struct Service {
struct Data {
senderkey_pusher: Arc<Map>,
pushkey_deviceid: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
db: Arc<Database>,
}
impl crate::Service for Service {
@@ -48,6 +53,9 @@ impl crate::Service for Service {
db: Data {
senderkey_pusher: args.db["senderkey_pusher"].clone(),
pushkey_deviceid: args.db["pushkey_deviceid"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
db: args.db.clone(),
},
services: args.services.clone(),
}))

View File

@@ -1,21 +1,15 @@
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use std::{collections::BTreeMap, sync::Arc};
use futures::StreamExt;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedUserId, RoomId, RoomVersionId, UserId,
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId,
events::{
GlobalAccountDataEventType, TimelineEventType,
push_rules::PushRulesEvent,
TimelineEventType,
room::{
encrypted::Relation,
member::{MembershipState, RoomMemberEventContent},
redaction::RoomRedactionEventContent,
},
},
push::{Action, Ruleset, Tweak},
};
use tuwunel_core::{
Result, err, error, implement,
@@ -23,12 +17,12 @@ use tuwunel_core::{
event::Event,
pdu::{PduCount, PduEvent, PduId, RawPduId},
},
utils::{self, ReadyExt, result::LogErr},
utils::{self, result::LogErr},
};
use tuwunel_database::{Json, Map};
use tuwunel_database::Json;
use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard};
use crate::rooms::state_compressor::CompressedState;
use crate::rooms::{short::ShortRoomId, state_compressor::CompressedState};
/// Append the incoming event setting the state snapshot to the state from
/// the server that sent the event.
@@ -185,105 +179,37 @@ where
drop(insert_lock);
// Don't notify the sender of their own events, and dont send from ignored users
let mut push_target: HashSet<_> = self
.services
.state_cache
.active_local_users_in_room(pdu.room_id())
.map(ToOwned::to_owned)
.ready_filter(|user| *user != pdu.sender())
.filter_map(async |recipient_user| {
self.services
.users
.user_is_ignored(pdu.sender(), &recipient_user)
.await
.eq(&false)
.then_some(recipient_user)
})
.collect()
.await;
self.services
.pusher
.append_pdu(pdu_id, pdu)
.await
.log_err()
.ok();
let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1));
let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1));
self.append_pdu_effects(pdu_id, pdu, shortroomid, count)
.await?;
if *pdu.kind() == TimelineEventType::RoomMember {
if let Some(state_key) = pdu.state_key() {
let target_user_id = UserId::parse(state_key)?;
drop(next_count1);
drop(next_count2);
if self
.services
.users
.is_active_local(target_user_id)
.await
{
push_target.insert(target_user_id.to_owned());
}
}
}
self.services
.appservice
.append_pdu(pdu_id, pdu)
.await
.log_err()
.ok();
let serialized = pdu.to_format();
for user in &push_target {
let rules_for_user = self
.services
.account_data
.get_global(user, GlobalAccountDataEventType::PushRules)
.await
.map_or_else(
|_| Ruleset::server_default(user),
|ev: PushRulesEvent| ev.content.global,
);
let mut highlight = false;
let mut notify = false;
let power_levels = self
.services
.state_accessor
.get_power_levels(pdu.room_id())
.await?;
for action in self
.services
.pusher
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
.await
{
match action {
| Action::Notify => notify = true,
| Action::SetTweak(Tweak::Highlight(true)) => {
highlight = true;
},
| _ => {},
}
// Break early if both conditions are true
if notify && highlight {
break;
}
}
if notify {
notifies.push(user.clone());
}
if highlight {
highlights.push(user.clone());
}
self.services
.pusher
.get_pushkeys(user)
.ready_for_each(|push_key| {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key.to_owned())
.expect("TODO: replace with future");
})
.await;
}
self.increment_notification_counts(pdu.room_id(), notifies, highlights);
Ok(pdu_id)
}
#[implement(super::Service)]
async fn append_pdu_effects(
&self,
pdu_id: RawPduId,
pdu: &PduEvent,
shortroomid: ShortRoomId,
count: PduCount,
) -> Result {
match *pdu.kind() {
| TimelineEventType::RoomRedaction => {
use RoomVersionId::*;
@@ -423,17 +349,7 @@ where
}
}
drop(next_count1);
drop(next_count2);
self.services
.appservice
.append_pdu(pdu_id, pdu)
.await
.log_err()
.ok();
Ok(pdu_id)
Ok(())
}
#[implement(super::Service)]
@@ -456,34 +372,3 @@ fn append_pdu_json(
.eventid_outlierpdu
.remove(pdu.event_id.as_bytes());
}
#[implement(super::Service)]
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) {
let _cork = self.db.db.cork();
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_notificationcount, &userroom_id);
}
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_highlightcount, &userroom_id);
}
}
//TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref());
db.insert(key, new);
}

View File

@@ -49,8 +49,6 @@ struct Data {
eventid_outlierpdu: Arc<Map>,
eventid_pduid: Arc<Map>,
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
db: Arc<Database>,
}
@@ -89,8 +87,6 @@ impl crate::Service for Service {
eventid_outlierpdu: args.db["eventid_outlierpdu"].clone(),
eventid_pduid: args.db["eventid_pduid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
db: args.db.clone(),
},
mutex_insert: RoomMutexMap::new(),