Implement notifications retrieval. (closes #201)
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,10 +1,11 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
use futures::StreamExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue,
|
CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch,
|
||||||
api::client::{
|
api::client::{
|
||||||
error::ErrorKind,
|
error::ErrorKind,
|
||||||
push::{
|
push::{
|
||||||
delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions,
|
delete_pushrule, get_notifications, get_pushers, get_pushrule, get_pushrule_actions,
|
||||||
get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher,
|
get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher,
|
||||||
set_pushrule, set_pushrule_actions, set_pushrule_enabled,
|
set_pushrule, set_pushrule_actions, set_pushrule_enabled,
|
||||||
},
|
},
|
||||||
@@ -14,15 +15,117 @@ use ruma::{
|
|||||||
push_rules::{PushRulesEvent, PushRulesEventContent},
|
push_rules::{PushRulesEvent, PushRulesEventContent},
|
||||||
},
|
},
|
||||||
push::{
|
push::{
|
||||||
InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
|
Action, InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
|
||||||
RemovePushRuleError, Ruleset,
|
RemovePushRuleError, Ruleset,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use tuwunel_core::{Err, Error, Result, err};
|
use tuwunel_core::{
|
||||||
|
Err, Error, Result, at, err,
|
||||||
|
matrix::{Event, PduId},
|
||||||
|
utils::{
|
||||||
|
stream::{ReadyExt, WidebandExt},
|
||||||
|
string::to_small_string,
|
||||||
|
},
|
||||||
|
};
|
||||||
use tuwunel_service::Services;
|
use tuwunel_service::Services;
|
||||||
|
|
||||||
use crate::Ruma;
|
use crate::Ruma;
|
||||||
|
|
||||||
|
/// # `GET /_matrix/client/r0/notifications/`
|
||||||
|
///
|
||||||
|
/// Paginate through the list of events the user has been, or would have been
|
||||||
|
/// notified about.
|
||||||
|
pub(crate) async fn get_notifications_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
body: Ruma<get_notifications::v3::Request>,
|
||||||
|
) -> Result<get_notifications::v3::Response> {
|
||||||
|
use get_notifications::v3::Notification;
|
||||||
|
|
||||||
|
let sender_user = body.sender_user();
|
||||||
|
|
||||||
|
let from = body
|
||||||
|
.body
|
||||||
|
.from
|
||||||
|
.as_deref()
|
||||||
|
.map(str::parse)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
|
||||||
|
|
||||||
|
let limit: usize = body
|
||||||
|
.body
|
||||||
|
.limit
|
||||||
|
.map(TryInto::try_into)
|
||||||
|
.transpose()?
|
||||||
|
.unwrap_or(50)
|
||||||
|
.clamp(1, 100);
|
||||||
|
|
||||||
|
let only_highlight = body
|
||||||
|
.body
|
||||||
|
.only
|
||||||
|
.as_deref()
|
||||||
|
.is_some_and(|only| only.contains("highlight"));
|
||||||
|
|
||||||
|
let mut next_token: Option<u64> = None;
|
||||||
|
let notifications = services
|
||||||
|
.pusher
|
||||||
|
.get_notifications(sender_user, from)
|
||||||
|
.ready_filter(|(_, notify)| {
|
||||||
|
if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
})
|
||||||
|
.wide_filter_map(async |(count, notify)| {
|
||||||
|
let pdu_id = PduId {
|
||||||
|
shortroomid: notify.sroomid,
|
||||||
|
count: count.into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = services
|
||||||
|
.timeline
|
||||||
|
.get_pdu_from_id(&pdu_id.into())
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.filter(|event| !event.is_redacted())?;
|
||||||
|
|
||||||
|
let read = services
|
||||||
|
.pusher
|
||||||
|
.last_notification_read(sender_user, event.room_id())
|
||||||
|
.await
|
||||||
|
.is_ok_and(|last_read| last_read.ge(&count));
|
||||||
|
|
||||||
|
let ts = notify
|
||||||
|
.ts
|
||||||
|
.try_into()
|
||||||
|
.map(MilliSecondsSinceUnixEpoch)
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
let notification = Notification {
|
||||||
|
room_id: event.room_id().into(),
|
||||||
|
event: event.into_format(),
|
||||||
|
ts,
|
||||||
|
read,
|
||||||
|
profile_tag: notify.tag,
|
||||||
|
actions: notify.actions,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((count, notification))
|
||||||
|
})
|
||||||
|
.take(limit)
|
||||||
|
.inspect(|(count, _)| {
|
||||||
|
next_token.replace(*count);
|
||||||
|
})
|
||||||
|
.map(at!(1))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(get_notifications::v3::Response {
|
||||||
|
next_token: next_token.map(to_small_string),
|
||||||
|
notifications,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// # `GET /_matrix/client/r0/pushrules/`
|
/// # `GET /_matrix/client/r0/pushrules/`
|
||||||
///
|
///
|
||||||
/// Retrieves the push rules event for this user.
|
/// Retrieves the push rules event for this user.
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
|||||||
.ruma_route(&client::request_3pid_management_token_via_email_route)
|
.ruma_route(&client::request_3pid_management_token_via_email_route)
|
||||||
.ruma_route(&client::request_3pid_management_token_via_msisdn_route)
|
.ruma_route(&client::request_3pid_management_token_via_msisdn_route)
|
||||||
.ruma_route(&client::check_registration_token_validity)
|
.ruma_route(&client::check_registration_token_validity)
|
||||||
|
.ruma_route(&client::get_notifications_route)
|
||||||
.ruma_route(&client::get_capabilities_route)
|
.ruma_route(&client::get_capabilities_route)
|
||||||
.ruma_route(&client::get_pushrules_all_route)
|
.ruma_route(&client::get_pushrules_all_route)
|
||||||
.ruma_route(&client::get_pushrules_global_route)
|
.ruma_route(&client::get_pushrules_global_route)
|
||||||
|
|||||||
@@ -414,6 +414,11 @@ pub(super) static MAPS: &[Descriptor] = &[
|
|||||||
name: "userid_usersigningkeyid",
|
name: "userid_usersigningkeyid",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
},
|
},
|
||||||
|
Descriptor {
|
||||||
|
name: "useridcount_notification",
|
||||||
|
limit_size: 1024 * 1024 * 256,
|
||||||
|
..descriptor::RANDOM_SMALL_CACHE
|
||||||
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "useridprofilekey_value",
|
name: "useridprofilekey_value",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL
|
||||||
|
|||||||
@@ -3,18 +3,41 @@ use std::{collections::HashSet, sync::Arc};
|
|||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedUserId, RoomId, UserId,
|
OwnedUserId, RoomId, UserId,
|
||||||
|
api::client::push::ProfileTag,
|
||||||
events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent},
|
events::{GlobalAccountDataEventType, TimelineEventType, push_rules::PushRulesEvent},
|
||||||
push::{Action, Ruleset, Tweak},
|
push::{Action, Actions, Ruleset, Tweak},
|
||||||
};
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Result, implement,
|
Result, implement,
|
||||||
matrix::{
|
matrix::{
|
||||||
event::Event,
|
event::Event,
|
||||||
pdu::{Pdu, RawPduId},
|
pdu::{Count, Pdu, PduId, RawPduId},
|
||||||
},
|
},
|
||||||
utils::{self, ReadyExt},
|
utils::{self, ReadyExt, time::now_millis},
|
||||||
};
|
};
|
||||||
use tuwunel_database::Map;
|
use tuwunel_database::{Json, Map};
|
||||||
|
|
||||||
|
use crate::rooms::short::ShortRoomId;
|
||||||
|
|
||||||
|
/// Succinct version of Ruma's Notification. Appended to the database when the
|
||||||
|
/// user is notified. The PduCount is part of the database key so only the
|
||||||
|
/// shortroomid is included. Together they make the PduId.
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
pub struct Notified {
|
||||||
|
/// Milliseconds time at which the event notification was sent.
|
||||||
|
pub ts: u64,
|
||||||
|
|
||||||
|
/// ShortRoomId
|
||||||
|
pub sroomid: ShortRoomId,
|
||||||
|
|
||||||
|
/// The profile tag of the rule that matched this event.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub tag: Option<ProfileTag>,
|
||||||
|
|
||||||
|
/// Actions vector
|
||||||
|
pub actions: Actions,
|
||||||
|
}
|
||||||
|
|
||||||
/// Called by timeline append_pdu.
|
/// Called by timeline append_pdu.
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
@@ -77,12 +100,13 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
|
|||||||
.get_power_levels(pdu.room_id())
|
.get_power_levels(pdu.room_id())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
for action in self
|
let actions = self
|
||||||
.services
|
.services
|
||||||
.pusher
|
.pusher
|
||||||
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
|
.get_actions(user, &rules_for_user, &power_levels, &serialized, pdu.room_id())
|
||||||
.await
|
.await;
|
||||||
{
|
|
||||||
|
for action in actions {
|
||||||
match action {
|
match action {
|
||||||
| Action::Notify => notify = true,
|
| Action::Notify => notify = true,
|
||||||
| Action::SetTweak(Tweak::Highlight(true)) => {
|
| Action::SetTweak(Tweak::Highlight(true)) => {
|
||||||
@@ -109,6 +133,20 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let id: PduId = pdu_id.into();
|
||||||
|
let notified = Notified {
|
||||||
|
ts: now_millis(),
|
||||||
|
sroomid: id.shortroomid,
|
||||||
|
tag: None,
|
||||||
|
actions: actions.into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if matches!(id.count, Count::Normal(_)) {
|
||||||
|
self.db
|
||||||
|
.useridcount_notification
|
||||||
|
.put((user, id.count.into_unsigned()), Json(notified));
|
||||||
|
}
|
||||||
|
|
||||||
self.services
|
self.services
|
||||||
.pusher
|
.pusher
|
||||||
.get_pushkeys(user)
|
.get_pushkeys(user)
|
||||||
|
|||||||
@@ -17,10 +17,12 @@ use ruma::{
|
|||||||
};
|
};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Err, Result, err, implement,
|
Err, Result, err, implement,
|
||||||
utils::stream::{BroadbandExt, TryIgnore},
|
utils::stream::{BroadbandExt, ReadyExt, TryIgnore},
|
||||||
};
|
};
|
||||||
use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map};
|
use tuwunel_database::{Database, Deserialized, Ignore, Interfix, Json, Map};
|
||||||
|
|
||||||
|
pub use self::append::Notified;
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
db: Data,
|
db: Data,
|
||||||
services: Arc<crate::services::OnceServices>,
|
services: Arc<crate::services::OnceServices>,
|
||||||
@@ -29,6 +31,7 @@ pub struct Service {
|
|||||||
struct Data {
|
struct Data {
|
||||||
senderkey_pusher: Arc<Map>,
|
senderkey_pusher: Arc<Map>,
|
||||||
pushkey_deviceid: Arc<Map>,
|
pushkey_deviceid: Arc<Map>,
|
||||||
|
useridcount_notification: Arc<Map>,
|
||||||
userroomid_highlightcount: Arc<Map>,
|
userroomid_highlightcount: Arc<Map>,
|
||||||
userroomid_notificationcount: Arc<Map>,
|
userroomid_notificationcount: Arc<Map>,
|
||||||
roomuserid_lastnotificationread: Arc<Map>,
|
roomuserid_lastnotificationread: Arc<Map>,
|
||||||
@@ -41,6 +44,7 @@ impl crate::Service for Service {
|
|||||||
db: Data {
|
db: Data {
|
||||||
senderkey_pusher: args.db["senderkey_pusher"].clone(),
|
senderkey_pusher: args.db["senderkey_pusher"].clone(),
|
||||||
pushkey_deviceid: args.db["pushkey_deviceid"].clone(),
|
pushkey_deviceid: args.db["pushkey_deviceid"].clone(),
|
||||||
|
useridcount_notification: args.db["useridcount_notification"].clone(),
|
||||||
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
|
||||||
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
|
||||||
roomuserid_lastnotificationread: args.db["roomuserid_lastnotificationread"]
|
roomuserid_lastnotificationread: args.db["roomuserid_lastnotificationread"]
|
||||||
@@ -193,6 +197,27 @@ pub fn get_pushkeys<'a>(&'a self, sender: &'a UserId) -> impl Stream<Item = &str
|
|||||||
.map(|(_, pushkey): (Ignore, &str)| pushkey)
|
.map(|(_, pushkey): (Ignore, &str)| pushkey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
#[tracing::instrument(level = "debug", skip_all)]
|
||||||
|
#[inline]
|
||||||
|
pub fn get_notifications<'a>(
|
||||||
|
&'a self,
|
||||||
|
sender: &'a UserId,
|
||||||
|
from: Option<u64>,
|
||||||
|
) -> impl Stream<Item = (u64, Notified)> + Send + 'a {
|
||||||
|
let from = from
|
||||||
|
.map(|from| from.saturating_sub(1))
|
||||||
|
.unwrap_or(u64::MAX);
|
||||||
|
|
||||||
|
self.db
|
||||||
|
.useridcount_notification
|
||||||
|
.rev_stream_from(&(sender, from))
|
||||||
|
.ignore_err()
|
||||||
|
.map(|item: ((&UserId, u64), _)| (item.0, item.1))
|
||||||
|
.ready_take_while(move |((user_id, _count), _)| sender == *user_id)
|
||||||
|
.map(|((_, count), notified)| (count, notified))
|
||||||
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(level = "debug", skip_all)]
|
||||||
pub async fn get_actions<'a>(
|
pub async fn get_actions<'a>(
|
||||||
|
|||||||
Reference in New Issue
Block a user