Implement notifications retrieval. (closes #201)

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-29 22:05:49 +00:00
parent aedaf3f0c1
commit 48923b3657
5 changed files with 184 additions and 12 deletions

View File

@@ -1,10 +1,11 @@
use axum::extract::State;
use futures::StreamExt;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue,
CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch,
api::client::{
error::ErrorKind,
push::{
delete_pushrule, get_pushers, get_pushrule, get_pushrule_actions,
delete_pushrule, get_notifications, get_pushers, get_pushrule, get_pushrule_actions,
get_pushrule_enabled, get_pushrules_all, get_pushrules_global_scope, set_pusher,
set_pushrule, set_pushrule_actions, set_pushrule_enabled,
},
@@ -14,15 +15,117 @@ use ruma::{
push_rules::{PushRulesEvent, PushRulesEventContent},
},
push::{
InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
Action, InsertPushRuleError, PredefinedContentRuleId, PredefinedOverrideRuleId,
RemovePushRuleError, Ruleset,
},
};
use tuwunel_core::{Err, Error, Result, err};
use tuwunel_core::{
Err, Error, Result, at, err,
matrix::{Event, PduId},
utils::{
stream::{ReadyExt, WidebandExt},
string::to_small_string,
},
};
use tuwunel_service::Services;
use crate::Ruma;
/// # `GET /_matrix/client/r0/notifications/`
///
/// Paginate through the list of events the user has been, or would have been
/// notified about.
pub(crate) async fn get_notifications_route(
State(services): State<crate::State>,
body: Ruma<get_notifications::v3::Request>,
) -> Result<get_notifications::v3::Response> {
use get_notifications::v3::Notification;
let sender_user = body.sender_user();
let from = body
.body
.from
.as_deref()
.map(str::parse)
.transpose()
.map_err(|e| err!(Request(InvalidParam("Invalid `from' parameter: {e}"))))?;
let limit: usize = body
.body
.limit
.map(TryInto::try_into)
.transpose()?
.unwrap_or(50)
.clamp(1, 100);
let only_highlight = body
.body
.only
.as_deref()
.is_some_and(|only| only.contains("highlight"));
let mut next_token: Option<u64> = None;
let notifications = services
.pusher
.get_notifications(sender_user, from)
.ready_filter(|(_, notify)| {
if only_highlight && !notify.actions.iter().any(Action::is_highlight) {
return false;
}
true
})
.wide_filter_map(async |(count, notify)| {
let pdu_id = PduId {
shortroomid: notify.sroomid,
count: count.into(),
};
let event = services
.timeline
.get_pdu_from_id(&pdu_id.into())
.await
.ok()
.filter(|event| !event.is_redacted())?;
let read = services
.pusher
.last_notification_read(sender_user, event.room_id())
.await
.is_ok_and(|last_read| last_read.ge(&count));
let ts = notify
.ts
.try_into()
.map(MilliSecondsSinceUnixEpoch)
.ok()?;
let notification = Notification {
room_id: event.room_id().into(),
event: event.into_format(),
ts,
read,
profile_tag: notify.tag,
actions: notify.actions,
};
Some((count, notification))
})
.take(limit)
.inspect(|(count, _)| {
next_token.replace(*count);
})
.map(at!(1))
.collect::<Vec<_>>()
.await;
Ok(get_notifications::v3::Response {
next_token: next_token.map(to_small_string),
notifications,
})
}
/// # `GET /_matrix/client/r0/pushrules/`
///
/// Retrieves the push rules event for this user.