diff --git a/src/service/appservice/append.rs b/src/service/appservice/append.rs new file mode 100644 index 00000000..980ef28f --- /dev/null +++ b/src/service/appservice/append.rs @@ -0,0 +1,103 @@ +use ruma::{UserId, events::TimelineEventType}; +use tuwunel_core::{ + Result, error, implement, + matrix::{ + event::Event, + pdu::{Pdu, RawPduId}, + }, + utils::ReadyExt, +}; + +use super::{NamespaceRegex, RegistrationInfo}; + +/// Called by timeline::append() after accepting new PDU. +#[implement(super::Service)] +#[tracing::instrument(name = "append", level = "debug", skip_all)] +pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { + for appservice in self.read().await.values() { + self.append_pdu_to(appservice, pdu_id, pdu) + .await + .inspect_err(|e| { + error!( + event_id = %pdu.event_id(), + appservice = ?appservice.registration.id, + "Failed to send PDU to appservice: {e}" + ); + }) + .ok(); + } + + Ok(()) +} + +#[implement(super::Service)] +#[tracing::instrument( + name = "append_to", + level = "debug", + skip_all, + fields(id = %appservice.registration.id), +)] +async fn append_pdu_to( + &self, + appservice: &RegistrationInfo, + pdu_id: RawPduId, + pdu: &Pdu, +) -> Result { + if self + .services + .state_cache + .appservice_in_room(pdu.room_id(), appservice) + .await + { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + + return Ok(()); + } + + // If the RoomMember event has a non-empty state_key, it is targeted at someone. + // If it is our appservice user, we send this PDU to it. + if *pdu.kind() == TimelineEventType::RoomMember { + if let Some(state_key_uid) = &pdu + .state_key + .as_ref() + .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) + { + let appservice_uid = appservice.registration.sender_localpart.as_str(); + if state_key_uid == &appservice_uid { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + + return Ok(()); + } + } + } + + let matching_users = |users: &NamespaceRegex| { + appservice.users.is_match(pdu.sender().as_str()) + || *pdu.kind() == TimelineEventType::RoomMember + && pdu + .state_key + .as_ref() + .is_some_and(|state_key| users.is_match(state_key)) + }; + let matching_aliases = |aliases: NamespaceRegex| { + self.services + .alias + .local_aliases_for_room(pdu.room_id()) + .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) + }; + + if matching_aliases(appservice.aliases.clone()).await + || appservice.rooms.is_match(pdu.room_id().as_str()) + || matching_users(&appservice.users) + { + self.services + .sending + .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; + } + + Ok(()) +} diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs index b99ed7aa..24382ca3 100644 --- a/src/service/appservice/mod.rs +++ b/src/service/appservice/mod.rs @@ -1,3 +1,4 @@ +mod append; mod namespace_regex; mod registration_info; diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index 3c94bfbd..bbe5a870 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -23,12 +23,12 @@ use tuwunel_core::{ event::Event, pdu::{PduCount, PduEvent, PduId, RawPduId}, }, - utils::{self, ReadyExt}, + utils::{self, ReadyExt, result::LogErr}, }; use tuwunel_database::{Json, Map}; use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard}; -use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState}; +use crate::rooms::state_compressor::CompressedState; /// Append the incoming event setting the state snapshot to the state from /// the server that sent the event. @@ -426,63 +426,12 @@ where drop(next_count1); drop(next_count2); - for appservice in self.services.appservice.read().await.values() { - if self - .services - .state_cache - .appservice_in_room(pdu.room_id(), appservice) - .await - { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - - continue; - } - - // If the RoomMember event has a non-empty state_key, it is targeted at someone. - // If it is our appservice user, we send this PDU to it. - if *pdu.kind() == TimelineEventType::RoomMember { - if let Some(state_key_uid) = &pdu - .state_key - .as_ref() - .and_then(|state_key| UserId::parse(state_key.as_str()).ok()) - { - let appservice_uid = appservice.registration.sender_localpart.as_str(); - if state_key_uid == &appservice_uid { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - - continue; - } - } - } - - let matching_users = |users: &NamespaceRegex| { - appservice.users.is_match(pdu.sender().as_str()) - || *pdu.kind() == TimelineEventType::RoomMember - && pdu - .state_key - .as_ref() - .is_some_and(|state_key| users.is_match(state_key)) - }; - let matching_aliases = |aliases: NamespaceRegex| { - self.services - .alias - .local_aliases_for_room(pdu.room_id()) - .ready_any(move |room_alias| aliases.is_match(room_alias.as_str())) - }; - - if matching_aliases(appservice.aliases.clone()).await - || appservice.rooms.is_match(pdu.room_id().as_str()) - || matching_users(&appservice.users) - { - self.services - .sending - .send_pdu_appservice(appservice.registration.id.clone(), pdu_id)?; - } - } + self.services + .appservice + .append_pdu(pdu_id, pdu) + .await + .log_err() + .ok(); Ok(pdu_id) }