From 59481ad28dbe39e30adab59e7b5fb5b4335d7c02 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 29 Dec 2025 04:21:36 +0000 Subject: [PATCH] Refactor/dedup join event preparation paths. Signed-off-by: Jason Volk --- src/service/membership/join.rs | 347 ++++++++++++++------------------- 1 file changed, 148 insertions(+), 199 deletions(-) diff --git a/src/service/membership/join.rs b/src/service/membership/join.rs index 7c0738c4..e0a53e59 100644 --- a/src/service/membership/join.rs +++ b/src/service/membership/join.rs @@ -5,10 +5,13 @@ use std::{ sync::Arc, }; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut}; +use futures::{ + FutureExt, StreamExt, TryFutureExt, TryStreamExt, + future::{OptionFuture, join3, join4}, +}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomOrAliasId, - RoomVersionId, UserId, + CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId, + RoomOrAliasId, RoomVersionId, UserId, api::{client::error::ErrorKind, federation}, canonical_json::to_canonical_value, events::{ @@ -19,13 +22,15 @@ use ruma::{ }, }, room::{AllowRule, JoinRule}, + room_version_rules::RoomVersionRules, }; +use serde_json::value::RawValue as RawJsonValue; use tuwunel_core::{ Err, Result, debug, debug_error, debug_info, debug_warn, err, error, implement, info, matrix::{event::gen_event_id_canonical_json, room_version}, pdu::{PduBuilder, format::from_incoming_federation}, state_res, trace, - utils::{self, IterStream, ReadyExt, shuffle}, + utils::{self, IterStream, ReadyExt, future::TryExtExt, shuffle}, warn, }; @@ -160,85 +165,15 @@ pub async fn join_remote( } let room_version_rules = room_version::rules(&room_version_id)?; - - let mut join_event_stub: CanonicalJsonObject = - serde_json::from_str(make_join_response.event.get()).map_err(|e| { - err!(BadServerResponse(warn!( - "Invalid make_join event json received from server: {e:?}" - ))) - })?; - - let join_authorized_via_users_server = { - use RoomVersionId::*; - if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) { - join_event_stub - .get("content") - .map(|s| { - s.as_object()? - .get("join_authorised_via_users_server")? - .as_str() - }) - .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()) - } else { - None - } - }; - - join_event_stub.insert( - "origin".into(), - CanonicalJsonValue::String( - self.services - .globals - .server_name() - .as_str() - .to_owned(), - ), - ); - join_event_stub.insert( - "origin_server_ts".into(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), - ); - join_event_stub.insert( - "content".into(), - to_canonical_value(RoomMemberEventContent { - displayname: self - .services - .users - .displayname(sender_user) - .await - .ok(), - avatar_url: self - .services - .users - .avatar_url(sender_user) - .await - .ok(), - blurhash: self - .services - .users - .blurhash(sender_user) - .await - .ok(), + let (mut join_event, event_id, join_authorized_via_users_server) = self + .create_join_event( + sender_user, + &make_join_response.event, + &room_version_id, + &room_version_rules, reason, - join_authorized_via_users_server: join_authorized_via_users_server.clone(), - ..RoomMemberEventContent::new(MembershipState::Join) - }) - .expect("event is valid, we just created it"), - ); - - let event_id = self - .services - .server_keys - .gen_id_hash_and_sign_event(&mut join_event_stub, &room_version_id)?; - - state_res::check_pdu_format(&join_event_stub, &room_version_rules.event_format)?; - - // It has enough fields to be called a proper event now - let mut join_event = join_event_stub; + ) + .await?; info!("Asking {remote_server} for send_join in room {room_id}"); let send_join_request = federation::membership::create_join_event::v2::Request { @@ -543,19 +478,19 @@ pub async fn join_local( | _ => Vec::new(), }; - let join_authorized_via_users_server: Option = { - if restriction_rooms - .iter() - .stream() - .any(|restriction_room_id| { - self.services - .state_cache - .is_joined(sender_user, restriction_room_id) - }) - .await - { - let users = self - .services + let is_joined_restricted_rooms = restriction_rooms + .iter() + .stream() + .any(|restriction_room_id| { + self.services + .state_cache + .is_joined(sender_user, restriction_room_id) + }) + .await; + + let join_authorized_via_users_server: OptionFuture<_> = is_joined_restricted_rooms + .then(async || { + self.services .state_cache .local_users_in_room(room_id) .filter(|user| { @@ -566,34 +501,31 @@ pub async fn join_local( state_lock, ) }) - .map(ToOwned::to_owned); + .map(ToOwned::to_owned) + .boxed() + .next() + .await + }) + .into(); - pin_mut!(users); - users.next().await - } else { - None - } - }; + let displayname = self.services.users.displayname(sender_user).ok(); + + let avatar_url = self.services.users.avatar_url(sender_user).ok(); + + let blurhash = self.services.users.blurhash(sender_user).ok(); + + let (displayname, avatar_url, blurhash, join_authorized_via_users_server) = join4( + displayname, + avatar_url, + blurhash, + join_authorized_via_users_server.map(Option::flatten), + ) + .await; let content = RoomMemberEventContent { - displayname: self - .services - .users - .displayname(sender_user) - .await - .ok(), - avatar_url: self - .services - .users - .avatar_url(sender_user) - .await - .ok(), - blurhash: self - .services - .users - .blurhash(sender_user) - .await - .ok(), + displayname, + avatar_url, + blurhash, reason: reason.clone(), join_authorized_via_users_server, ..RoomMemberEventContent::new(MembershipState::Join) @@ -647,22 +579,84 @@ pub async fn join_local( } let room_version_rules = room_version::rules(&room_version_id)?; + let (join_event, event_id, _) = self + .create_join_event( + sender_user, + &make_join_response.event, + &room_version_id, + &room_version_rules, + reason, + ) + .await?; - let mut join_event_stub: CanonicalJsonObject = - serde_json::from_str(make_join_response.event.get()).map_err(|e| { + let send_join_request = federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.clone(), + omit_members: false, + pdu: self + .services + .federation + .format_pdu_into(join_event.clone(), Some(&room_version_id)) + .await, + }; + + let send_join_response = self + .services + .federation + .execute(&remote_server, send_join_request) + .await?; + + let Some(signed_raw) = send_join_response.room_state.event else { + return Err(error); + }; + + let (signed_event_id, signed_value) = + gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| { + err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) + })?; + + if signed_event_id != event_id { + return Err!(Request(BadJson(warn!( + %signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID" + )))); + } + + self.services + .event_handler + .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) + .boxed() + .await?; + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(name = "make_join", level = "debug", skip_all)] +async fn create_join_event( + &self, + sender_user: &UserId, + join_event_stub: &RawJsonValue, + room_version_id: &RoomVersionId, + room_version_rules: &RoomVersionRules, + reason: Option, +) -> Result<(CanonicalJsonObject, OwnedEventId, Option)> { + let mut event: CanonicalJsonObject = + serde_json::from_str(join_event_stub.get()).map_err(|e| { err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")) })?; - let join_authorized_via_users_server = join_event_stub - .get("content") - .map(|s| { + let join_authorized_via_users_server = room_version_rules + .authorization + .restricted_join_rule + .then(|| event.get("content")) + .flatten() + .and_then(|s| { s.as_object()? - .get("join_authorised_via_users_server")? - .as_str() + .get("join_authorised_via_users_server") }) - .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()); + .and_then(|s| OwnedUserId::try_from(s.as_str().unwrap_or_default()).ok()); - join_event_stub.insert( + event.insert( "origin".into(), CanonicalJsonValue::String( self.services @@ -672,89 +666,40 @@ pub async fn join_local( .to_owned(), ), ); - join_event_stub.insert( + + event.insert( "origin_server_ts".into(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), + CanonicalJsonValue::Integer(utils::millis_since_unix_epoch().try_into()?), ); - join_event_stub.insert( + + let displayname = self.services.users.displayname(sender_user).ok(); + + let avatar_url = self.services.users.avatar_url(sender_user).ok(); + + let blurhash = self.services.users.blurhash(sender_user).ok(); + + let (displayname, avatar_url, blurhash) = join3(displayname, avatar_url, blurhash).await; + + event.insert( "content".into(), to_canonical_value(RoomMemberEventContent { - displayname: self - .services - .users - .displayname(sender_user) - .await - .ok(), - avatar_url: self - .services - .users - .avatar_url(sender_user) - .await - .ok(), - blurhash: self - .services - .users - .blurhash(sender_user) - .await - .ok(), + displayname, + avatar_url, + blurhash, reason, - join_authorized_via_users_server, + join_authorized_via_users_server: join_authorized_via_users_server.clone(), ..RoomMemberEventContent::new(MembershipState::Join) - }) - .expect("event is valid, we just created it"), + })?, ); let event_id = self .services .server_keys - .gen_id_hash_and_sign_event(&mut join_event_stub, &room_version_id)?; + .gen_id_hash_and_sign_event(&mut event, room_version_id)?; - state_res::check_pdu_format(&join_event_stub, &room_version_rules.event_format)?; + state_res::check_pdu_format(&event, &room_version_rules.event_format)?; - // It has enough fields to be called a proper event now - let join_event = join_event_stub; - - let send_join_response = self - .services - .federation - .execute(&remote_server, federation::membership::create_join_event::v2::Request { - room_id: room_id.to_owned(), - event_id: event_id.clone(), - omit_members: false, - pdu: self - .services - .federation - .format_pdu_into(join_event.clone(), Some(&room_version_id)) - .await, - }) - .await?; - - if let Some(signed_raw) = send_join_response.room_state.event { - let (signed_event_id, signed_value) = - gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| { - err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) - })?; - - if signed_event_id != event_id { - return Err!(Request(BadJson( - warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID") - ))); - } - - self.services - .event_handler - .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) - .boxed() - .await?; - } else { - return Err(error); - } - - Ok(()) + Ok((event, event_id, join_authorized_via_users_server)) } #[implement(Service)] @@ -817,8 +762,10 @@ async fn make_join_request( M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \ room version {room_id}: {e}" ); + make_join_response_and_server = Err!(BadServerResponse("Room version is not supported by tuwunel")); + return make_join_response_and_server; } @@ -826,12 +773,14 @@ async fn make_join_request( .services .config .max_make_join_attempts_per_join_attempt; + if make_join_counter >= max_attempts { warn!(?remote_server, "last make_join failure reason: {e}"); warn!( "{max_attempts} servers failed to provide valid make_join response, \ assuming no server can assist in joining." ); + make_join_response_and_server = Err!(BadServerResponse("No server available to assist in joining."));