Split pusher service send/request into unit.

Refactor sender's push destination handler.

Combine remnants of service::rooms::user with pusher service.

Further split and reorg pusher service units.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-27 15:55:55 +00:00
parent 563873af77
commit 5e89f0acae
13 changed files with 565 additions and 589 deletions

View File

@@ -10,14 +10,14 @@ use std::{
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use futures::{
FutureExt, StreamExt,
future::{BoxFuture, OptionFuture, join3},
FutureExt, StreamExt, TryFutureExt,
future::{BoxFuture, OptionFuture, join3, try_join3},
pin_mut,
stream::FuturesUnordered,
};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
UInt,
UserId,
api::{
appservice::event::push_events::v1::EphemeralData,
federation::transactions::{
@@ -39,7 +39,7 @@ use ruma::{
uint,
};
use tuwunel_core::{
Error, Event, Result, debug, err, error,
Error, Event, Result, debug, err, error, extract_variant,
result::LogErr,
trace,
utils::{
@@ -792,114 +792,103 @@ impl Service {
pushkey: String,
events: Vec<SendingEvent>,
) -> SendingResult {
let Ok(pusher) = self
let suppressed = self.pushing_suppressed(&user_id).map(Ok);
let pusher = self
.services
.pusher
.get_pusher(&user_id, &pushkey)
.await
else {
return Err((
Destination::Push(user_id.clone(), pushkey.clone()),
err!(Database(error!(?user_id, ?pushkey, "Missing pusher"))),
));
};
.map_err(|_| {
(
Destination::Push(user_id.clone(), pushkey.clone()),
err!(Database(error!(?user_id, ?pushkey, "Missing pusher"))),
)
});
let mut pdus = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
for event in &events {
match event {
| SendingEvent::Pdu(pdu_id) => {
if let Ok(pdu) = self
.services
.timeline
.get_pdu_from_id(pdu_id)
.await
{
pdus.push(pdu);
}
},
| SendingEvent::Edu(_) | SendingEvent::Flush => {
// Push gateways don't need EDUs (?) and flush only;
// no new content
},
}
}
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.contains_unsigned_property("redacted_because", serde_json::Value::is_string) {
continue;
}
// optional suppression: heuristic combining presence age and recent sync
// activity.
if self.services.config.suppress_push_when_active
&& let Ok(presence) = self
.services
.presence
.get_presence(&user_id)
.await
{
let is_online = presence.content.presence == PresenceState::Online;
let presence_age_ms = presence
.content
.last_active_ago
.map(u64::from)
.unwrap_or(u64::MAX);
let sync_gap_ms = self
.services
.presence
.last_sync_gap_ms(&user_id)
.await;
let considered_active = is_online
&& presence_age_ms < 65_000
&& sync_gap_ms.is_some_and(|gap| gap < 32_000);
if considered_active {
trace!(
?user_id,
presence_age_ms, sync_gap_ms, "suppressing push: active heuristic"
);
continue;
}
}
let rules_for_user = self
.services
.account_data
.get_global(&user_id, GlobalAccountDataEventType::PushRules)
.await
.map_or_else(
let rules_for_user = self
.services
.account_data
.get_global(&user_id, GlobalAccountDataEventType::PushRules)
.map(|ev| {
ev.map_or_else(
|_| push::Ruleset::server_default(&user_id),
|ev: PushRulesEvent| ev.content.global,
);
)
})
.map(Ok);
let unread: UInt = self
.services
.user
.notification_count(&user_id, pdu.room_id())
.await
.try_into()
.expect("notification count can't go that high");
let (pusher, rules_for_user, suppressed) =
try_join3(pusher, rules_for_user, suppressed).await?;
let _response = self
.services
.pusher
.send_push_notice(&user_id, unread, &pusher, rules_for_user, &pdu)
.await
.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e));
if suppressed {
return Ok(Destination::Push(user_id, pushkey));
}
let _sent = events
.iter()
.stream()
.ready_filter_map(|event| extract_variant!(event, SendingEvent::Pdu))
.wide_filter_map(|pdu_id| {
self.services
.timeline
.get_pdu_from_id(pdu_id)
.ok()
})
.ready_filter(|pdu| !pdu.is_redacted())
.wide_filter_map(async |pdu| {
self.services
.pusher
.send_push_notice(&user_id, &pusher, &rules_for_user, &pdu)
.await
.map_err(|e| (Destination::Push(user_id.clone(), pushkey.clone()), e))
.ok()
})
.count()
.await;
Ok(Destination::Push(user_id, pushkey))
}
// optional suppression: heuristic combining presence age and recent sync
// activity.
async fn pushing_suppressed(&self, user_id: &UserId) -> bool {
if !self.services.config.suppress_push_when_active {
return false;
}
let Ok(presence) = self.services.presence.get_presence(user_id).await else {
return false;
};
if presence.content.presence != PresenceState::Online {
return false;
}
let presence_age_ms = presence
.content
.last_active_ago
.map(u64::from)
.unwrap_or(u64::MAX);
if presence_age_ms >= 65_000 {
return false;
}
let sync_gap_ms = self
.services
.presence
.last_sync_gap_ms(user_id)
.await;
let considered_active = sync_gap_ms.is_some_and(|gap| gap < 32_000);
if considered_active {
trace!(?user_id, presence_age_ms, "suppressing push: active heuristic");
}
considered_active
}
async fn send_events_dest_federation(
&self,
server: OwnedServerName,