Fix pushrule evals relying on non-option RoomPowerLevels.

Add option to bypass pushrules to send everything to pushers.

Improve robustness of pushrule eval loops.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-05 23:43:46 +00:00
parent ff0e007c45
commit 5051da493a
5 changed files with 112 additions and 79 deletions

View File

@@ -1308,7 +1308,8 @@ pub struct Config {
/// purposes such as recovering/recreating your admin room, or inviting
/// yourself back.
///
/// See https://tuwunel.chat/troubleshooting.html#lost-access-to-admin-room for other ways to get back into your admin room.
/// See https://tuwunel.chat/troubleshooting.html#lost-access-to-admin-room
/// for other ways to get back into your admin room.
///
/// Once this password is unset, all sessions will be logged out for
/// security purposes.
@@ -1322,6 +1323,19 @@ pub struct Config {
#[serde(default = "default_notification_push_path")]
pub notification_push_path: String,
/// For compatibility and special purpose use only. Setting this option to
/// true will not filter messages sent to pushers based on rules or actions.
/// Everything will be sent to the pusher. This option is offered for
/// several reasons, but should not be necessary:
/// - Bypass to work around bugs or outdated server-side ruleset support.
/// - Allow clients to evaluate pushrules themselves (due to the above).
/// - Hosting or companies which have custom pushers and internal needs.
///
/// Note that setting this option to true will not affect the record of
/// notifications found in the notifications pane.
#[serde(default)]
pub push_everything: bool,
/// Allow local (your server only) presence updates/requests.
///
/// Note that presence on tuwunel is very fast unlike Synapse's. If using

View File

@@ -1,6 +1,6 @@
use std::{collections::HashSet, sync::Arc};
use futures::StreamExt;
use futures::{FutureExt, StreamExt, future::join};
use ruma::{
OwnedUserId, RoomId, UserId,
api::client::push::ProfileTag,
@@ -14,7 +14,7 @@ use tuwunel_core::{
event::Event,
pdu::{Count, Pdu, PduId, RawPduId},
},
utils::{self, ReadyExt, time::now_millis},
utils::{self, BoolExt, ReadyExt, future::TryExtExt, time::now_millis},
};
use tuwunel_database::{Json, Map};
@@ -44,7 +44,7 @@ pub struct Notified {
#[tracing::instrument(name = "append", level = "debug", skip_all)]
pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
// Don't notify the sender of their own events, and don't send from ignored users
let mut push_target: HashSet<_> = self
let push_target = self
.services
.state_cache
.active_local_users_in_room(pdu.room_id())
@@ -55,19 +55,24 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
.users
.user_is_ignored(pdu.sender(), &recipient_user)
.await
.eq(&false)
.is_false()
.then_some(recipient_user)
})
.collect()
.await;
.collect::<HashSet<_>>();
let power_levels = self
.services
.state_accessor
.get_power_levels(pdu.room_id())
.ok();
let (mut push_target, power_levels) = join(push_target, power_levels).boxed().await;
let mut notifies = Vec::with_capacity(push_target.len().saturating_add(1));
let mut highlights = Vec::with_capacity(push_target.len().saturating_add(1));
if *pdu.kind() == TimelineEventType::RoomMember {
if let Some(state_key) = pdu.state_key() {
let target_user_id = UserId::parse(state_key)?;
if let Some(Ok(target_user_id)) = pdu.state_key().map(UserId::parse) {
if self
.services
.users
@@ -94,16 +99,10 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
let mut highlight = false;
let mut notify = false;
let power_levels = self
.services
.state_accessor
.get_power_levels(pdu.room_id())
.await?;
let actions = self
.services
.pusher
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
.get_actions(user, &rules_for_user, power_levels.as_ref(), &serialized, pdu.room_id())
.await;
for action in actions {
@@ -129,34 +128,35 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
highlights.push(user.clone());
}
if !notify && !highlight {
continue;
if notify || highlight {
let id: PduId = pdu_id.into();
let notified = Notified {
ts: now_millis(),
sroomid: id.shortroomid,
tag: None,
actions: actions.into(),
};
if matches!(id.count, Count::Normal(_)) {
self.db
.useridcount_notification
.put((user, id.count.into_unsigned()), Json(notified));
}
}
let id: PduId = pdu_id.into();
let notified = Notified {
ts: now_millis(),
sroomid: id.shortroomid,
tag: None,
actions: actions.into(),
};
if matches!(id.count, Count::Normal(_)) {
self.db
.useridcount_notification
.put((user, id.count.into_unsigned()), Json(notified));
if notify || highlight || self.services.config.push_everything {
self.services
.pusher
.get_pushkeys(user)
.map(ToOwned::to_owned)
.ready_for_each(|push_key| {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key)
.expect("TODO: replace with future");
})
.await;
}
self.services
.pusher
.get_pushkeys(user)
.ready_for_each(|push_key| {
self.services
.sending
.send_pdu_push(&pdu_id, user, push_key.to_owned())
.expect("TODO: replace with future");
})
.await;
}
self.increment_notification_counts(pdu.room_id(), notifies, highlights);

View File

@@ -5,7 +5,7 @@ mod send;
use std::sync::Arc;
use futures::{Stream, StreamExt};
use futures::{Stream, StreamExt, TryFutureExt, future::join};
use ipaddress::IPAddress;
use ruma::{
DeviceId, OwnedDeviceId, RoomId, UserId,
@@ -17,7 +17,10 @@ use ruma::{
};
use tuwunel_core::{
Err, Result, err, implement,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
utils::{
future::TryExtExt,
stream::{BroadbandExt, ReadyExt, TryIgnore},
},
};
use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map};
@@ -223,39 +226,39 @@ pub async fn get_actions<'a>(
&self,
user: &UserId,
ruleset: &'a Ruleset,
power_levels: &RoomPowerLevels,
power_levels: Option<&RoomPowerLevels>,
pdu: &Raw<AnySyncTimelineEvent>,
room_id: &RoomId,
) -> &'a [Action] {
let power_levels = PushConditionPowerLevelsCtx {
users: power_levels.users.clone(),
users_default: power_levels.users_default,
notifications: power_levels.notifications.clone(),
rules: power_levels.rules.clone(),
};
let user_display_name = self
.services
.users
.displayname(user)
.unwrap_or_else(|_| user.localpart().to_owned());
let room_joined_count = self
.services
.state_cache
.room_joined_count(room_id)
.await
.unwrap_or(1)
.try_into()
.unwrap_or_else(|_| uint!(0));
.map_ok(TryInto::try_into)
.map_ok(|res| res.unwrap_or_else(|_| uint!(1)))
.unwrap_or_default();
let user_display_name = self
.services
.users
.displayname(user)
.await
.unwrap_or_else(|_| user.localpart().to_owned());
let (room_joined_count, user_display_name) = join(room_joined_count, user_display_name).await;
let power_levels = power_levels.map(|power_levels| PushConditionPowerLevelsCtx {
users: power_levels.users.clone(),
users_default: power_levels.users_default,
notifications: power_levels.notifications.clone(),
rules: power_levels.rules.clone(),
});
let ctx = PushConditionRoomCtx {
room_id: room_id.to_owned(),
member_count: room_joined_count,
user_id: user.to_owned(),
user_display_name,
power_levels: Some(power_levels),
power_levels,
};
ruleset.get_actions(pdu, &ctx).await

View File

@@ -8,7 +8,7 @@ use ruma::{
v1::{Device, Notification, NotificationCounts, NotificationPriority},
},
},
events::{TimelineEventType, room::power_levels::RoomPowerLevels},
events::TimelineEventType,
push::{Action, PushFormat, Ruleset, Tweak},
uint,
};
@@ -29,24 +29,19 @@ where
let mut notify = None;
let mut tweaks = Vec::new();
let unread: UInt = self
.services
.pusher
.notification_count(user_id, event.room_id())
.await
.try_into()?;
let power_levels: RoomPowerLevels = self
let power_levels = self
.services
.state_accessor
.get_power_levels(event.room_id())
.await?;
.await
.ok();
let serialized = event.to_format();
for action in self
.get_actions(user_id, ruleset, &power_levels, &serialized, event.room_id())
.await
{
let actions = self
.get_actions(user_id, ruleset, power_levels.as_ref(), &serialized, event.room_id())
.await;
for action in actions {
let n = match action {
| Action::Notify => true,
| Action::SetTweak(tweak) => {
@@ -65,11 +60,18 @@ where
notify = Some(n);
}
if notify == Some(true) {
if notify == Some(true) || self.services.config.push_everything {
let unread: UInt = self
.services
.pusher
.notification_count(user_id, event.room_id())
.await
.try_into()
.unwrap_or_else(|_| uint!(1));
self.send_notice(unread, pusher, tweaks, event)
.await?;
}
// Else the event triggered no actions
Ok(())
}

View File

@@ -1109,7 +1109,8 @@
# purposes such as recovering/recreating your admin room, or inviting
# yourself back.
#
# See https://tuwunel.chat/troubleshooting.html#lost-access-to-admin-room for other ways to get back into your admin room.
# See https://tuwunel.chat/troubleshooting.html#lost-access-to-admin-room
# for other ways to get back into your admin room.
#
# Once this password is unset, all sessions will be logged out for
# security purposes.
@@ -1122,6 +1123,19 @@
#
#notification_push_path = "/_matrix/push/v1/notify"
# For compatibility and special purpose use only. Setting this option to
# true will not filter messages sent to pushers based on rules or actions.
# Everything will be sent to the pusher. This option is offered for
# several reasons, but should not be necessary:
# - Bypass to work around bugs or outdated server-side ruleset support.
# - Allow clients to evaluate pushrules themselves (due to the above).
# - Hosting or companies which have custom pushers and internal needs.
#
# Note that setting this option to true will not affect the record of
# notifications found in the notifications pane.
#
#push_everything = false
# Allow local (your server only) presence updates/requests.
#
# Note that presence on tuwunel is very fast unlike Synapse's. If using