diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 3442ae09..e09d9159 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -236,8 +236,8 @@ pub(super) async fn get_remote_pdu( match self .services - .sending - .send_federation_request(&server, ruma::api::federation::event::get_event::v1::Request { + .federation + .execute(&server, ruma::api::federation::event::get_event::v1::Request { event_id: event_id.clone(), }) .await @@ -327,11 +327,8 @@ pub(super) async fn ping(&self, server: OwnedServerName) -> Result { match self .services - .sending - .send_federation_request( - &server, - ruma::api::federation::discovery::get_server_version::v1::Request {}, - ) + .federation + .execute(&server, ruma::api::federation::discovery::get_server_version::v1::Request {}) .await { | Err(e) => { @@ -571,8 +568,8 @@ pub(super) async fn force_set_room_state_from_server( let remote_state_response = self .services - .sending - .send_federation_request(&server_name, get_room_state::v1::Request { + .federation + .execute(&server_name, get_room_state::v1::Request { room_id: room_id.clone(), event_id: first_pdu.event_id().to_owned(), }) diff --git a/src/api/client/appservice.rs b/src/api/client/appservice.rs index bea665f3..516c45d5 100644 --- a/src/api/client/appservice.rs +++ b/src/api/client/appservice.rs @@ -37,13 +37,10 @@ pub(crate) async fn appservice_ping( let timer = tokio::time::Instant::now(); let _response = services - .sending - .send_appservice_request( - appservice_info.registration.clone(), - ping::send_ping::v1::Request { - transaction_id: body.transaction_id.clone(), - }, - ) + .appservice + .send_request(appservice_info.registration.clone(), ping::send_ping::v1::Request { + transaction_id: body.transaction_id.clone(), + }) .await? .expect("We already validated if an appservice URL exists above"); diff --git a/src/api/client/directory.rs b/src/api/client/directory.rs index 2c55c354..d3998339 100644 --- a/src/api/client/directory.rs +++ b/src/api/client/directory.rs @@ -220,8 +220,8 @@ pub(crate) async fn get_public_rooms_filtered_helper( server.filter(|server_name| !services.globals.server_is_ours(server_name)) { let response = services - .sending - .send_federation_request( + .federation + .execute( other_server, federation::directory::get_public_rooms_filtered::v1::Request { limit, diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index 7df137ed..f186ca72 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -503,10 +503,7 @@ where let request = federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed }; - let response = services - .sending - .send_federation_request(server, request) - .await; + let response = services.federation.execute(server, request).await; (server, response) }) @@ -631,8 +628,8 @@ pub(crate) async fn claim_keys_helper( ( server, services - .sending - .send_federation_request(server, federation::keys::claim_keys::v1::Request { + .federation + .execute(server, federation::keys::claim_keys::v1::Request { one_time_keys: one_time_keys_input_fed, }) .await, diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index 3f13aea5..ff72dd9f 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -260,8 +260,8 @@ async fn knock_room_helper_local( }; let send_knock_response = services - .sending - .send_federation_request(&remote_server, send_knock_request) + .federation + .execute(&remote_server, send_knock_request) .await?; info!("send_knock finished"); @@ -396,8 +396,8 @@ async fn knock_room_helper_remote( }; let send_knock_response = services - .sending - .send_federation_request(&remote_server, send_knock_request) + .federation + .execute(&remote_server, send_knock_request) .await?; info!("send_knock finished"); @@ -556,18 +556,15 @@ async fn make_knock_request( info!("Asking {remote_server} for make_knock ({make_knock_counter})"); let make_knock_response = services - .sending - .send_federation_request( - remote_server, - federation::membership::prepare_knock_event::v1::Request { - room_id: room_id.to_owned(), - user_id: sender_user.to_owned(), - ver: services - .server - .supported_room_versions() - .collect(), - }, - ) + .federation + .execute(remote_server, federation::membership::prepare_knock_event::v1::Request { + room_id: room_id.to_owned(), + user_id: sender_user.to_owned(), + ver: services + .server + .supported_room_versions() + .collect(), + }) .await; trace!("make_knock response: {make_knock_response:?}"); diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index ca37644a..64bb327a 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -68,8 +68,8 @@ pub(crate) async fn get_displayname_route( if !services.globals.user_is_local(&body.user_id) { // Create and update our local copy of the user if let Ok(response) = services - .sending - .send_federation_request( + .federation + .execute( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: body.user_id.clone(), @@ -169,8 +169,8 @@ pub(crate) async fn get_avatar_url_route( if !services.globals.user_is_local(&body.user_id) { // Create and update our local copy of the user if let Ok(response) = services - .sending - .send_federation_request( + .federation + .execute( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: body.user_id.clone(), @@ -231,8 +231,8 @@ pub(crate) async fn get_profile_route( if !services.globals.user_is_local(&body.user_id) { // Create and update our local copy of the user if let Ok(response) = services - .sending - .send_federation_request( + .federation + .execute( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: body.user_id.clone(), diff --git a/src/api/client/room/summary.rs b/src/api/client/room/summary.rs index 127b73eb..7182ec7f 100644 --- a/src/api/client/room/summary.rs +++ b/src/api/client/room/summary.rs @@ -232,8 +232,8 @@ async fn remote_room_summary_hierarchy_response( .iter() .map(|server| { services - .sending - .send_federation_request(server, request.clone()) + .federation + .execute(server, request.clone()) }) .collect(); diff --git a/src/api/client/unstable.rs b/src/api/client/unstable.rs index dc311689..fd389d57 100644 --- a/src/api/client/unstable.rs +++ b/src/api/client/unstable.rs @@ -243,8 +243,8 @@ pub(crate) async fn get_timezone_key_route( if !services.globals.user_is_local(&body.user_id) { // Create and update our local copy of the user if let Ok(response) = services - .sending - .send_federation_request( + .federation + .execute( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: body.user_id.clone(), @@ -304,8 +304,8 @@ pub(crate) async fn get_profile_field_route( if !services.globals.user_is_local(&body.user_id) { // Create and update our local copy of the user if let Ok(response) = services - .sending - .send_federation_request( + .federation + .execute( body.user_id.server_name(), federation::query::get_profile_information::v1::Request { user_id: body.user_id.clone(), diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 77dfb071..94bc018f 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -198,8 +198,8 @@ pub(crate) async fn create_invite_route( for appservice in services.appservice.read().await.values() { if appservice.is_user_match(&invited_user) { services - .sending - .send_appservice_request( + .appservice + .send_request( appservice.registration.clone(), ruma::api::appservice::event::push_events::v1::Request { events: vec![pdu.to_format()], diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs index 08b79f3a..7d5bc4a6 100644 --- a/src/service/appservice/mod.rs +++ b/src/service/appservice/mod.rs @@ -16,7 +16,6 @@ use tokio::sync::{RwLock, RwLockReadGuard}; use tuwunel_core::{Err, Result, debug, err, utils::stream::IterStream}; use tuwunel_database::Map; -pub(crate) use self::request::send_request; pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo}; pub struct Service { diff --git a/src/service/appservice/request.rs b/src/service/appservice/request.rs index 58d3986a..37f1ff8c 100644 --- a/src/service/appservice/request.rs +++ b/src/service/appservice/request.rs @@ -1,19 +1,19 @@ use std::{fmt::Debug, mem}; use bytes::BytesMut; -use reqwest::Client; use ruma::api::{ IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, SupportedVersions, appservice::Registration, }; -use tuwunel_core::{Err, Result, debug_error, err, trace, utils, warn}; +use tuwunel_core::{Err, Result, debug_error, err, implement, trace, utils, warn}; /// Sends a request to an appservice /// /// Only returns Ok(None) if there is no url specified in the appservice /// registration file -pub(crate) async fn send_request( - client: &Client, +#[implement(super::Service)] +pub async fn send_request( + &self, registration: Registration, request: T, ) -> Result> @@ -25,6 +25,7 @@ where versions: VERSIONS.into(), features: Default::default(), }; + let client = &self.services.client.appservice; let Some(dest) = registration.url else { return Ok(None); diff --git a/src/service/media/remote.rs b/src/service/media/remote.rs index 16c90b47..acd81872 100644 --- a/src/service/media/remote.rs +++ b/src/service/media/remote.rs @@ -320,8 +320,8 @@ where Request: OutgoingRequest + Send + Debug, { self.services - .sending - .send_federation_request(server.unwrap_or(mxc.server_name), request) + .federation + .execute(server.unwrap_or(mxc.server_name), request) .await .map_err(|error| handle_federation_error(mxc, user, server, error)) } @@ -374,8 +374,8 @@ pub async fn fetch_remote_thumbnail_legacy( self.check_fetch_authorized(&mxc)?; let response = self .services - .sending - .send_federation_request(mxc.server_name, media::get_content_thumbnail::v3::Request { + .federation + .execute(mxc.server_name, media::get_content_thumbnail::v3::Request { allow_remote: body.allow_remote, height: body.height, width: body.width, @@ -414,8 +414,8 @@ pub async fn fetch_remote_content_legacy( self.check_fetch_authorized(mxc)?; let response = self .services - .sending - .send_federation_request(mxc.server_name, media::get_content::v3::Request { + .federation + .execute(mxc.server_name, media::get_content::v3::Request { allow_remote: true, server_name: mxc.server_name.into(), media_id: mxc.media_id.into(), diff --git a/src/service/membership/invite.rs b/src/service/membership/invite.rs index a33c2d23..a92e87a1 100644 --- a/src/service/membership/invite.rs +++ b/src/service/membership/invite.rs @@ -83,8 +83,8 @@ async fn remote_invite( let response = self .services - .sending - .send_federation_request(user_id.server_name(), create_invite::v2::Request { + .federation + .execute(user_id.server_name(), create_invite::v2::Request { room_id: room_id.to_owned(), event_id: (*pdu.event_id).to_owned(), room_version: room_version_id.clone(), diff --git a/src/service/membership/join.rs b/src/service/membership/join.rs index fe4ec237..0b078a94 100644 --- a/src/service/membership/join.rs +++ b/src/service/membership/join.rs @@ -251,8 +251,8 @@ pub async fn join_remote( let send_join_response = match self .services - .sending - .send_synapse_request(&remote_server, send_join_request) + .federation + .execute(&remote_server, send_join_request) .await { | Ok(response) => response, @@ -704,20 +704,17 @@ pub async fn join_local( let send_join_response = self .services - .sending - .send_synapse_request( - &remote_server, - federation::membership::create_join_event::v2::Request { - room_id: room_id.to_owned(), - event_id: event_id.clone(), - omit_members: false, - pdu: self - .services - .federation - .format_pdu_into(join_event.clone(), Some(&room_version_id)) - .await, - }, - ) + .federation + .execute(&remote_server, federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.clone(), + omit_members: false, + pdu: self + .services + .federation + .format_pdu_into(join_event.clone(), Some(&room_version_id)) + .await, + }) .await?; if let Some(signed_raw) = send_join_response.room_state.event { @@ -774,19 +771,16 @@ async fn make_join_request( info!("Asking {remote_server} for make_join ({make_join_counter})"); let make_join_response = self .services - .sending - .send_federation_request( - remote_server, - federation::membership::prepare_join_event::v1::Request { - room_id: room_id.to_owned(), - user_id: sender_user.to_owned(), - ver: self - .services - .server - .supported_room_versions() - .collect(), - }, - ) + .federation + .execute(remote_server, federation::membership::prepare_join_event::v1::Request { + room_id: room_id.to_owned(), + user_id: sender_user.to_owned(), + ver: self + .services + .server + .supported_room_versions() + .collect(), + }) .await; trace!("make_join response: {make_join_response:?}"); diff --git a/src/service/membership/leave.rs b/src/service/membership/leave.rs index da10ca14..89af7a0c 100644 --- a/src/service/membership/leave.rs +++ b/src/service/membership/leave.rs @@ -243,14 +243,11 @@ async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result { { let make_leave_response = self .services - .sending - .send_federation_request( - &remote_server, - federation::membership::prepare_leave_event::v1::Request { - room_id: room_id.to_owned(), - user_id: user_id.to_owned(), - }, - ) + .federation + .execute(&remote_server, federation::membership::prepare_leave_event::v1::Request { + room_id: room_id.to_owned(), + user_id: user_id.to_owned(), + }) .await; make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server)); @@ -317,19 +314,16 @@ async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result { let leave_event = leave_event_stub; self.services - .sending - .send_federation_request( - &remote_server, - federation::membership::create_leave_event::v2::Request { - room_id: room_id.to_owned(), - event_id, - pdu: self - .services - .federation - .format_pdu_into(leave_event.clone(), Some(&room_version_id)) - .await, - }, - ) + .federation + .execute(&remote_server, federation::membership::create_leave_event::v2::Request { + room_id: room_id.to_owned(), + event_id, + pdu: self + .services + .federation + .format_pdu_into(leave_event.clone(), Some(&room_version_id)) + .await, + }) .await?; Ok(()) diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 3e69cee0..3b9179ee 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -151,8 +151,8 @@ impl Service { let response = self .services - .sending - .send_federation_request(server, request) + .federation + .execute(server, request) .await?; Ok((response.room_id, response.servers)) @@ -257,8 +257,8 @@ impl Service { if appservice.aliases.is_match(room_alias.as_str()) && matches!( self.services - .sending - .send_appservice_request( + .appservice + .send_request( appservice.registration.clone(), query_room_alias::v1::Request { room_alias: room_alias.to_owned() }, ) diff --git a/src/service/rooms/event_handler/fetch_auth.rs b/src/service/rooms/event_handler/fetch_auth.rs index 9bff020c..a3b496ca 100644 --- a/src/service/rooms/event_handler/fetch_auth.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -139,8 +139,8 @@ async fn fetch_auth_chain( debug!("Fetching {next_id} over federation."); let Ok(res) = self .services - .sending - .send_federation_request(origin, get_event::v1::Request { event_id: next_id.clone() }) + .federation + .execute(origin, get_event::v1::Request { event_id: next_id.clone() }) .await .inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}")) else { diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index e5c13567..c95f4caa 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -28,8 +28,8 @@ pub(super) async fn fetch_state( ) -> Result>> { let res = self .services - .sending - .send_federation_request(origin, get_room_state_ids::v1::Request { + .federation + .execute(origin, get_room_state_ids::v1::Request { room_id: room_id.to_owned(), event_id: event_id.to_owned(), }) diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index a87dcccd..0cfebf3b 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -170,8 +170,8 @@ async fn get_summary_and_children_federation( .iter() .map(|server| { self.services - .sending - .send_federation_request(server, request.clone()) + .federation + .execute(server, request.clone()) }) .collect(); diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 6aac1c1e..8f125f87 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -125,8 +125,8 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re debug_info!("Asking {backfill_server} for backfill"); if let Ok(response) = self .services - .sending - .send_federation_request(backfill_server, request) + .federation + .execute(backfill_server, request) .inspect_err(|e| { warn!("{backfill_server} failed backfilling for room {room_id}: {e}"); }) diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 8ece9b11..3e9e5e6e 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -11,10 +11,7 @@ use std::{ use async_trait::async_trait; use futures::{FutureExt, Stream, StreamExt}; -use ruma::{ - RoomId, ServerName, UserId, - api::{OutgoingRequest, appservice::Registration}, -}; +use ruma::{RoomId, ServerName, UserId}; use tokio::{task, task::JoinSet}; use tuwunel_core::{ Result, Server, debug, debug_warn, err, error, @@ -28,7 +25,7 @@ pub use self::{ dest::Destination, sender::{EDU_LIMIT, PDU_LIMIT}, }; -use crate::{appservice, rooms::timeline::RawPduId}; +use crate::rooms::timeline::RawPduId; pub struct Service { pub db: Data, @@ -270,54 +267,6 @@ impl Service { .await } - /// Sends a request to a federation server - #[inline] - pub async fn send_federation_request( - &self, - dest: &ServerName, - request: T, - ) -> Result - where - T: OutgoingRequest + Debug + Send, - { - self.services - .federation - .execute(dest, request) - .await - } - - /// Like send_federation_request() but with a very large timeout - #[inline] - pub async fn send_synapse_request( - &self, - dest: &ServerName, - request: T, - ) -> Result - where - T: OutgoingRequest + Debug + Send, - { - self.services - .federation - .execute_synapse(dest, request) - .await - } - - /// Sends a request to an appservice - /// - /// Only returns None if there is no url specified in the appservice - /// registration file - pub async fn send_appservice_request( - &self, - registration: Registration, - request: T, - ) -> Result> - where - T: OutgoingRequest + Debug + Send, - { - let client = &self.services.client.appservice; - appservice::send_request(client, registration, request).await - } - /// Clean up queued sending event data /// /// Used after we remove an appservice registration or a user deletes a push diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index a5bc9a26..ac0a6cc0 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -51,7 +51,6 @@ use tuwunel_core::{ }; use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem}; -use crate::appservice; #[derive(Debug)] enum TransactionStatus { @@ -759,18 +758,16 @@ impl Service { //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty // transaction"); - let client = &self.services.client.appservice; - match appservice::send_request( - client, - appservice, - ruma::api::appservice::event::push_events::v1::Request { + match self + .services + .appservice + .send_request(appservice, ruma::api::appservice::event::push_events::v1::Request { txn_id: txn_id.into(), events: pdu_jsons, ephemeral: edu_jsons, to_device: Vec::new(), // TODO - }, - ) - .await + }) + .await { | Ok(_) => Ok(Destination::Appservice(id)), | Err(e) => Err((Destination::Appservice(id), e)), diff --git a/src/service/server_keys/request.rs b/src/service/server_keys/request.rs index 11f93d5f..f88afe50 100644 --- a/src/service/server_keys/request.rs +++ b/src/service/server_keys/request.rs @@ -65,8 +65,8 @@ where let response = self .services - .sending - .send_synapse_request(notary, request) + .federation + .execute_synapse(notary, request) .await? .server_keys .into_iter() @@ -94,8 +94,8 @@ pub async fn notary_request( let response = self .services - .sending - .send_federation_request(notary, request) + .federation + .execute(notary, request) .await? .server_keys .into_iter() @@ -111,8 +111,8 @@ pub async fn server_request(&self, target: &ServerName) -> Result