Replace calls through sender request interface.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-25 08:49:49 +00:00
parent 71f3ccf140
commit 8bb0d02619
23 changed files with 119 additions and 197 deletions

View File

@@ -236,8 +236,8 @@ pub(super) async fn get_remote_pdu(
match self
.services
.sending
.send_federation_request(&server, ruma::api::federation::event::get_event::v1::Request {
.federation
.execute(&server, ruma::api::federation::event::get_event::v1::Request {
event_id: event_id.clone(),
})
.await
@@ -327,11 +327,8 @@ pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
match self
.services
.sending
.send_federation_request(
&server,
ruma::api::federation::discovery::get_server_version::v1::Request {},
)
.federation
.execute(&server, ruma::api::federation::discovery::get_server_version::v1::Request {})
.await
{
| Err(e) => {
@@ -571,8 +568,8 @@ pub(super) async fn force_set_room_state_from_server(
let remote_state_response = self
.services
.sending
.send_federation_request(&server_name, get_room_state::v1::Request {
.federation
.execute(&server_name, get_room_state::v1::Request {
room_id: room_id.clone(),
event_id: first_pdu.event_id().to_owned(),
})

View File

@@ -37,13 +37,10 @@ pub(crate) async fn appservice_ping(
let timer = tokio::time::Instant::now();
let _response = services
.sending
.send_appservice_request(
appservice_info.registration.clone(),
ping::send_ping::v1::Request {
transaction_id: body.transaction_id.clone(),
},
)
.appservice
.send_request(appservice_info.registration.clone(), ping::send_ping::v1::Request {
transaction_id: body.transaction_id.clone(),
})
.await?
.expect("We already validated if an appservice URL exists above");

View File

@@ -220,8 +220,8 @@ pub(crate) async fn get_public_rooms_filtered_helper(
server.filter(|server_name| !services.globals.server_is_ours(server_name))
{
let response = services
.sending
.send_federation_request(
.federation
.execute(
other_server,
federation::directory::get_public_rooms_filtered::v1::Request {
limit,

View File

@@ -503,10 +503,7 @@ where
let request =
federation::keys::get_keys::v1::Request { device_keys: device_keys_input_fed };
let response = services
.sending
.send_federation_request(server, request)
.await;
let response = services.federation.execute(server, request).await;
(server, response)
})
@@ -631,8 +628,8 @@ pub(crate) async fn claim_keys_helper(
(
server,
services
.sending
.send_federation_request(server, federation::keys::claim_keys::v1::Request {
.federation
.execute(server, federation::keys::claim_keys::v1::Request {
one_time_keys: one_time_keys_input_fed,
})
.await,

View File

@@ -260,8 +260,8 @@ async fn knock_room_helper_local(
};
let send_knock_response = services
.sending
.send_federation_request(&remote_server, send_knock_request)
.federation
.execute(&remote_server, send_knock_request)
.await?;
info!("send_knock finished");
@@ -396,8 +396,8 @@ async fn knock_room_helper_remote(
};
let send_knock_response = services
.sending
.send_federation_request(&remote_server, send_knock_request)
.federation
.execute(&remote_server, send_knock_request)
.await?;
info!("send_knock finished");
@@ -556,18 +556,15 @@ async fn make_knock_request(
info!("Asking {remote_server} for make_knock ({make_knock_counter})");
let make_knock_response = services
.sending
.send_federation_request(
remote_server,
federation::membership::prepare_knock_event::v1::Request {
room_id: room_id.to_owned(),
user_id: sender_user.to_owned(),
ver: services
.server
.supported_room_versions()
.collect(),
},
)
.federation
.execute(remote_server, federation::membership::prepare_knock_event::v1::Request {
room_id: room_id.to_owned(),
user_id: sender_user.to_owned(),
ver: services
.server
.supported_room_versions()
.collect(),
})
.await;
trace!("make_knock response: {make_knock_response:?}");

View File

@@ -68,8 +68,8 @@ pub(crate) async fn get_displayname_route(
if !services.globals.user_is_local(&body.user_id) {
// Create and update our local copy of the user
if let Ok(response) = services
.sending
.send_federation_request(
.federation
.execute(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
user_id: body.user_id.clone(),
@@ -169,8 +169,8 @@ pub(crate) async fn get_avatar_url_route(
if !services.globals.user_is_local(&body.user_id) {
// Create and update our local copy of the user
if let Ok(response) = services
.sending
.send_federation_request(
.federation
.execute(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
user_id: body.user_id.clone(),
@@ -231,8 +231,8 @@ pub(crate) async fn get_profile_route(
if !services.globals.user_is_local(&body.user_id) {
// Create and update our local copy of the user
if let Ok(response) = services
.sending
.send_federation_request(
.federation
.execute(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
user_id: body.user_id.clone(),

View File

@@ -232,8 +232,8 @@ async fn remote_room_summary_hierarchy_response(
.iter()
.map(|server| {
services
.sending
.send_federation_request(server, request.clone())
.federation
.execute(server, request.clone())
})
.collect();

View File

@@ -243,8 +243,8 @@ pub(crate) async fn get_timezone_key_route(
if !services.globals.user_is_local(&body.user_id) {
// Create and update our local copy of the user
if let Ok(response) = services
.sending
.send_federation_request(
.federation
.execute(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
user_id: body.user_id.clone(),
@@ -304,8 +304,8 @@ pub(crate) async fn get_profile_field_route(
if !services.globals.user_is_local(&body.user_id) {
// Create and update our local copy of the user
if let Ok(response) = services
.sending
.send_federation_request(
.federation
.execute(
body.user_id.server_name(),
federation::query::get_profile_information::v1::Request {
user_id: body.user_id.clone(),

View File

@@ -198,8 +198,8 @@ pub(crate) async fn create_invite_route(
for appservice in services.appservice.read().await.values() {
if appservice.is_user_match(&invited_user) {
services
.sending
.send_appservice_request(
.appservice
.send_request(
appservice.registration.clone(),
ruma::api::appservice::event::push_events::v1::Request {
events: vec![pdu.to_format()],

View File

@@ -16,7 +16,6 @@ use tokio::sync::{RwLock, RwLockReadGuard};
use tuwunel_core::{Err, Result, debug, err, utils::stream::IterStream};
use tuwunel_database::Map;
pub(crate) use self::request::send_request;
pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
pub struct Service {

View File

@@ -1,19 +1,19 @@
use std::{fmt::Debug, mem};
use bytes::BytesMut;
use reqwest::Client;
use ruma::api::{
IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, SupportedVersions,
appservice::Registration,
};
use tuwunel_core::{Err, Result, debug_error, err, trace, utils, warn};
use tuwunel_core::{Err, Result, debug_error, err, implement, trace, utils, warn};
/// Sends a request to an appservice
///
/// Only returns Ok(None) if there is no url specified in the appservice
/// registration file
pub(crate) async fn send_request<T>(
client: &Client,
#[implement(super::Service)]
pub async fn send_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
@@ -25,6 +25,7 @@ where
versions: VERSIONS.into(),
features: Default::default(),
};
let client = &self.services.client.appservice;
let Some(dest) = registration.url else {
return Ok(None);

View File

@@ -320,8 +320,8 @@ where
Request: OutgoingRequest + Send + Debug,
{
self.services
.sending
.send_federation_request(server.unwrap_or(mxc.server_name), request)
.federation
.execute(server.unwrap_or(mxc.server_name), request)
.await
.map_err(|error| handle_federation_error(mxc, user, server, error))
}
@@ -374,8 +374,8 @@ pub async fn fetch_remote_thumbnail_legacy(
self.check_fetch_authorized(&mxc)?;
let response = self
.services
.sending
.send_federation_request(mxc.server_name, media::get_content_thumbnail::v3::Request {
.federation
.execute(mxc.server_name, media::get_content_thumbnail::v3::Request {
allow_remote: body.allow_remote,
height: body.height,
width: body.width,
@@ -414,8 +414,8 @@ pub async fn fetch_remote_content_legacy(
self.check_fetch_authorized(mxc)?;
let response = self
.services
.sending
.send_federation_request(mxc.server_name, media::get_content::v3::Request {
.federation
.execute(mxc.server_name, media::get_content::v3::Request {
allow_remote: true,
server_name: mxc.server_name.into(),
media_id: mxc.media_id.into(),

View File

@@ -83,8 +83,8 @@ async fn remote_invite(
let response = self
.services
.sending
.send_federation_request(user_id.server_name(), create_invite::v2::Request {
.federation
.execute(user_id.server_name(), create_invite::v2::Request {
room_id: room_id.to_owned(),
event_id: (*pdu.event_id).to_owned(),
room_version: room_version_id.clone(),

View File

@@ -251,8 +251,8 @@ pub async fn join_remote(
let send_join_response = match self
.services
.sending
.send_synapse_request(&remote_server, send_join_request)
.federation
.execute(&remote_server, send_join_request)
.await
{
| Ok(response) => response,
@@ -704,20 +704,17 @@ pub async fn join_local(
let send_join_response = self
.services
.sending
.send_synapse_request(
&remote_server,
federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: self
.services
.federation
.format_pdu_into(join_event.clone(), Some(&room_version_id))
.await,
},
)
.federation
.execute(&remote_server, federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: self
.services
.federation
.format_pdu_into(join_event.clone(), Some(&room_version_id))
.await,
})
.await?;
if let Some(signed_raw) = send_join_response.room_state.event {
@@ -774,19 +771,16 @@ async fn make_join_request(
info!("Asking {remote_server} for make_join ({make_join_counter})");
let make_join_response = self
.services
.sending
.send_federation_request(
remote_server,
federation::membership::prepare_join_event::v1::Request {
room_id: room_id.to_owned(),
user_id: sender_user.to_owned(),
ver: self
.services
.server
.supported_room_versions()
.collect(),
},
)
.federation
.execute(remote_server, federation::membership::prepare_join_event::v1::Request {
room_id: room_id.to_owned(),
user_id: sender_user.to_owned(),
ver: self
.services
.server
.supported_room_versions()
.collect(),
})
.await;
trace!("make_join response: {make_join_response:?}");

View File

@@ -243,14 +243,11 @@ async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
{
let make_leave_response = self
.services
.sending
.send_federation_request(
&remote_server,
federation::membership::prepare_leave_event::v1::Request {
room_id: room_id.to_owned(),
user_id: user_id.to_owned(),
},
)
.federation
.execute(&remote_server, federation::membership::prepare_leave_event::v1::Request {
room_id: room_id.to_owned(),
user_id: user_id.to_owned(),
})
.await;
make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));
@@ -317,19 +314,16 @@ async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
let leave_event = leave_event_stub;
self.services
.sending
.send_federation_request(
&remote_server,
federation::membership::create_leave_event::v2::Request {
room_id: room_id.to_owned(),
event_id,
pdu: self
.services
.federation
.format_pdu_into(leave_event.clone(), Some(&room_version_id))
.await,
},
)
.federation
.execute(&remote_server, federation::membership::create_leave_event::v2::Request {
room_id: room_id.to_owned(),
event_id,
pdu: self
.services
.federation
.format_pdu_into(leave_event.clone(), Some(&room_version_id))
.await,
})
.await?;
Ok(())

View File

@@ -151,8 +151,8 @@ impl Service {
let response = self
.services
.sending
.send_federation_request(server, request)
.federation
.execute(server, request)
.await?;
Ok((response.room_id, response.servers))
@@ -257,8 +257,8 @@ impl Service {
if appservice.aliases.is_match(room_alias.as_str())
&& matches!(
self.services
.sending
.send_appservice_request(
.appservice
.send_request(
appservice.registration.clone(),
query_room_alias::v1::Request { room_alias: room_alias.to_owned() },
)

View File

@@ -139,8 +139,8 @@ async fn fetch_auth_chain(
debug!("Fetching {next_id} over federation.");
let Ok(res) = self
.services
.sending
.send_federation_request(origin, get_event::v1::Request { event_id: next_id.clone() })
.federation
.execute(origin, get_event::v1::Request { event_id: next_id.clone() })
.await
.inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}"))
else {

View File

@@ -28,8 +28,8 @@ pub(super) async fn fetch_state(
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
let res = self
.services
.sending
.send_federation_request(origin, get_room_state_ids::v1::Request {
.federation
.execute(origin, get_room_state_ids::v1::Request {
room_id: room_id.to_owned(),
event_id: event_id.to_owned(),
})

View File

@@ -170,8 +170,8 @@ async fn get_summary_and_children_federation(
.iter()
.map(|server| {
self.services
.sending
.send_federation_request(server, request.clone())
.federation
.execute(server, request.clone())
})
.collect();

View File

@@ -125,8 +125,8 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
debug_info!("Asking {backfill_server} for backfill");
if let Ok(response) = self
.services
.sending
.send_federation_request(backfill_server, request)
.federation
.execute(backfill_server, request)
.inspect_err(|e| {
warn!("{backfill_server} failed backfilling for room {room_id}: {e}");
})

View File

@@ -11,10 +11,7 @@ use std::{
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt};
use ruma::{
RoomId, ServerName, UserId,
api::{OutgoingRequest, appservice::Registration},
};
use ruma::{RoomId, ServerName, UserId};
use tokio::{task, task::JoinSet};
use tuwunel_core::{
Result, Server, debug, debug_warn, err, error,
@@ -28,7 +25,7 @@ pub use self::{
dest::Destination,
sender::{EDU_LIMIT, PDU_LIMIT},
};
use crate::{appservice, rooms::timeline::RawPduId};
use crate::rooms::timeline::RawPduId;
pub struct Service {
pub db: Data,
@@ -270,54 +267,6 @@ impl Service {
.await
}
/// Sends a request to a federation server
#[inline]
pub async fn send_federation_request<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
self.services
.federation
.execute(dest, request)
.await
}
/// Like send_federation_request() but with a very large timeout
#[inline]
pub async fn send_synapse_request<T>(
&self,
dest: &ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug + Send,
{
self.services
.federation
.execute_synapse(dest, request)
.await
}
/// Sends a request to an appservice
///
/// Only returns None if there is no url specified in the appservice
/// registration file
pub async fn send_appservice_request<T>(
&self,
registration: Registration,
request: T,
) -> Result<Option<T::IncomingResponse>>
where
T: OutgoingRequest + Debug + Send,
{
let client = &self.services.client.appservice;
appservice::send_request(client, registration, request).await
}
/// Clean up queued sending event data
///
/// Used after we remove an appservice registration or a user deletes a push

View File

@@ -51,7 +51,6 @@ use tuwunel_core::{
};
use super::{Destination, EduBuf, EduVec, Msg, SendingEvent, Service, data::QueueItem};
use crate::appservice;
#[derive(Debug)]
enum TransactionStatus {
@@ -759,18 +758,16 @@ impl Service {
//debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let client = &self.services.client.appservice;
match appservice::send_request(
client,
appservice,
ruma::api::appservice::event::push_events::v1::Request {
match self
.services
.appservice
.send_request(appservice, ruma::api::appservice::event::push_events::v1::Request {
txn_id: txn_id.into(),
events: pdu_jsons,
ephemeral: edu_jsons,
to_device: Vec::new(), // TODO
},
)
.await
})
.await
{
| Ok(_) => Ok(Destination::Appservice(id)),
| Err(e) => Err((Destination::Appservice(id), e)),

View File

@@ -65,8 +65,8 @@ where
let response = self
.services
.sending
.send_synapse_request(notary, request)
.federation
.execute_synapse(notary, request)
.await?
.server_keys
.into_iter()
@@ -94,8 +94,8 @@ pub async fn notary_request(
let response = self
.services
.sending
.send_federation_request(notary, request)
.federation
.execute(notary, request)
.await?
.server_keys
.into_iter()
@@ -111,8 +111,8 @@ pub async fn server_request(&self, target: &ServerName) -> Result<ServerSigningK
let server_signing_key = self
.services
.sending
.send_federation_request(target, Request::new())
.federation
.execute(target, Request::new())
.await
.map(|response| response.server_key)
.and_then(|key| key.deserialize().map_err(Into::into))?;