From 48923b36576e194b37c8e7fb4c7721241f485284 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 29 Oct 2025 22:05:49 +0000 Subject: [PATCH] Implement notifications retrieval. (closes #201) Signed-off-by: Jason Volk --- src/api/client/push.rs | 111 +++++++++++++++++++++++++++++++++-- src/api/router.rs | 1 + src/database/maps.rs | 5 ++ src/service/pusher/append.rs | 52 +++++++++++++--- src/service/pusher/mod.rs | 27 ++++++++- 5 files changed, 184 insertions(+), 12 deletions(-) diff --git a/src/api/client/push.rs b/src/api/client/push.rs index a7e66675..b0f383c6 100644 --- a/src/api/client/push.rs +++ b/src/api/client/push.rs @@ -1,10 +1,11 @@ use axum::extract::State; +use futures::StreamExt; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, + CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, api::client::{ error::ErrorKind, push::{ - delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions, + delete_pushrule, get_notifications, get_pushers, get_pushrule, get_pushrule_actions, get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher, set_pushrule, set_pushrule_actions, set_pushrule_enabled, }, @@ -14,15 +15,117 @@ use ruma::{ push_rules::{PushRulesEvent, PushRulesEventContent}, }, push::{ - InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId, + Action, InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId, RemovePushRuleError, Ruleset, }, }; -use tuwunel_core::{Err, Error, Result, err}; +use tuwunel_core::{ + Err, Error, Result, at, err, + matrix::{Event, PduId}, + utils::{ + stream::{ReadyExt, WidebandExt}, + string::to_small_string, + }, +}; use tuwunel_service::Services; use crate::Ruma; +/// # `GET /_matrix/client/r0/notifications/` +/// +/// Paginate through the list of events the user has been, or would have been +/// notified about. +pub(crate) async fn get_notifications_route( + State(services): State, + body: Ruma, +) -> Result { + use get_notifications::v3::Notification; + + let sender_user = body.sender_user(); + + let from = body + .body + .from + .as_deref() + .map(str::parse) + .transpose() + .map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?; + + let limit: usize = body + .body + .limit + .map(TryInto::try_into) + .transpose()? + .unwrap_or(50) + .clamp(1, 100); + + let only_highlight = body + .body + .only + .as_deref() + .is_some_and(|only| only.contains("highlight")); + + let mut next_token: Option = None; + let notifications = services + .pusher + .get_notifications(sender_user, from) + .ready_filter(|(_, notify)| { + if only_highlight && !notify.actions.iter().any(Action::is_highlight) { + return false; + } + + true + }) + .wide_filter_map(async |(count, notify)| { + let pdu_id = PduId { + shortroomid: notify.sroomid, + count: count.into(), + }; + + let event = services + .timeline + .get_pdu_from_id(&pdu_id.into()) + .await + .ok() + .filter(|event| !event.is_redacted())?; + + let read = services + .pusher + .last_notification_read(sender_user, event.room_id()) + .await + .is_ok_and(|last_read| last_read.ge(&count)); + + let ts = notify + .ts + .try_into() + .map(MilliSecondsSinceUnixEpoch) + .ok()?; + + let notification = Notification { + room_id: event.room_id().into(), + event: event.into_format(), + ts, + read, + profile_tag: notify.tag, + actions: notify.actions, + }; + + Some((count, notification)) + }) + .take(limit) + .inspect(|(count, _)| { + next_token.replace(*count); + }) + .map(at!(1)) + .collect::>() + .await; + + Ok(get_notifications::v3::Response { + next_token: next_token.map(to_small_string), + notifications, + }) +} + /// # `GET /_matrix/client/r0/pushrules/` /// /// Retrieves the push rules event for this user. diff --git a/src/api/router.rs b/src/api/router.rs index 55253c1d..76b09dc6 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -47,6 +47,7 @@ pub fn build(router: Router, server: &Server) -> Router { .ruma_route(&client::request_3pid_management_token_via_email_route) .ruma_route(&client::request_3pid_management_token_via_msisdn_route) .ruma_route(&client::check_registration_token_validity) + .ruma_route(&client::get_notifications_route) .ruma_route(&client::get_capabilities_route) .ruma_route(&client::get_pushrules_all_route) .ruma_route(&client::get_pushrules_global_route) diff --git a/src/database/maps.rs b/src/database/maps.rs index 588bdc6e..d0ef6c22 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -414,6 +414,11 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "userid_usersigningkeyid", ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "useridcount_notification", + limit_size: 1024 * 1024 * 256, + ..descriptor::RANDOM_SMALL_CACHE + }, Descriptor { name: "useridprofilekey_value", ..descriptor::RANDOM_SMALL diff --git a/src/service/pusher/append.rs b/src/service/pusher/append.rs index 2440c599..7bc325f2 100644 --- a/src/service/pusher/append.rs +++ b/src/service/pusher/append.rs @@ -3,18 +3,41 @@ use std::{collections::HashSet, sync::Arc}; use futures::StreamExt; use ruma::{ OwnedUserId, RoomId, UserId, + api::client::push::ProfileTag, events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent}, - push::{Action, Ruleset, Tweak}, + push::{Action, Actions, Ruleset, Tweak}, }; +use serde::{Deserialize, Serialize}; use tuwunel_core::{ Result, implement, matrix::{ event::Event, - pdu::{Pdu, RawPduId}, + pdu::{Count, Pdu, PduId, RawPduId}, }, - utils::{self, ReadyExt}, + utils::{self, ReadyExt, time::now_millis}, }; -use tuwunel_database::Map; +use tuwunel_database::{Json, Map}; + +use crate::rooms::short::ShortRoomId; + +/// Succinct version of Ruma's Notification. Appended to the database when the +/// user is notified. The PduCount is part of the database key so only the +/// shortroomid is included. Together they make the PduId. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Notified { + /// Milliseconds time at which the event notification was sent. + pub ts: u64, + + /// ShortRoomId + pub sroomid: ShortRoomId, + + /// The profile tag of the rule that matched this event. + #[serde(skip_serializing_if = "Option::is_none")] + pub tag: Option, + + /// Actions vector + pub actions: Actions, +} /// Called by timeline append_pdu. #[implement(super::Service)] @@ -77,12 +100,13 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { .get_power_levels(pdu.room_id()) .await?; - for action in self + let actions = self .services .pusher .get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id()) - .await - { + .await; + + for action in actions { match action { | Action::Notify => notify = true, | Action::SetTweak(Tweak::Highlight(true)) => { @@ -109,6 +133,20 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { continue; } + let id: PduId = pdu_id.into(); + let notified = Notified { + ts: now_millis(), + sroomid: id.shortroomid, + tag: None, + actions: actions.into(), + }; + + if matches!(id.count, Count::Normal(_)) { + self.db + .useridcount_notification + .put((user, id.count.into_unsigned()), Json(notified)); + } + self.services .pusher .get_pushkeys(user) diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 01849b63..a9b587c8 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -17,10 +17,12 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, err, implement, - utils::stream::{BroadbandExt, TryIgnore}, + utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, }; use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map}; +pub use self::append::Notified; + pub struct Service { db: Data, services: Arc, @@ -29,6 +31,7 @@ pub struct Service { struct Data { senderkey_pusher: Arc, pushkey_deviceid: Arc, + useridcount_notification: Arc, userroomid_highlightcount: Arc, userroomid_notificationcount: Arc, roomuserid_lastnotificationread: Arc, @@ -41,6 +44,7 @@ impl crate::Service for Service { db: Data { senderkey_pusher: args.db["senderkey_pusher"].clone(), pushkey_deviceid: args.db["pushkey_deviceid"].clone(), + useridcount_notification: args.db["useridcount_notification"].clone(), userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), roomuserid_lastnotificationread: args.db["roomuserid_lastnotificationread"] @@ -193,6 +197,27 @@ pub fn get_pushkeys<'a>(&'a self, sender: &'a UserId) -> impl Stream( + &'a self, + sender: &'a UserId, + from: Option, +) -> impl Stream + Send + 'a { + let from = from + .map(|from| from.saturating_sub(1)) + .unwrap_or(u64::MAX); + + self.db + .useridcount_notification + .rev_stream_from(&(sender, from)) + .ignore_err() + .map(|item: ((&UserId, u64), _)| (item.0, item.1)) + .ready_take_while(move |((user_id, _count), _)| sender == *user_id) + .map(|((_, count), notified)| (count, notified)) +} + #[implement(Service)] #[tracing::instrument(level = "debug", skip_all)] pub async fn get_actions<'a>(