Re-establish federating with several Conduit endpoints.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-03 11:15:23 +00:00
parent 05390d6097
commit 06618eadab

View File

@@ -7,8 +7,8 @@ use reqwest::{Client, Method, Request, Response, Url};
use ruma::{ use ruma::{
CanonicalJsonName, CanonicalJsonObject, CanonicalJsonValue, ServerName, ServerSigningKeyId, CanonicalJsonName, CanonicalJsonObject, CanonicalJsonValue, ServerName, ServerSigningKeyId,
api::{ api::{
EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest, SendAccessToken, AuthScheme, EndpointError, IncomingResponse, MatrixVersion, OutgoingRequest,
SupportedVersions, client::error::Error as RumaError, SendAccessToken, SupportedVersions, client::error::Error as RumaError,
federation::authentication::XMatrix, federation::authentication::XMatrix,
}, },
serde::Base64, serde::Base64,
@@ -48,10 +48,10 @@ where
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument( #[tracing::instrument(
name = "fed", name = "fed",
level = INFO_SPAN_LEVEL, level = INFO_SPAN_LEVEL,
skip(self, client, request), skip(self, client, request),
)] )]
pub async fn execute_on<T>( pub async fn execute_on<T>(
&self, &self,
client: &Client, client: &Client,
@@ -81,17 +81,16 @@ where
.get_actual_dest(dest) .get_actual_dest(dest)
.await?; .await?;
let request = into_http_request::<T>(&actual, request)?; let request = self.prepare(&actual, dest, request)?;
let request = self.prepare(dest, request)?; self.perform::<T>(&actual, dest, request, client)
self.perform::<T>(dest, &actual, request, client)
.await .await
} }
#[implement(super::Service)] #[implement(super::Service)]
async fn perform<T>( async fn perform<T>(
&self, &self,
dest: &ServerName,
actual: &ActualDest, actual: &ActualDest,
dest: &ServerName,
request: Request, request: Request,
client: &Client, client: &Client,
) -> Result<T::IncomingResponse> ) -> Result<T::IncomingResponse>
@@ -103,7 +102,7 @@ where
debug!(?method, ?url, "Sending request"); debug!(?method, ?url, "Sending request");
match client.execute(request).await { match client.execute(request).await {
| Ok(response) => handle_response::<T>(dest, actual, &method, &url, response).await, | Ok(response) => handle_response::<T>(actual, dest, &method, &url, response).await,
| Err(error) => Err(self | Err(error) => Err(self
.handle_error(dest, actual, &method, &url, error) .handle_error(dest, actual, &method, &url, error)
.expect_err("always returns error")), .expect_err("always returns error")),
@@ -111,9 +110,11 @@ where
} }
#[implement(super::Service)] #[implement(super::Service)]
fn prepare(&self, dest: &ServerName, mut request: http::Request<Vec<u8>>) -> Result<Request> { fn prepare<T>(&self, actual: &ActualDest, dest: &ServerName, request: T) -> Result<Request>
self.sign_request(&mut request, dest); where
T: OutgoingRequest + Send,
{
let request = self.to_http_request::<T>(actual, dest, request)?;
let request = Request::try_from(request)?; let request = Request::try_from(request)?;
self.validate_url(request.url())?; self.validate_url(request.url())?;
self.services.server.check_running()?; self.services.server.check_running()?;
@@ -134,8 +135,8 @@ fn validate_url(&self, url: &Url) -> Result {
} }
async fn handle_response<T>( async fn handle_response<T>(
dest: &ServerName,
actual: &ActualDest, actual: &ActualDest,
dest: &ServerName,
method: &Method, method: &Method,
url: &Url, url: &Url,
response: Response, response: Response,
@@ -230,6 +231,34 @@ fn handle_error(
Err(e.into()) Err(e.into())
} }
#[implement(super::Service)]
fn to_http_request<T>(
&self,
actual: &ActualDest,
dest: &ServerName,
request: T,
) -> Result<http::Request<Vec<u8>>>
where
T: OutgoingRequest + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
let supported = SupportedVersions {
versions: VERSIONS.into(),
features: Default::default(),
};
let mut request = request
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &supported)
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
if matches!(T::METADATA.authentication, AuthScheme::ServerSignatures) {
self.sign_request(&mut request, dest);
}
Ok(request)
}
#[implement(super::Service)] #[implement(super::Service)]
fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerName) { fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerName) {
type Member = (CanonicalJsonName, Value); type Member = (CanonicalJsonName, Value);
@@ -300,21 +329,3 @@ fn sign_request(&self, http_request: &mut http::Request<Vec<u8>>, dest: &ServerN
debug_assert!(authorization.is_none(), "Authorization header already present"); debug_assert!(authorization.is_none(), "Authorization header already present");
} }
fn into_http_request<T>(actual: &ActualDest, request: T) -> Result<http::Request<Vec<u8>>>
where
T: OutgoingRequest + Send,
{
const VERSIONS: [MatrixVersion; 1] = [MatrixVersion::V1_11];
const SATIR: SendAccessToken<'_> = SendAccessToken::IfRequired(EMPTY);
let supported = SupportedVersions {
versions: VERSIONS.into(),
features: Default::default(),
};
let http_request = request
.try_into_http_request::<Vec<u8>>(actual.string().as_str(), SATIR, &supported)
.map_err(|e| err!(BadServerResponse("Invalid destination: {e:?}")))?;
Ok(http_request)
}