Refactor/dedup join event preparation paths.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-12-29 04:21:36 +00:00
parent 977f9e2e2e
commit 59481ad28d

View File

@@ -5,10 +5,13 @@ use std::{
sync::Arc,
};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut};
use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join3, join4},
};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomOrAliasId,
RoomVersionId, UserId,
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
RoomOrAliasId, RoomVersionId, UserId,
api::{client::error::ErrorKind, federation},
canonical_json::to_canonical_value,
events::{
@@ -19,13 +22,15 @@ use ruma::{
},
},
room::{AllowRule, JoinRule},
room_version_rules::RoomVersionRules,
};
use serde_json::value::RawValue as RawJsonValue;
use tuwunel_core::{
Err, Result, debug, debug_error, debug_info, debug_warn, err, error, implement, info,
matrix::{event::gen_event_id_canonical_json, room_version},
pdu::{PduBuilder, format::from_incoming_federation},
state_res, trace,
utils::{self, IterStream, ReadyExt, shuffle},
utils::{self, IterStream, ReadyExt, future::TryExtExt, shuffle},
warn,
};
@@ -160,85 +165,15 @@ pub async fn join_remote(
}
let room_version_rules = room_version::rules(&room_version_id)?;
let mut join_event_stub: CanonicalJsonObject =
serde_json::from_str(make_join_response.event.get()).map_err(|e| {
err!(BadServerResponse(warn!(
"Invalid make_join event json received from server: {e:?}"
)))
})?;
let join_authorized_via_users_server = {
use RoomVersionId::*;
if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
join_event_stub
.get("content")
.map(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok())
} else {
None
}
};
join_event_stub.insert(
"origin".into(),
CanonicalJsonValue::String(
self.services
.globals
.server_name()
.as_str()
.to_owned(),
),
);
join_event_stub.insert(
"origin_server_ts".into(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
);
join_event_stub.insert(
"content".into(),
to_canonical_value(RoomMemberEventContent {
displayname: self
.services
.users
.displayname(sender_user)
.await
.ok(),
avatar_url: self
.services
.users
.avatar_url(sender_user)
.await
.ok(),
blurhash: self
.services
.users
.blurhash(sender_user)
.await
.ok(),
let (mut join_event, event_id, join_authorized_via_users_server) = self
.create_join_event(
sender_user,
&make_join_response.event,
&room_version_id,
&room_version_rules,
reason,
join_authorized_via_users_server: join_authorized_via_users_server.clone(),
..RoomMemberEventContent::new(MembershipState::Join)
})
.expect("event is valid, we just created it"),
);
let event_id = self
.services
.server_keys
.gen_id_hash_and_sign_event(&mut join_event_stub, &room_version_id)?;
state_res::check_pdu_format(&join_event_stub, &room_version_rules.event_format)?;
// It has enough fields to be called a proper event now
let mut join_event = join_event_stub;
)
.await?;
info!("Asking {remote_server} for send_join in room {room_id}");
let send_join_request = federation::membership::create_join_event::v2::Request {
@@ -543,19 +478,19 @@ pub async fn join_local(
| _ => Vec::new(),
};
let join_authorized_via_users_server: Option<OwnedUserId> = {
if restriction_rooms
.iter()
.stream()
.any(|restriction_room_id| {
self.services
.state_cache
.is_joined(sender_user, restriction_room_id)
})
.await
{
let users = self
.services
let is_joined_restricted_rooms = restriction_rooms
.iter()
.stream()
.any(|restriction_room_id| {
self.services
.state_cache
.is_joined(sender_user, restriction_room_id)
})
.await;
let join_authorized_via_users_server: OptionFuture<_> = is_joined_restricted_rooms
.then(async || {
self.services
.state_cache
.local_users_in_room(room_id)
.filter(|user| {
@@ -566,34 +501,31 @@ pub async fn join_local(
state_lock,
)
})
.map(ToOwned::to_owned);
.map(ToOwned::to_owned)
.boxed()
.next()
.await
})
.into();
pin_mut!(users);
users.next().await
} else {
None
}
};
let displayname = self.services.users.displayname(sender_user).ok();
let avatar_url = self.services.users.avatar_url(sender_user).ok();
let blurhash = self.services.users.blurhash(sender_user).ok();
let (displayname, avatar_url, blurhash, join_authorized_via_users_server) = join4(
displayname,
avatar_url,
blurhash,
join_authorized_via_users_server.map(Option::flatten),
)
.await;
let content = RoomMemberEventContent {
displayname: self
.services
.users
.displayname(sender_user)
.await
.ok(),
avatar_url: self
.services
.users
.avatar_url(sender_user)
.await
.ok(),
blurhash: self
.services
.users
.blurhash(sender_user)
.await
.ok(),
displayname,
avatar_url,
blurhash,
reason: reason.clone(),
join_authorized_via_users_server,
..RoomMemberEventContent::new(MembershipState::Join)
@@ -647,22 +579,84 @@ pub async fn join_local(
}
let room_version_rules = room_version::rules(&room_version_id)?;
let (join_event, event_id, _) = self
.create_join_event(
sender_user,
&make_join_response.event,
&room_version_id,
&room_version_rules,
reason,
)
.await?;
let mut join_event_stub: CanonicalJsonObject =
serde_json::from_str(make_join_response.event.get()).map_err(|e| {
let send_join_request = federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: self
.services
.federation
.format_pdu_into(join_event.clone(), Some(&room_version_id))
.await,
};
let send_join_response = self
.services
.federation
.execute(&remote_server, send_join_request)
.await?;
let Some(signed_raw) = send_join_response.room_state.event else {
return Err(error);
};
let (signed_event_id, signed_value) =
gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
})?;
if signed_event_id != event_id {
return Err!(Request(BadJson(warn!(
%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID"
))));
}
self.services
.event_handler
.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
.boxed()
.await?;
Ok(())
}
#[implement(Service)]
#[tracing::instrument(name = "make_join", level = "debug", skip_all)]
async fn create_join_event(
&self,
sender_user: &UserId,
join_event_stub: &RawJsonValue,
room_version_id: &RoomVersionId,
room_version_rules: &RoomVersionRules,
reason: Option<String>,
) -> Result<(CanonicalJsonObject, OwnedEventId, Option<OwnedUserId>)> {
let mut event: CanonicalJsonObject =
serde_json::from_str(join_event_stub.get()).map_err(|e| {
err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
})?;
let join_authorized_via_users_server = join_event_stub
.get("content")
.map(|s| {
let join_authorized_via_users_server = room_version_rules
.authorization
.restricted_join_rule
.then(|| event.get("content"))
.flatten()
.and_then(|s| {
s.as_object()?
.get("join_authorised_via_users_server")?
.as_str()
.get("join_authorised_via_users_server")
})
.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());
.and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok());
join_event_stub.insert(
event.insert(
"origin".into(),
CanonicalJsonValue::String(
self.services
@@ -672,89 +666,40 @@ pub async fn join_local(
.to_owned(),
),
);
join_event_stub.insert(
event.insert(
"origin_server_ts".into(),
CanonicalJsonValue::Integer(
utils::millis_since_unix_epoch()
.try_into()
.expect("Timestamp is valid js_int value"),
),
CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?),
);
join_event_stub.insert(
let displayname = self.services.users.displayname(sender_user).ok();
let avatar_url = self.services.users.avatar_url(sender_user).ok();
let blurhash = self.services.users.blurhash(sender_user).ok();
let (displayname, avatar_url, blurhash) = join3(displayname, avatar_url, blurhash).await;
event.insert(
"content".into(),
to_canonical_value(RoomMemberEventContent {
displayname: self
.services
.users
.displayname(sender_user)
.await
.ok(),
avatar_url: self
.services
.users
.avatar_url(sender_user)
.await
.ok(),
blurhash: self
.services
.users
.blurhash(sender_user)
.await
.ok(),
displayname,
avatar_url,
blurhash,
reason,
join_authorized_via_users_server,
join_authorized_via_users_server: join_authorized_via_users_server.clone(),
..RoomMemberEventContent::new(MembershipState::Join)
})
.expect("event is valid, we just created it"),
})?,
);
let event_id = self
.services
.server_keys
.gen_id_hash_and_sign_event(&mut join_event_stub, &room_version_id)?;
.gen_id_hash_and_sign_event(&mut event, room_version_id)?;
state_res::check_pdu_format(&join_event_stub, &room_version_rules.event_format)?;
state_res::check_pdu_format(&event, &room_version_rules.event_format)?;
// It has enough fields to be called a proper event now
let join_event = join_event_stub;
let send_join_response = self
.services
.federation
.execute(&remote_server, federation::membership::create_join_event::v2::Request {
room_id: room_id.to_owned(),
event_id: event_id.clone(),
omit_members: false,
pdu: self
.services
.federation
.format_pdu_into(join_event.clone(), Some(&room_version_id))
.await,
})
.await?;
if let Some(signed_raw) = send_join_response.room_state.event {
let (signed_event_id, signed_value) =
gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
})?;
if signed_event_id != event_id {
return Err!(Request(BadJson(
warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID")
)));
}
self.services
.event_handler
.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
.boxed()
.await?;
} else {
return Err(error);
}
Ok(())
Ok((event, event_id, join_authorized_via_users_server))
}
#[implement(Service)]
@@ -817,8 +762,10 @@ async fn make_join_request(
M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
room version {room_id}: {e}"
);
make_join_response_and_server =
Err!(BadServerResponse("Room version is not supported by tuwunel"));
return make_join_response_and_server;
}
@@ -826,12 +773,14 @@ async fn make_join_request(
.services
.config
.max_make_join_attempts_per_join_attempt;
if make_join_counter >= max_attempts {
warn!(?remote_server, "last make_join failure reason: {e}");
warn!(
"{max_attempts} servers failed to provide valid make_join response, \
assuming no server can assist in joining."
);
make_join_response_and_server =
Err!(BadServerResponse("No server available to assist in joining."));