diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index 362332ac..c6e8bfbb 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -1,7 +1,6 @@ use clap::Subcommand; use futures::{FutureExt, StreamExt}; use ruma::{OwnedRoomId, OwnedRoomOrAliasId, RoomAliasId, RoomId, RoomOrAliasId}; -use tuwunel_api::client::leave_room; use tuwunel_core::{ Err, Result, debug, utils::{IterStream, ReadyExt}, @@ -152,13 +151,20 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None) + let state_lock = self.services.state.mutex.lock(&room_id).await; + + if let Err(e) = self + .services + .membership + .leave(user_id, &room_id, None, &state_lock) .boxed() .await { warn!("Failed to leave room: {e}"); } + drop(state_lock); + self.services .state_cache .forget(&room_id, user_id); @@ -320,13 +326,20 @@ async fn ban_list_of_rooms(&self) -> Result { evicting admins too)", ); - if let Err(e) = leave_room(self.services, user_id, &room_id, None) + let state_lock = self.services.state.mutex.lock(&room_id).await; + + if let Err(e) = self + .services + .membership + .leave(user_id, &room_id, None, &state_lock) .boxed() .await { warn!("Failed to leave room: {e}"); } + drop(state_lock); + self.services .state_cache .forget(&room_id, user_id); diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 3b7526c3..07dd459f 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -12,16 +12,13 @@ use ruma::{ tag::{TagEvent, TagEventContent, TagInfo}, }, }; -use tuwunel_api::client::{ - full_user_deactivate, join_room_by_id_helper, leave_all_rooms, leave_room, update_avatar_url, - update_displayname, -}; use tuwunel_core::{ Err, Result, debug, debug_warn, error, info, is_equal_to, matrix::{Event, pdu::PduBuilder}, utils::{self, ReadyExt}, warn, }; +use tuwunel_service::Services; use crate::{ admin_command, get_room_info, @@ -142,20 +139,24 @@ pub(super) async fn create_user(&self, username: String, password: Option { info!("Automatically joined room {room} for user {user_id}"); @@ -174,6 +175,8 @@ pub(super) async fn create_user(&self, username: String, password: Option return Err!("Not allowed to deactivate the server service account.",); } - self.services - .users - .deactivate_account(&user_id) - .await?; - - if !no_leave_rooms { - self.services - .admin - .send_text(&format!("Making {user_id} leave all rooms after deactivation...")) - .await; - - let all_joined_rooms: Vec = self - .services - .state_cache - .rooms_joined(&user_id) - .map(Into::into) - .collect() - .await; - - full_user_deactivate(self.services, &user_id, &all_joined_rooms) - .boxed() - .await?; - - update_displayname(self.services, &user_id, None, &all_joined_rooms).await; - update_avatar_url(self.services, &user_id, None, None, &all_joined_rooms).await; - leave_all_rooms(self.services, &user_id).await; - } + deactivate_user(self.services, &user_id, no_leave_rooms).await?; self.write_str(&format!("User {user_id} has been deactivated")) .await @@ -334,40 +311,16 @@ pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) -> let mut deactivation_count: usize = 0; for user_id in user_ids { - match self - .services - .users - .deactivate_account(&user_id) - .await - { + match deactivate_user(self.services, &user_id, no_leave_rooms).await { + | Ok(()) => { + deactivation_count = deactivation_count.saturating_add(1); + }, | Err(e) => { self.services .admin .send_text(&format!("Failed deactivating user: {e}")) .await; }, - | Ok(()) => { - deactivation_count = deactivation_count.saturating_add(1); - if !no_leave_rooms { - info!("Forcing user {user_id} to leave all rooms apart of deactivate-all"); - let all_joined_rooms: Vec = self - .services - .state_cache - .rooms_joined(&user_id) - .map(Into::into) - .collect() - .await; - - full_user_deactivate(self.services, &user_id, &all_joined_rooms) - .boxed() - .await?; - - update_displayname(self.services, &user_id, None, &all_joined_rooms).await; - update_avatar_url(self.services, &user_id, None, None, &all_joined_rooms) - .await; - leave_all_rooms(self.services, &user_id).await; - } - }, } } @@ -384,6 +337,20 @@ pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) -> .await } +async fn deactivate_user(services: &Services, user_id: &UserId, no_leave_rooms: bool) -> Result { + if !no_leave_rooms { + services + .deactivate + .full_deactivate(user_id) + .boxed() + .await?; + } else { + services.users.deactivate_account(user_id).await?; + } + + Ok(()) +} + #[admin_command] pub(super) async fn list_joined_rooms(&self, user_id: String) -> Result { // Validate user id @@ -510,17 +477,21 @@ pub(super) async fn force_join_list_of_local_users( let mut failed_joins: usize = 0; let mut successful_joins: usize = 0; + let state_lock = self.services.state.mutex.lock(&room_id).await; + for user_id in user_ids { - match join_room_by_id_helper( - self.services, - &user_id, - &room_id, - Some(String::from(BULK_JOIN_REASON)), - &servers, - None, - &None, - ) - .await + match self + .services + .membership + .join( + &user_id, + &room_id, + Some(String::from(BULK_JOIN_REASON)), + &servers, + &None, + &state_lock, + ) + .await { | Ok(_res) => { successful_joins = successful_joins.saturating_add(1); @@ -532,6 +503,8 @@ pub(super) async fn force_join_list_of_local_users( } } + drop(state_lock); + self.write_str(&format!( "{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \ failed.", @@ -592,6 +565,8 @@ pub(super) async fn force_join_all_local_users( let mut failed_joins: usize = 0; let mut successful_joins: usize = 0; + let state_lock = self.services.state.mutex.lock(&room_id).await; + for user_id in &self .services .users @@ -600,16 +575,18 @@ pub(super) async fn force_join_all_local_users( .collect::>() .await { - match join_room_by_id_helper( - self.services, - user_id, - &room_id, - Some(String::from(BULK_JOIN_REASON)), - &servers, - None, - &None, - ) - .await + match self + .services + .membership + .join( + user_id, + &room_id, + Some(String::from(BULK_JOIN_REASON)), + &servers, + &None, + &state_lock, + ) + .await { | Ok(_res) => { successful_joins = successful_joins.saturating_add(1); @@ -621,6 +598,8 @@ pub(super) async fn force_join_all_local_users( } } + drop(state_lock); + self.write_str(&format!( "{successful_joins} local users have been joined to {room_id}. {failed_joins} joins \ failed.", @@ -645,9 +624,16 @@ pub(super) async fn force_join_room( self.services.globals.user_is_local(&user_id), "Parsed user_id must be a local user" ); - join_room_by_id_helper(self.services, &user_id, &room_id, None, &servers, None, &None) + + let state_lock = self.services.state.mutex.lock(&room_id).await; + + self.services + .membership + .join(&user_id, &room_id, None, &servers, &None, &state_lock) .await?; + drop(state_lock); + self.write_str(&format!("{user_id} has been joined to {room_id}.",)) .await } @@ -675,10 +661,16 @@ pub(super) async fn force_leave_room( return Err!("{user_id} is not joined in the room"); } - leave_room(self.services, &user_id, &room_id, None) + let state_lock = self.services.state.mutex.lock(&room_id).await; + + self.services + .membership + .leave(&user_id, &room_id, None, &state_lock) .boxed() .await?; + drop(state_lock); + self.write_str(&format!("{user_id} has left {room_id}.",)) .await } diff --git a/src/api/client/account.rs b/src/api/client/account.rs index f49bd78e..5cccbd76 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -1,26 +1,18 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use futures::{FutureExt, StreamExt}; -use ruma::{ - OwnedRoomId, UserId, - api::client::{ - account::{ - ThirdPartyIdRemovalStatus, change_password, deactivate, get_3pids, - request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn, - whoami, - }, - uiaa::{AuthFlow, AuthType, UiaaInfo}, +use ruma::api::client::{ + account::{ + ThirdPartyIdRemovalStatus, change_password, deactivate, get_3pids, + request_3pid_management_token_via_email, request_3pid_management_token_via_msisdn, + whoami, }, - events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent}, + uiaa::{AuthFlow, AuthType, UiaaInfo}, }; use tuwunel_core::{ - Err, Error, Result, err, info, - matrix::{Event, pdu::PduBuilder}, - utils, + Err, Error, Result, err, info, utils, utils::{ReadyExt, stream::BroadbandExt}, - warn, }; -use tuwunel_service::Services; use super::SESSION_ID_LENGTH; use crate::Ruma; @@ -213,18 +205,9 @@ pub(crate) async fn deactivate_route( }, } - // Remove profile pictures and display name - let all_joined_rooms: Vec = services - .state_cache - .rooms_joined(sender_user) - .map(Into::into) - .collect() - .await; - - super::update_displayname(&services, sender_user, None, &all_joined_rooms).await; - super::update_avatar_url(&services, sender_user, None, None, &all_joined_rooms).await; - - full_user_deactivate(&services, sender_user, &all_joined_rooms) + services + .deactivate + .full_deactivate(sender_user) .boxed() .await?; @@ -283,92 +266,3 @@ pub(crate) async fn request_3pid_management_token_via_msisdn_route( ) -> Result { Err!(Request(ThreepidDenied("Third party identifiers are not implemented"))) } - -/// Runs through all the deactivation steps: -/// -/// - Mark as deactivated -/// - Removing display name -/// - Removing avatar URL and blurhash -/// - Removing all profile data -/// - Leaving all rooms (and forgets all of them) -pub async fn full_user_deactivate( - services: &Services, - user_id: &UserId, - all_joined_rooms: &[OwnedRoomId], -) -> Result { - services - .users - .deactivate_account(user_id) - .await - .ok(); - - super::update_displayname(services, user_id, None, all_joined_rooms).await; - super::update_avatar_url(services, user_id, None, None, all_joined_rooms).await; - - services - .users - .all_profile_keys(user_id) - .ready_for_each(|(profile_key, _)| { - services - .users - .set_profile_key(user_id, &profile_key, None); - }) - .await; - - for room_id in all_joined_rooms { - let state_lock = services.state.mutex.lock(room_id).await; - - let room_power_levels = services - .state_accessor - .get_power_levels(room_id) - .await - .ok(); - - let user_can_change_self = room_power_levels - .as_ref() - .is_some_and(|power_levels| { - power_levels.user_can_change_user_power_level(user_id, user_id) - }); - - let user_can_demote_self = user_can_change_self - || services - .state_accessor - .room_state_get(room_id, &StateEventType::RoomCreate, "") - .await - .is_ok_and(|event| event.sender() == user_id); - - if user_can_demote_self { - let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels - .map(TryInto::try_into) - .transpose()? - .unwrap_or_default(); - - power_levels_content.users.remove(user_id); - - // ignore errors so deactivation doesn't fail - match services - .timeline - .build_and_append_pdu( - PduBuilder::state(String::new(), &power_levels_content), - user_id, - room_id, - &state_lock, - ) - .await - { - | Err(e) => { - warn!(%room_id, %user_id, "Failed to demote user's own power level: {e}"); - }, - | _ => { - info!("Demoted {user_id} in {room_id} as part of account deactivation"); - }, - } - } - } - - super::leave_all_rooms(services, user_id) - .boxed() - .await; - - Ok(()) -} diff --git a/src/api/client/membership/ban.rs b/src/api/client/membership/ban.rs index 064cf439..588433ae 100644 --- a/src/api/client/membership/ban.rs +++ b/src/api/client/membership/ban.rs @@ -1,9 +1,7 @@ use axum::extract::State; -use ruma::{ - api::client::membership::ban_user, - events::room::member::{MembershipState, RoomMemberEventContent}, -}; -use tuwunel_core::{Err, Result, matrix::pdu::PduBuilder}; +use futures::FutureExt; +use ruma::api::client::membership::ban_user; +use tuwunel_core::{Err, Result}; use crate::Ruma; @@ -22,29 +20,10 @@ pub(crate) async fn ban_user_route( let state_lock = services.state.mutex.lock(&body.room_id).await; - let current_member_content = services - .state_accessor - .get_member(&body.room_id, &body.user_id) - .await - .unwrap_or_else(|_| RoomMemberEventContent::new(MembershipState::Ban)); - services - .timeline - .build_and_append_pdu( - PduBuilder::state(body.user_id.to_string(), &RoomMemberEventContent { - membership: MembershipState::Ban, - reason: body.reason.clone(), - displayname: None, // display name may be offensive - avatar_url: None, // avatar may be offensive - is_direct: None, - join_authorized_via_users_server: None, - third_party_invite: None, - ..current_member_content - }), - sender_user, - &body.room_id, - &state_lock, - ) + .membership + .ban(&body.room_id, &body.user_id, body.reason.as_ref(), sender_user, &state_lock) + .boxed() .await?; drop(state_lock); diff --git a/src/api/client/membership/invite.rs b/src/api/client/membership/invite.rs index 7b146e7f..f39217b0 100644 --- a/src/api/client/membership/invite.rs +++ b/src/api/client/membership/invite.rs @@ -1,20 +1,11 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use futures::{FutureExt, join}; -use ruma::{ - OwnedServerName, RoomId, UserId, - api::{client::membership::invite_user, federation::membership::create_invite}, - events::room::member::{MembershipState, RoomMemberEventContent}, -}; -use tuwunel_core::{ - Err, Result, err, - matrix::{event::gen_event_id_canonical_json, pdu::PduBuilder}, - warn, -}; -use tuwunel_service::Services; +use ruma::{api::client::membership::invite_user, events::room::member::MembershipState}; +use tuwunel_core::{Err, Result}; use super::banned_room_check; -use crate::Ruma; +use crate::{Ruma, client::utils::invite_check}; /// # `POST /_matrix/client/r0/rooms/{roomId}/invite` /// @@ -27,210 +18,56 @@ pub(crate) async fn invite_user_route( ) -> Result { let sender_user = body.sender_user(); - if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites { - warn!( - "{sender_user} is not an admin and attempted to send an invite to {}", - &body.room_id - ); - return Err!(Request(Forbidden("Invites are not allowed on this server."))); - } + let room_id = &body.room_id; - banned_room_check( - &services, - sender_user, - Some(&body.room_id), - body.room_id.server_name(), - client, - ) - .await?; + invite_check(&services, sender_user, room_id).await?; - match &body.recipient { - | invite_user::v3::InvitationRecipient::UserId { user_id } => { - let sender_ignored_recipient = services - .users - .user_is_ignored(sender_user, user_id); - let recipient_ignored_by_sender = services - .users - .user_is_ignored(user_id, sender_user); - - let (sender_ignored_recipient, recipient_ignored_by_sender) = - join!(sender_ignored_recipient, recipient_ignored_by_sender); - - if sender_ignored_recipient { - return Ok(invite_user::v3::Response {}); - } - - if let Ok(target_user_membership) = services - .state_accessor - .get_member(&body.room_id, user_id) - .await - { - if target_user_membership.membership == MembershipState::Ban { - return Err!(Request(Forbidden("User is banned from this room."))); - } - } - - if recipient_ignored_by_sender { - // silently drop the invite to the recipient if they've been ignored by the - // sender, pretend it worked - return Ok(invite_user::v3::Response {}); - } - - invite_helper( - &services, - sender_user, - user_id, - &body.room_id, - body.reason.clone(), - false, - ) - .boxed() - .await?; - - Ok(invite_user::v3::Response {}) - }, - | _ => { - Err!(Request(NotFound("User not found."))) - }, - } -} - -pub(crate) async fn invite_helper( - services: &Services, - sender_user: &UserId, - user_id: &UserId, - room_id: &RoomId, - reason: Option, - is_direct: bool, -) -> Result { - if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites { - warn!("{sender_user} is not an admin and attempted to send an invite to {room_id}"); - return Err!(Request(Forbidden("Invites are not allowed on this server."))); - } - - if !services.globals.user_is_local(user_id) { - let (pdu, pdu_json, invite_room_state) = { - let state_lock = services.state.mutex.lock(room_id).await; - - let content = RoomMemberEventContent { - avatar_url: services.users.avatar_url(user_id).await.ok(), - is_direct: Some(is_direct), - reason, - ..RoomMemberEventContent::new(MembershipState::Invite) - }; - - let (pdu, pdu_json) = services - .timeline - .create_hash_and_sign_event( - PduBuilder::state(user_id.to_string(), &content), - sender_user, - room_id, - &state_lock, - ) - .await?; - - let invite_room_state = services.state.summary_stripped(&pdu).await; - - drop(state_lock); - - (pdu, pdu_json, invite_room_state) - }; - - let room_version_id = services.state.get_room_version(room_id).await?; - - let response = services - .sending - .send_federation_request(user_id.server_name(), create_invite::v2::Request { - room_id: room_id.to_owned(), - event_id: (*pdu.event_id).to_owned(), - room_version: room_version_id.clone(), - event: services - .sending - .convert_to_outgoing_federation_event(pdu_json.clone()) - .await, - invite_room_state: invite_room_state - .into_iter() - .map(Into::into) - .collect(), - via: services - .state_cache - .servers_route_via(room_id) - .await - .ok(), - }) - .await?; - - // We do not add the event_id field to the pdu here because of signature and - // hashes checks - let (event_id, value) = gen_event_id_canonical_json(&response.event, &room_version_id) - .map_err(|e| { - err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) - })?; - - if pdu.event_id != event_id { - return Err!(Request(BadJson(warn!( - %pdu.event_id, %event_id, - "Server {} sent event with wrong event ID", - user_id.server_name() - )))); - } - - let origin: OwnedServerName = serde_json::from_value(serde_json::to_value( - value - .get("origin") - .ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?, - )?) - .map_err(|e| { - err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}")))) - })?; - - let pdu_id = services - .event_handler - .handle_incoming_pdu(&origin, room_id, &event_id, value, true) - .boxed() - .await? - .ok_or_else(|| { - err!(Request(InvalidParam("Could not accept incoming PDU as timeline event."))) - })?; - - return services - .sending - .send_pdu_room(room_id, &pdu_id) - .await; - } - - if !services - .state_cache - .is_joined(sender_user, room_id) - .await - { - return Err!(Request(Forbidden( - "You must be joined in the room you are trying to invite from." - ))); - } - - let state_lock = services.state.mutex.lock(room_id).await; - - let content = RoomMemberEventContent { - displayname: services.users.displayname(user_id).await.ok(), - avatar_url: services.users.avatar_url(user_id).await.ok(), - blurhash: services.users.blurhash(user_id).await.ok(), - is_direct: Some(is_direct), - reason, - ..RoomMemberEventContent::new(MembershipState::Invite) - }; - - services - .timeline - .build_and_append_pdu( - PduBuilder::state(user_id.to_string(), &content), - sender_user, - room_id, - &state_lock, - ) + banned_room_check(&services, sender_user, Some(room_id), room_id.server_name(), client) .await?; - drop(state_lock); + let invite_user::v3::InvitationRecipient::UserId { user_id } = &body.recipient else { + return Err!(Request(ThreepidDenied("Third party identifiers are not implemented"))); + }; - Ok(()) + let sender_ignored_recipient = services + .users + .user_is_ignored(sender_user, user_id); + + let recipient_ignored_by_sender = services + .users + .user_is_ignored(user_id, sender_user); + + let (sender_ignored_recipient, recipient_ignored_by_sender) = + join!(sender_ignored_recipient, recipient_ignored_by_sender); + + if sender_ignored_recipient { + return Ok(invite_user::v3::Response {}); + } + + // TODO: this should be in the service, but moving it from here would + // trigger the recipient_ignored_by_sender check before the banned check, + // revealing the ignore state to the sending user if the recipient is banned + if let Ok(target_user_membership) = services + .state_accessor + .get_member(room_id, user_id) + .await + { + if target_user_membership.membership == MembershipState::Ban { + return Err!(Request(Forbidden("User is banned from this room."))); + } + } + + if recipient_ignored_by_sender { + // silently drop the invite to the recipient if they've been ignored by the + // sender, pretend it worked + return Ok(invite_user::v3::Response {}); + } + + services + .membership + .invite(sender_user, user_id, room_id, body.reason.as_ref(), false) + .boxed() + .await?; + + Ok(invite_user::v3::Response {}) } diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index 7d07159f..48ede1b1 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -1,53 +1,14 @@ -use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc}; - use axum::extract::State; use axum_client_ip::InsecureClientIp; -use futures::{FutureExt, StreamExt, pin_mut}; +use futures::FutureExt; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, - RoomVersionId, UserId, - api::{ - client::{ - error::ErrorKind, - membership::{ThirdPartySigned, join_room_by_id, join_room_by_id_or_alias}, - }, - federation::{self}, - }, - canonical_json::to_canonical_value, - events::{ - StateEventType, - room::{ - join_rules::{AllowRule, JoinRule, RoomJoinRulesEventContent}, - member::{MembershipState, RoomMemberEventContent}, - }, - }, -}; -use tuwunel_core::{ - Err, Result, debug, debug_info, debug_warn, err, error, info, - matrix::{ - event::{gen_event_id, gen_event_id_canonical_json}, - pdu::{PduBuilder, PduEvent}, - room_version, state_res, - }, - result::FlatOk, - trace, - utils::{ - self, shuffle, - stream::{IterStream, ReadyExt}, - }, - warn, -}; -use tuwunel_service::{ - Services, - appservice::RegistrationInfo, - rooms::{ - state::RoomMutexGuard, - state_compressor::{CompressedState, HashSetCompressStateEvent}, - }, + RoomId, RoomOrAliasId, + api::client::membership::{join_room_by_id, join_room_by_id_or_alias}, }; +use tuwunel_core::Result; use super::banned_room_check; -use crate::Ruma; +use crate::{Ruma, client::membership::get_join_params}; /// # `POST /_matrix/client/r0/rooms/{roomId}/join` /// @@ -65,54 +26,32 @@ pub(crate) async fn join_room_by_id_route( ) -> Result { let sender_user = body.sender_user(); - banned_room_check( - &services, - sender_user, - Some(&body.room_id), - body.room_id.server_name(), - client, - ) - .await?; + let room_id: &RoomId = &body.room_id; - // There is no body.server_name for /roomId/join - let mut servers: Vec<_> = services - .state_cache - .servers_invite_via(&body.room_id) - .map(ToOwned::to_owned) - .collect() - .await; + banned_room_check(&services, sender_user, Some(room_id), room_id.server_name(), client) + .await?; - servers.extend( - services - .state_cache - .invite_state(sender_user, &body.room_id) - .await - .unwrap_or_default() - .iter() - .filter_map(|event| event.get_field("sender").ok().flatten()) - .filter_map(|sender: &str| UserId::parse(sender).ok()) - .map(|user| user.server_name().to_owned()), - ); + let (room_id, servers) = + get_join_params(&services, sender_user, <&RoomOrAliasId>::from(room_id), &[]).await?; - if let Some(server) = body.room_id.server_name() { - servers.push(server.into()); - } + let state_lock = services.state.mutex.lock(&room_id).await; - servers.sort_unstable(); - servers.dedup(); - shuffle(&mut servers); + services + .membership + .join( + sender_user, + &room_id, + body.reason.clone(), + &servers, + &body.appservice_info, + &state_lock, + ) + .boxed() + .await?; - join_room_by_id_helper( - &services, - sender_user, - &body.room_id, - body.reason.clone(), - &servers, - body.third_party_signed.as_ref(), - &body.appservice_info, - ) - .boxed() - .await + drop(state_lock); + + Ok(join_room_by_id::v3::Response { room_id }) } /// # `POST /_matrix/client/r0/join/{roomIdOrAlias}` @@ -132,843 +71,29 @@ pub(crate) async fn join_room_by_id_or_alias_route( ) -> Result { let sender_user = body.sender_user(); let appservice_info = &body.appservice_info; - let body = &body.body; - let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) { - | Ok(room_id) => { - banned_room_check( - &services, - sender_user, - Some(&room_id), - room_id.server_name(), - client, - ) - .boxed() - .await?; + let (room_id, servers) = + get_join_params(&services, sender_user, &body.room_id_or_alias, &body.via).await?; - let mut servers = body.via.clone(); - servers.extend( - services - .state_cache - .servers_invite_via(&room_id) - .map(ToOwned::to_owned) - .collect::>() - .await, - ); + banned_room_check(&services, sender_user, Some(&room_id), room_id.server_name(), client) + .await?; - servers.extend( - services - .state_cache - .invite_state(sender_user, &room_id) - .await - .unwrap_or_default() - .iter() - .filter_map(|event| event.get_field("sender").ok().flatten()) - .filter_map(|sender: &str| UserId::parse(sender).ok()) - .map(|user| user.server_name().to_owned()), - ); + let state_lock = services.state.mutex.lock(&room_id).await; - if let Some(server) = room_id.server_name() { - servers.push(server.to_owned()); - } - - servers.sort_unstable(); - servers.dedup(); - shuffle(&mut servers); - - (servers, room_id) - }, - | Err(room_alias) => { - let (room_id, mut servers) = services - .alias - .resolve_alias(&room_alias, Some(body.via.clone())) - .await?; - - banned_room_check( - &services, - sender_user, - Some(&room_id), - Some(room_alias.server_name()), - client, - ) - .await?; - - let addl_via_servers = services - .state_cache - .servers_invite_via(&room_id) - .map(ToOwned::to_owned); - - let addl_state_servers = services - .state_cache - .invite_state(sender_user, &room_id) - .await - .unwrap_or_default(); - - let mut addl_servers: Vec<_> = addl_state_servers - .iter() - .map(|event| event.get_field("sender")) - .filter_map(FlatOk::flat_ok) - .map(|user: &UserId| user.server_name().to_owned()) - .stream() - .chain(addl_via_servers) - .collect() - .await; - - addl_servers.sort_unstable(); - addl_servers.dedup(); - shuffle(&mut addl_servers); - servers.append(&mut addl_servers); - - (servers, room_id) - }, - }; - - let join_room_response = join_room_by_id_helper( - &services, - sender_user, - &room_id, - body.reason.clone(), - &servers, - body.third_party_signed.as_ref(), - appservice_info, - ) - .boxed() - .await?; - - Ok(join_room_by_id_or_alias::v3::Response { room_id: join_room_response.room_id }) -} - -pub async fn join_room_by_id_helper( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], - third_party_signed: Option<&ThirdPartySigned>, - appservice_info: &Option, -) -> Result { - let state_lock = services.state.mutex.lock(room_id).await; - - let user_is_guest = services - .users - .is_deactivated(sender_user) - .await - .unwrap_or(false) - && appservice_info.is_none(); - - if user_is_guest - && !services - .state_accessor - .guest_can_join(room_id) - .await - { - return Err!(Request(Forbidden("Guests are not allowed to join this room"))); - } - - if services - .state_cache - .is_joined(sender_user, room_id) - .await - { - debug_warn!("{sender_user} is already joined in {room_id}"); - return Ok(join_room_by_id::v3::Response { room_id: room_id.into() }); - } - - if let Ok(membership) = services - .state_accessor - .get_member(room_id, sender_user) - .await - { - if membership.membership == MembershipState::Ban { - debug_warn!("{sender_user} is banned from {room_id} but attempted to join"); - return Err!(Request(Forbidden("You are banned from the room."))); - } - } - - let server_in_room = services - .state_cache - .server_in_room(services.globals.server_name(), room_id) - .await; - - let local_join = server_in_room - || servers.is_empty() - || (servers.len() == 1 && services.globals.server_is_ours(&servers[0])); - - if local_join { - join_room_by_id_helper_local( - services, + services + .membership + .join( sender_user, - room_id, - reason, - servers, - third_party_signed, - state_lock, + &room_id, + body.reason.clone(), + &servers, + appservice_info, + &state_lock, ) .boxed() .await?; - } else { - // Ask a remote server if we are not participating in this room - join_room_by_id_helper_remote( - services, - sender_user, - room_id, - reason, - servers, - third_party_signed, - state_lock, - ) - .boxed() - .await?; - } - Ok(join_room_by_id::v3::Response::new(room_id.to_owned())) -} - -#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_remote")] -async fn join_room_by_id_helper_remote( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], - _third_party_signed: Option<&ThirdPartySigned>, - state_lock: RoomMutexGuard, -) -> Result { - info!("Joining {room_id} over federation."); - - let (make_join_response, remote_server) = - make_join_request(services, sender_user, room_id, servers).await?; - - info!("make_join finished"); - - let Some(room_version_id) = make_join_response.room_version else { - return Err!(BadServerResponse("Remote room version is not supported by tuwunel")); - }; - - if !services - .server - .supported_room_version(&room_version_id) - { - return Err!(BadServerResponse( - "Remote room version {room_version_id} is not supported by tuwunel" - )); - } - - let mut join_event_stub: CanonicalJsonObject = - serde_json::from_str(make_join_response.event.get()).map_err(|e| { - err!(BadServerResponse(warn!( - "Invalid make_join event json received from server: {e:?}" - ))) - })?; - - let join_authorized_via_users_server = { - use RoomVersionId::*; - if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) { - join_event_stub - .get("content") - .map(|s| { - s.as_object()? - .get("join_authorised_via_users_server")? - .as_str() - }) - .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()) - } else { - None - } - }; - - join_event_stub.insert( - "origin".to_owned(), - CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()), - ); - join_event_stub.insert( - "origin_server_ts".to_owned(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), - ); - join_event_stub.insert( - "content".to_owned(), - to_canonical_value(RoomMemberEventContent { - displayname: services.users.displayname(sender_user).await.ok(), - avatar_url: services.users.avatar_url(sender_user).await.ok(), - blurhash: services.users.blurhash(sender_user).await.ok(), - reason, - join_authorized_via_users_server: join_authorized_via_users_server.clone(), - ..RoomMemberEventContent::new(MembershipState::Join) - }) - .expect("event is valid, we just created it"), - ); - - // We keep the "event_id" in the pdu only in v1 or - // v2 rooms - match room_version_id { - | RoomVersionId::V1 | RoomVersionId::V2 => {}, - | _ => { - join_event_stub.remove("event_id"); - }, - } - - // In order to create a compatible ref hash (EventID) the `hashes` field needs - // to be present - services - .server_keys - .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; - - // Generate event id - let event_id = gen_event_id(&join_event_stub, &room_version_id)?; - - // Add event_id back - join_event_stub - .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); - - // It has enough fields to be called a proper event now - let mut join_event = join_event_stub; - - info!("Asking {remote_server} for send_join in room {room_id}"); - let send_join_request = federation::membership::create_join_event::v2::Request { - room_id: room_id.to_owned(), - event_id: event_id.clone(), - omit_members: false, - pdu: services - .sending - .convert_to_outgoing_federation_event(join_event.clone()) - .await, - }; - - let send_join_response = match services - .sending - .send_synapse_request(&remote_server, send_join_request) - .await - { - | Ok(response) => response, - | Err(e) => { - error!("send_join failed: {e}"); - return Err(e); - }, - }; - - info!("send_join finished"); - - if join_authorized_via_users_server.is_some() { - if let Some(signed_raw) = &send_join_response.room_state.event { - debug_info!( - "There is a signed event with join_authorized_via_users_server. This room is \ - probably using restricted joins. Adding signature to our event" - ); - - let (signed_event_id, signed_value) = - gen_event_id_canonical_json(signed_raw, &room_version_id).map_err(|e| { - err!(Request(BadJson(warn!( - "Could not convert event to canonical JSON: {e}" - )))) - })?; - - if signed_event_id != event_id { - return Err!(Request(BadJson(warn!( - %signed_event_id, %event_id, - "Server {remote_server} sent event with wrong event ID" - )))); - } - - match signed_value["signatures"] - .as_object() - .ok_or_else(|| { - err!(BadServerResponse(warn!( - "Server {remote_server} sent invalid signatures type" - ))) - }) - .and_then(|e| { - e.get(remote_server.as_str()).ok_or_else(|| { - err!(BadServerResponse(warn!( - "Server {remote_server} did not send its signature for a restricted \ - room" - ))) - }) - }) { - | Ok(signature) => { - join_event - .get_mut("signatures") - .expect("we created a valid pdu") - .as_object_mut() - .expect("we created a valid pdu") - .insert(remote_server.to_string(), signature.clone()); - }, - | Err(e) => { - warn!( - "Server {remote_server} sent invalid signature in send_join signatures \ - for event {signed_value:?}: {e:?}", - ); - }, - } - } - } - - services - .short - .get_or_create_shortroomid(room_id) - .await; - - info!("Parsing join event"); - let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone()) - .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; - - info!("Acquiring server signing keys for response events"); - let resp_events = &send_join_response.room_state; - let resp_state = &resp_events.state; - let resp_auth = &resp_events.auth_chain; - services - .server_keys - .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) - .await; - - info!("Going through send_join response room_state"); - let cork = services.db.cork_and_flush(); - let state = send_join_response - .room_state - .state - .iter() - .stream() - .then(|pdu| { - services - .server_keys - .validate_and_add_event_id_no_fetch(pdu, &room_version_id) - }) - .ready_filter_map(Result::ok) - .fold(HashMap::new(), async |mut state, (event_id, value)| { - let pdu = if value["type"] == "m.room.create" { - PduEvent::from_rid_val(room_id, &event_id, value.clone()) - } else { - PduEvent::from_id_val(&event_id, value.clone()) - }; - - let pdu = match pdu { - | Ok(pdu) => pdu, - | Err(e) => { - debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); - return state; - }, - }; - - services - .timeline - .add_pdu_outlier(&event_id, &value); - - if let Some(state_key) = &pdu.state_key { - let shortstatekey = services - .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) - .await; - - state.insert(shortstatekey, pdu.event_id.clone()); - } - - state - }) - .await; - - drop(cork); - - info!("Going through send_join response auth_chain"); - let cork = services.db.cork_and_flush(); - send_join_response - .room_state - .auth_chain - .iter() - .stream() - .then(|pdu| { - services - .server_keys - .validate_and_add_event_id_no_fetch(pdu, &room_version_id) - }) - .ready_filter_map(Result::ok) - .ready_for_each(|(event_id, value)| { - services - .timeline - .add_pdu_outlier(&event_id, &value); - }) - .await; - - drop(cork); - - debug!("Running send_join auth check"); - state_res::auth_check( - &room_version::rules(&room_version_id)?, - &parsed_join_pdu, - &async |event_id| services.timeline.get_pdu(&event_id).await, - &async |event_type, state_key| { - let shortstatekey = services - .short - .get_shortstatekey(&event_type, state_key.as_str()) - .await?; - - let event_id = state.get(&shortstatekey).ok_or_else(|| { - err!(Request(NotFound("Missing fetch_state {shortstatekey:?}"))) - })?; - - services.timeline.get_pdu(event_id).await - }, - ) - .boxed() - .await?; - - info!("Compressing state from send_join"); - let compressed: CompressedState = services - .state_compressor - .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow()))) - .collect() - .await; - - debug!("Saving compressed state"); - let HashSetCompressStateEvent { - shortstatehash: statehash_before_join, - added, - removed, - } = services - .state_compressor - .save_state(room_id, Arc::new(compressed)) - .await?; - - debug!("Forcing state for new room"); - services - .state - .force_state(room_id, statehash_before_join, added, removed, &state_lock) - .await?; - - info!("Updating joined counts for new room"); - services - .state_cache - .update_joined_count(room_id) - .await; - - // We append to state before appending the pdu, so we don't have a moment in - // time with the pdu without it's state. This is okay because append_pdu can't - // fail. - let statehash_after_join = services - .state - .append_to_state(&parsed_join_pdu) - .await?; - - info!("Appending new room join event"); - services - .timeline - .append_pdu( - &parsed_join_pdu, - join_event, - once(parsed_join_pdu.event_id.borrow()), - &state_lock, - ) - .await?; - - info!("Setting final room state for new room"); - // We set the room state after inserting the pdu, so that we never have a moment - // in time where events in the current room state do not exist - services - .state - .set_room_state(room_id, statehash_after_join, &state_lock); - - Ok(()) -} - -#[tracing::instrument(skip_all, fields(%sender_user, %room_id), name = "join_local")] -async fn join_room_by_id_helper_local( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - reason: Option, - servers: &[OwnedServerName], - _third_party_signed: Option<&ThirdPartySigned>, - state_lock: RoomMutexGuard, -) -> Result { - debug_info!("We can join locally"); - - let join_rules_event_content = services - .state_accessor - .room_state_get_content::( - room_id, - &StateEventType::RoomJoinRules, - "", - ) - .await; - - let restriction_rooms = match join_rules_event_content { - | Ok(RoomJoinRulesEventContent { - join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted), - }) => restricted - .allow - .into_iter() - .filter_map(|a| match a { - | AllowRule::RoomMembership(r) => Some(r.room_id), - | _ => None, - }) - .collect(), - | _ => Vec::new(), - }; - - let join_authorized_via_users_server: Option = { - if restriction_rooms - .iter() - .stream() - .any(|restriction_room_id| { - services - .state_cache - .is_joined(sender_user, restriction_room_id) - }) - .await - { - let users = services - .state_cache - .local_users_in_room(room_id) - .filter(|user| { - services.state_accessor.user_can_invite( - room_id, - user, - sender_user, - &state_lock, - ) - }) - .map(ToOwned::to_owned); - - pin_mut!(users); - users.next().await - } else { - None - } - }; - - let content = RoomMemberEventContent { - displayname: services.users.displayname(sender_user).await.ok(), - avatar_url: services.users.avatar_url(sender_user).await.ok(), - blurhash: services.users.blurhash(sender_user).await.ok(), - reason: reason.clone(), - join_authorized_via_users_server, - ..RoomMemberEventContent::new(MembershipState::Join) - }; - - // Try normal join first - let Err(error) = services - .timeline - .build_and_append_pdu( - PduBuilder::state(sender_user.to_string(), &content), - sender_user, - room_id, - &state_lock, - ) - .await - else { - return Ok(()); - }; - - if restriction_rooms.is_empty() - && (servers.is_empty() - || servers.len() == 1 && services.globals.server_is_ours(&servers[0])) - { - return Err(error); - } - - warn!( - "We couldn't do the join locally, maybe federation can help to satisfy the restricted \ - join requirements" - ); - let Ok((make_join_response, remote_server)) = - make_join_request(services, sender_user, room_id, servers).await - else { - return Err(error); - }; - - let Some(room_version_id) = make_join_response.room_version else { - return Err!(BadServerResponse("Remote room version is not supported by tuwunel")); - }; - - if !services - .server - .supported_room_version(&room_version_id) - { - return Err!(BadServerResponse( - "Remote room version {room_version_id} is not supported by tuwunel" - )); - } - - let mut join_event_stub: CanonicalJsonObject = - serde_json::from_str(make_join_response.event.get()).map_err(|e| { - err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")) - })?; - - let join_authorized_via_users_server = join_event_stub - .get("content") - .map(|s| { - s.as_object()? - .get("join_authorised_via_users_server")? - .as_str() - }) - .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()); - - join_event_stub.insert( - "origin".to_owned(), - CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()), - ); - join_event_stub.insert( - "origin_server_ts".to_owned(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), - ); - join_event_stub.insert( - "content".to_owned(), - to_canonical_value(RoomMemberEventContent { - displayname: services.users.displayname(sender_user).await.ok(), - avatar_url: services.users.avatar_url(sender_user).await.ok(), - blurhash: services.users.blurhash(sender_user).await.ok(), - reason, - join_authorized_via_users_server, - ..RoomMemberEventContent::new(MembershipState::Join) - }) - .expect("event is valid, we just created it"), - ); - - // We keep the "event_id" in the pdu only in v1 or - // v2 rooms - match room_version_id { - | RoomVersionId::V1 | RoomVersionId::V2 => {}, - | _ => { - join_event_stub.remove("event_id"); - }, - } - - // In order to create a compatible ref hash (EventID) the `hashes` field needs - // to be present - services - .server_keys - .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; - - // Generate event id - let event_id = gen_event_id(&join_event_stub, &room_version_id)?; - - // Add event_id back - join_event_stub - .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); - - // It has enough fields to be called a proper event now - let join_event = join_event_stub; - - let send_join_response = services - .sending - .send_synapse_request( - &remote_server, - federation::membership::create_join_event::v2::Request { - room_id: room_id.to_owned(), - event_id: event_id.clone(), - omit_members: false, - pdu: services - .sending - .convert_to_outgoing_federation_event(join_event.clone()) - .await, - }, - ) - .await?; - - if let Some(signed_raw) = send_join_response.room_state.event { - let (signed_event_id, signed_value) = - gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| { - err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) - })?; - - if signed_event_id != event_id { - return Err!(Request(BadJson( - warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID") - ))); - } - - drop(state_lock); - services - .event_handler - .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) - .boxed() - .await?; - } else { - return Err(error); - } - - Ok(()) -} - -async fn make_join_request( - services: &Services, - sender_user: &UserId, - room_id: &RoomId, - servers: &[OwnedServerName], -) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> { - let mut make_join_response_and_server = - Err!(BadServerResponse("No server available to assist in joining.")); - - let mut make_join_counter: usize = 0; - let mut incompatible_room_version_count: usize = 0; - - for remote_server in servers { - if services.globals.server_is_ours(remote_server) { - continue; - } - info!("Asking {remote_server} for make_join ({make_join_counter})"); - let make_join_response = services - .sending - .send_federation_request( - remote_server, - federation::membership::prepare_join_event::v1::Request { - room_id: room_id.to_owned(), - user_id: sender_user.to_owned(), - ver: services - .server - .supported_room_versions() - .collect(), - }, - ) - .await; - - trace!("make_join response: {:?}", make_join_response); - make_join_counter = make_join_counter.saturating_add(1); - - if let Err(ref e) = make_join_response { - if matches!( - e.kind(), - ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion - ) { - incompatible_room_version_count = - incompatible_room_version_count.saturating_add(1); - } - - if incompatible_room_version_count > 15 { - info!( - "15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \ - M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \ - room version {room_id}: {e}" - ); - make_join_response_and_server = - Err!(BadServerResponse("Room version is not supported by tuwunel")); - return make_join_response_and_server; - } - - if make_join_counter > 40 { - warn!( - "40 servers failed to provide valid make_join response, assuming no server \ - can assist in joining." - ); - make_join_response_and_server = - Err!(BadServerResponse("No server available to assist in joining.")); - - return make_join_response_and_server; - } - } - - make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone())); - - if make_join_response_and_server.is_ok() { - break; - } - } - - make_join_response_and_server + drop(state_lock); + + Ok(join_room_by_id_or_alias::v3::Response { room_id }) } diff --git a/src/api/client/membership/kick.rs b/src/api/client/membership/kick.rs index 4aac2077..7ac7df14 100644 --- a/src/api/client/membership/kick.rs +++ b/src/api/client/membership/kick.rs @@ -1,9 +1,7 @@ use axum::extract::State; -use ruma::{ - api::client::membership::kick_user, - events::room::member::{MembershipState, RoomMemberEventContent}, -}; -use tuwunel_core::{Err, Result, matrix::pdu::PduBuilder}; +use futures::FutureExt; +use ruma::api::client::membership::kick_user; +use tuwunel_core::{Err, Result}; use crate::Ruma; @@ -14,43 +12,18 @@ pub(crate) async fn kick_user_route( State(services): State, body: Ruma, ) -> Result { - let state_lock = services.state.mutex.lock(&body.room_id).await; + let sender_user = body.sender_user(); - let Ok(event) = services - .state_accessor - .get_member(&body.room_id, &body.user_id) - .await - else { - // copy synapse's behaviour of returning 200 without any change to the state - // instead of erroring on left users - return Ok(kick_user::v3::Response::new()); - }; - - if !matches!( - event.membership, - MembershipState::Invite | MembershipState::Knock | MembershipState::Join, - ) { - return Err!(Request(Forbidden( - "Cannot kick a user who is not apart of the room (current membership: {})", - event.membership - ))); + if sender_user == body.user_id { + return Err!(Request(Forbidden("You cannot kick yourself."))); } + let state_lock = services.state.mutex.lock(&body.room_id).await; + services - .timeline - .build_and_append_pdu( - PduBuilder::state(body.user_id.to_string(), &RoomMemberEventContent { - membership: MembershipState::Leave, - reason: body.reason.clone(), - is_direct: None, - join_authorized_via_users_server: None, - third_party_invite: None, - ..event - }), - body.sender_user(), - &body.room_id, - &state_lock, - ) + .membership + .kick(&body.room_id, &body.user_id, body.reason.as_ref(), sender_user, &state_lock) + .boxed() .await?; drop(state_lock); diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index 4286d8f7..c981988d 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -4,7 +4,7 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; use futures::{FutureExt, StreamExt}; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName, RoomId, + CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, RoomId, RoomVersionId, UserId, api::{ client::knock::knock_room, @@ -25,9 +25,8 @@ use tuwunel_core::{ event::{Event, gen_event_id}, pdu::{PduBuilder, PduEvent}, }, - result::FlatOk, trace, - utils::{self, shuffle, stream::IterStream}, + utils::{self}, warn, }; use tuwunel_service::{ @@ -39,7 +38,7 @@ use tuwunel_service::{ }; use super::banned_room_check; -use crate::Ruma; +use crate::{Ruma, client::membership::get_join_params}; /// # `POST /_matrix/client/*/knock/{roomIdOrAlias}` /// @@ -53,97 +52,13 @@ pub(crate) async fn knock_room_route( let sender_user = body.sender_user(); let body = &body.body; - let (servers, room_id) = match OwnedRoomId::try_from(body.room_id_or_alias.clone()) { - | Ok(room_id) => { - banned_room_check( - &services, - sender_user, - Some(&room_id), - room_id.server_name(), - client, - ) - .await?; + let (room_id, servers) = + get_join_params(&services, sender_user, &body.room_id_or_alias, &body.via).await?; - let mut servers = body.via.clone(); - servers.extend( - services - .state_cache - .servers_invite_via(&room_id) - .map(ToOwned::to_owned) - .collect::>() - .await, - ); + banned_room_check(&services, sender_user, Some(&room_id), room_id.server_name(), client) + .await?; - servers.extend( - services - .state_cache - .invite_state(sender_user, &room_id) - .await - .unwrap_or_default() - .iter() - .filter_map(|event| event.get_field("sender").ok().flatten()) - .filter_map(|sender: &str| UserId::parse(sender).ok()) - .map(|user| user.server_name().to_owned()), - ); - - if let Some(server) = room_id.server_name() { - servers.push(server.to_owned()); - } - - servers.sort_unstable(); - servers.dedup(); - shuffle(&mut servers); - - (servers, room_id) - }, - | Err(room_alias) => { - let (room_id, mut servers) = services - .alias - .resolve_alias(&room_alias, Some(body.via.clone())) - .await?; - - banned_room_check( - &services, - sender_user, - Some(&room_id), - Some(room_alias.server_name()), - client, - ) - .await?; - - let addl_via_servers = services - .state_cache - .servers_invite_via(&room_id) - .map(ToOwned::to_owned); - - let addl_state_servers = services - .state_cache - .invite_state(sender_user, &room_id) - .await - .unwrap_or_default(); - - let mut addl_servers: Vec<_> = addl_state_servers - .iter() - .map(|event| event.get_field("sender")) - .filter_map(FlatOk::flat_ok) - .map(|user: &UserId| user.server_name().to_owned()) - .stream() - .chain(addl_via_servers) - .collect() - .await; - - addl_servers.sort_unstable(); - addl_servers.dedup(); - shuffle(&mut addl_servers); - servers.append(&mut addl_servers); - - (servers, room_id) - }, - }; - - knock_room_by_id_helper(&services, sender_user, &room_id, body.reason.clone(), &servers) - .boxed() - .await + knock_room_by_id_helper(&services, sender_user, &room_id, body.reason.clone(), &servers).await } async fn knock_room_by_id_helper( diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index eccbc2b6..cef09fac 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -1,25 +1,7 @@ -use std::collections::HashSet; - use axum::extract::State; -use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut}; -use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId, - api::{ - client::membership::leave_room, - federation::{self}, - }, - events::{ - StateEventType, - room::member::{MembershipState, RoomMemberEventContent}, - }, -}; -use tuwunel_core::{ - Err, Result, debug_info, debug_warn, err, - matrix::{event::gen_event_id, pdu::PduBuilder}, - utils::{self, FutureBoolExt, future::ReadyEqExt}, - warn, -}; -use tuwunel_service::Services; +use futures::FutureExt; +use ruma::api::client::membership::leave_room; +use tuwunel_core::Result; use crate::Ruma; @@ -32,335 +14,17 @@ pub(crate) async fn leave_room_route( State(services): State, body: Ruma, ) -> Result { - leave_room(&services, body.sender_user(), &body.room_id, body.reason.clone()) + let room_id = &body.room_id; + + let state_lock = services.state.mutex.lock(room_id).await; + + services + .membership + .leave(body.sender_user(), room_id, body.reason.clone(), &state_lock) .boxed() - .await - .map(|()| leave_room::v3::Response::new()) -} - -// Make a user leave all their joined rooms, rescinds knocks, forgets all rooms, -// and ignores errors -pub async fn leave_all_rooms(services: &Services, user_id: &UserId) { - let rooms_joined = services - .state_cache - .rooms_joined(user_id) - .map(ToOwned::to_owned); - - let rooms_invited = services - .state_cache - .rooms_invited(user_id) - .map(|(r, _)| r); - - let rooms_knocked = services - .state_cache - .rooms_knocked(user_id) - .map(|(r, _)| r); - - let all_rooms: Vec<_> = rooms_joined - .chain(rooms_invited) - .chain(rooms_knocked) - .collect() - .await; - - for room_id in all_rooms { - // ignore errors - if let Err(e) = leave_room(services, user_id, &room_id, None) - .boxed() - .await - { - warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); - } - - services.state_cache.forget(&room_id, user_id); - } -} - -pub async fn leave_room( - services: &Services, - user_id: &UserId, - room_id: &RoomId, - reason: Option, -) -> Result { - let default_member_content = RoomMemberEventContent { - membership: MembershipState::Leave, - reason: reason.clone(), - join_authorized_via_users_server: None, - is_direct: None, - avatar_url: None, - displayname: None, - third_party_invite: None, - blurhash: None, - }; - - let is_banned = services.metadata.is_banned(room_id); - let is_disabled = services.metadata.is_disabled(room_id); - - pin_mut!(is_banned, is_disabled); - if is_banned.or(is_disabled).await { - // the room is banned/disabled, the room must be rejected locally since we - // cant/dont want to federate with this server - services - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - None, - None, - true, - ) - .await?; - - return Ok(()); - } - - let dont_have_room = services - .state_cache - .server_in_room(services.globals.server_name(), room_id) - .eq(&false); - - let not_knocked = services - .state_cache - .is_knocked(user_id, room_id) - .eq(&false); - - // Ask a remote server if we don't have this room and are not knocking on it - if dont_have_room.and(not_knocked).await { - if let Err(e) = remote_leave_room(services, user_id, room_id) - .boxed() - .await - { - warn!(%user_id, "Failed to leave room {room_id} remotely: {e}"); - // Don't tell the client about this error - } - - let last_state = services - .state_cache - .invite_state(user_id, room_id) - .or_else(|_| services.state_cache.knock_state(user_id, room_id)) - .or_else(|_| services.state_cache.left_state(user_id, room_id)) - .await - .ok(); - - // We always drop the invite, we can't rely on other servers - services - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - last_state, - None, - true, - ) - .await?; - } else { - let state_lock = services.state.mutex.lock(room_id).await; - - let Ok(event) = services - .state_accessor - .room_state_get_content::( - room_id, - &StateEventType::RoomMember, - user_id.as_str(), - ) - .await - else { - debug_warn!( - "Trying to leave a room you are not a member of, marking room as left locally." - ); - - return services - .state_cache - .update_membership( - room_id, - user_id, - default_member_content, - user_id, - None, - None, - true, - ) - .await; - }; - - services - .timeline - .build_and_append_pdu( - PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { - membership: MembershipState::Leave, - reason, - join_authorized_via_users_server: None, - is_direct: None, - ..event - }), - user_id, - room_id, - &state_lock, - ) - .await?; - } - - Ok(()) -} - -async fn remote_leave_room(services: &Services, user_id: &UserId, room_id: &RoomId) -> Result { - let mut make_leave_response_and_server = - Err!(BadServerResponse("No remote server available to assist in leaving {room_id}.")); - - let mut servers: HashSet = services - .state_cache - .servers_invite_via(room_id) - .map(ToOwned::to_owned) - .collect() - .await; - - match services - .state_cache - .invite_state(user_id, room_id) - .await - { - | Ok(invite_state) => { - servers.extend( - invite_state - .iter() - .filter_map(|event| event.get_field("sender").ok().flatten()) - .filter_map(|sender: &str| UserId::parse(sender).ok()) - .map(|user| user.server_name().to_owned()), - ); - }, - | _ => { - match services - .state_cache - .knock_state(user_id, room_id) - .await - { - | Ok(knock_state) => { - servers.extend( - knock_state - .iter() - .filter_map(|event| event.get_field("sender").ok().flatten()) - .filter_map(|sender: &str| UserId::parse(sender).ok()) - .filter_map(|sender| { - if !services.globals.user_is_local(sender) { - Some(sender.server_name().to_owned()) - } else { - None - } - }), - ); - }, - | _ => {}, - } - }, - } - - if let Some(room_id_server_name) = room_id.server_name() { - servers.insert(room_id_server_name.to_owned()); - } - - debug_info!("servers in remote_leave_room: {servers:?}"); - - for remote_server in servers { - let make_leave_response = services - .sending - .send_federation_request( - &remote_server, - federation::membership::prepare_leave_event::v1::Request { - room_id: room_id.to_owned(), - user_id: user_id.to_owned(), - }, - ) - .await; - - make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server)); - - if make_leave_response_and_server.is_ok() { - break; - } - } - - let (make_leave_response, remote_server) = make_leave_response_and_server?; - - let Some(room_version_id) = make_leave_response.room_version else { - return Err!(BadServerResponse(warn!( - "No room version was returned by {remote_server} for {room_id}, room version is \ - likely not supported by tuwunel" - ))); - }; - - if !services - .server - .supported_room_version(&room_version_id) - { - return Err!(BadServerResponse(warn!( - "Remote room version {room_version_id} for {room_id} is not supported by conduwuit", - ))); - } - - let mut leave_event_stub = serde_json::from_str::( - make_leave_response.event.get(), - ) - .map_err(|e| { - err!(BadServerResponse(warn!( - "Invalid make_leave event json received from {remote_server} for {room_id}: {e:?}" - ))) - })?; - - // TODO: Is origin needed? - leave_event_stub.insert( - "origin".to_owned(), - CanonicalJsonValue::String(services.globals.server_name().as_str().to_owned()), - ); - leave_event_stub.insert( - "origin_server_ts".to_owned(), - CanonicalJsonValue::Integer( - utils::millis_since_unix_epoch() - .try_into() - .expect("Timestamp is valid js_int value"), - ), - ); - - // room v3 and above removed the "event_id" field from remote PDU format - match room_version_id { - | RoomVersionId::V1 | RoomVersionId::V2 => {}, - | _ => { - leave_event_stub.remove("event_id"); - }, - } - - // In order to create a compatible ref hash (EventID) the `hashes` field needs - // to be present - services - .server_keys - .hash_and_sign_event(&mut leave_event_stub, &room_version_id)?; - - // Generate event id - let event_id = gen_event_id(&leave_event_stub, &room_version_id)?; - - // Add event_id back - leave_event_stub - .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); - - // It has enough fields to be called a proper event now - let leave_event = leave_event_stub; - - services - .sending - .send_federation_request( - &remote_server, - federation::membership::create_leave_event::v2::Request { - room_id: room_id.to_owned(), - event_id, - pdu: services - .sending - .convert_to_outgoing_federation_event(leave_event.clone()) - .await, - }, - ) .await?; - Ok(()) + drop(state_lock); + + Ok(leave_room::v3::Response {}) } diff --git a/src/api/client/membership/mod.rs b/src/api/client/membership/mod.rs index eda45a88..6c87b9d9 100644 --- a/src/api/client/membership/mod.rs +++ b/src/api/client/membership/mod.rs @@ -22,7 +22,7 @@ use tuwunel_service::Services; pub(crate) use self::{ ban::ban_user_route, forget::forget_room_route, - invite::{invite_helper, invite_user_route}, + invite::invite_user_route, join::{join_room_by_id_or_alias_route, join_room_by_id_route}, kick::kick_user_route, knock::knock_room_route, @@ -30,11 +30,7 @@ pub(crate) use self::{ members::{get_member_events_route, joined_members_route}, unban::unban_user_route, }; -pub use self::{ - join::join_room_by_id_helper, - leave::{leave_all_rooms, leave_room}, -}; -use crate::{Ruma, client::full_user_deactivate}; +use crate::Ruma; /// # `POST /_matrix/client/r0/joined_rooms` /// @@ -136,17 +132,76 @@ async fn maybe_deactivate(services: &Services, user_id: &UserId, client_ip: IpAd .await; } - let all_joined_rooms: Vec = services - .state_cache - .rooms_joined(user_id) - .map(Into::into) - .collect() - .await; - - full_user_deactivate(services, user_id, &all_joined_rooms) + services + .deactivate + .full_deactivate(user_id) .boxed() .await?; } Ok(()) } + +// TODO: should this be in services? banned check would have to resolve again if +// room_id is not available at callsite +async fn get_join_params( + services: &Services, + user_id: &UserId, + room_id_or_alias: &RoomOrAliasId, + via: &[OwnedServerName], +) -> Result<(OwnedRoomId, Vec)> { + // servers tried first, additional_servers shuffled then tried after + let (room_id, mut servers, mut additional_servers) = + match OwnedRoomId::try_from(room_id_or_alias.to_owned()) { + // if room id, shuffle via + room_id server_name ... + | Ok(room_id) => { + let mut additional_servers = via.to_vec(); + + if let Some(server) = room_id.server_name() { + additional_servers.push(server.to_owned()); + } + + (room_id, Vec::new(), additional_servers) + }, + // ... if room alias, resolve and don't shuffle ... + | Err(room_alias) => { + let (room_id, servers) = services + .alias + .resolve_alias(&room_alias, Some(via.to_vec())) + .await?; + + (room_id, servers, Vec::new()) + }, + }; + + // either way, add invited vias + additional_servers.extend( + services + .state_cache + .servers_invite_via(&room_id) + .map(ToOwned::to_owned) + .collect::>() + .await, + ); + + // either way, add invite senders' servers + additional_servers.extend( + services + .state_cache + .invite_state(user_id, &room_id) + .await + .unwrap_or_default() + .iter() + .filter_map(|event| event.get_field("sender").ok().flatten()) + .filter_map(|sender: &str| UserId::parse(sender).ok()) + .map(|user| user.server_name().to_owned()), + ); + + // shuffle additionals, append to base servers + additional_servers.sort_unstable(); + additional_servers.dedup(); + shuffle(&mut additional_servers); + servers.append(&mut additional_servers); + + Ok((room_id, servers)) +} diff --git a/src/api/client/membership/unban.rs b/src/api/client/membership/unban.rs index b3f01bd8..4a4a7bfc 100644 --- a/src/api/client/membership/unban.rs +++ b/src/api/client/membership/unban.rs @@ -1,9 +1,7 @@ use axum::extract::State; -use ruma::{ - api::client::membership::unban_user, - events::room::member::{MembershipState, RoomMemberEventContent}, -}; -use tuwunel_core::{Err, Result, matrix::pdu::PduBuilder}; +use futures::FutureExt; +use ruma::api::client::membership::unban_user; +use tuwunel_core::Result; use crate::Ruma; @@ -16,34 +14,16 @@ pub(crate) async fn unban_user_route( ) -> Result { let state_lock = services.state.mutex.lock(&body.room_id).await; - let current_member_content = services - .state_accessor - .get_member(&body.room_id, &body.user_id) - .await - .unwrap_or_else(|_| RoomMemberEventContent::new(MembershipState::Leave)); - - if current_member_content.membership != MembershipState::Ban { - return Err!(Request(Forbidden( - "Cannot unban a user who is not banned (current membership: {})", - current_member_content.membership - ))); - } - services - .timeline - .build_and_append_pdu( - PduBuilder::state(body.user_id.to_string(), &RoomMemberEventContent { - membership: MembershipState::Leave, - reason: body.reason.clone(), - join_authorized_via_users_server: None, - third_party_invite: None, - is_direct: None, - ..current_member_content - }), - body.sender_user(), + .membership + .unban( &body.room_id, + &body.user_id, + body.reason.as_ref(), + body.sender_user(), &state_lock, ) + .boxed() .await?; drop(state_lock); diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index c1e8b61a..c30e03ab 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -40,7 +40,8 @@ pub(super) mod user_directory; pub(super) mod voip; pub(super) mod well_known; -pub use account::full_user_deactivate; +mod utils; + pub(super) use account::*; pub(super) use account_data::*; pub(super) use alias::*; @@ -55,12 +56,10 @@ pub(super) use keys::*; pub(super) use media::*; pub(super) use media_legacy::*; pub(super) use membership::*; -pub use membership::{join_room_by_id_helper, leave_all_rooms, leave_room}; pub(super) use message::*; pub(super) use openid::*; pub(super) use presence::*; pub(super) use profile::*; -pub use profile::{update_avatar_url, update_displayname}; pub(super) use push::*; pub(super) use read_marker::*; pub(super) use redact::*; diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index 58d43473..456bd4da 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -2,27 +2,20 @@ use std::collections::BTreeMap; use axum::extract::State; use futures::{ - FutureExt, StreamExt, TryStreamExt, - future::{join, join3, join4}, + StreamExt, + future::{join, join4}, }; use ruma::{ - OwnedMxcUri, OwnedRoomId, UserId, + OwnedRoomId, api::{ client::profile::{ get_avatar_url, get_display_name, get_profile, set_avatar_url, set_display_name, }, federation, }, - events::room::member::{MembershipState, RoomMemberEventContent}, presence::PresenceState, }; -use tuwunel_core::{ - Err, Result, - matrix::pdu::PduBuilder, - utils::{IterStream, future::TryExtExt, stream::TryIgnore}, - warn, -}; -use tuwunel_service::Services; +use tuwunel_core::{Err, Result, utils::future::TryExtExt}; use crate::Ruma; @@ -48,7 +41,9 @@ pub(crate) async fn set_displayname_route( .collect() .await; - update_displayname(&services, &body.user_id, body.displayname.clone(), &all_joined_rooms) + services + .users + .update_displayname(&body.user_id, body.displayname.clone(), &all_joined_rooms) .await; if services.config.allow_local_presence { @@ -143,14 +138,15 @@ pub(crate) async fn set_avatar_url_route( .collect() .await; - update_avatar_url( - &services, - &body.user_id, - body.avatar_url.clone(), - body.blurhash.clone(), - &all_joined_rooms, - ) - .await; + services + .users + .update_avatar_url( + &body.user_id, + body.avatar_url.clone(), + body.blurhash.clone(), + &all_joined_rooms, + ) + .await; if services.config.allow_local_presence { // Presence update @@ -333,126 +329,3 @@ pub(crate) async fn get_profile_route( Ok(response.collect::()) } - -pub async fn update_displayname( - services: &Services, - user_id: &UserId, - displayname: Option, - all_joined_rooms: &[OwnedRoomId], -) { - let (current_avatar_url, current_blurhash, current_displayname) = join3( - services.users.avatar_url(user_id).ok(), - services.users.blurhash(user_id).ok(), - services.users.displayname(user_id).ok(), - ) - .await; - - if displayname == current_displayname { - return; - } - - services - .users - .set_displayname(user_id, displayname.clone()); - - // Send a new join membership event into all joined rooms - let avatar_url = ¤t_avatar_url; - let blurhash = ¤t_blurhash; - let displayname = &displayname; - let all_joined_rooms: Vec<_> = all_joined_rooms - .iter() - .try_stream() - .and_then(async |room_id: &OwnedRoomId| { - let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { - displayname: displayname.clone(), - membership: MembershipState::Join, - avatar_url: avatar_url.clone(), - blurhash: blurhash.clone(), - join_authorized_via_users_server: None, - reason: None, - is_direct: None, - third_party_invite: None, - }); - - Ok((pdu, room_id)) - }) - .ignore_err() - .collect() - .await; - - update_all_rooms(services, all_joined_rooms, user_id) - .boxed() - .await; -} - -pub async fn update_avatar_url( - services: &Services, - user_id: &UserId, - avatar_url: Option, - blurhash: Option, - all_joined_rooms: &[OwnedRoomId], -) { - let (current_avatar_url, current_blurhash, current_displayname) = join3( - services.users.avatar_url(user_id).ok(), - services.users.blurhash(user_id).ok(), - services.users.displayname(user_id).ok(), - ) - .await; - - if current_avatar_url == avatar_url && current_blurhash == blurhash { - return; - } - - services - .users - .set_avatar_url(user_id, avatar_url.clone()); - services - .users - .set_blurhash(user_id, blurhash.clone()); - - // Send a new join membership event into all joined rooms - let avatar_url = &avatar_url; - let blurhash = &blurhash; - let displayname = ¤t_displayname; - let all_joined_rooms: Vec<_> = all_joined_rooms - .iter() - .try_stream() - .and_then(async |room_id: &OwnedRoomId| { - let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { - avatar_url: avatar_url.clone(), - blurhash: blurhash.clone(), - membership: MembershipState::Join, - displayname: displayname.clone(), - join_authorized_via_users_server: None, - reason: None, - is_direct: None, - third_party_invite: None, - }); - - Ok((pdu, room_id)) - }) - .ignore_err() - .collect() - .await; - - update_all_rooms(services, all_joined_rooms, user_id) - .boxed() - .await; -} - -async fn update_all_rooms( - services: &Services, - all_joined_rooms: Vec<(PduBuilder, &OwnedRoomId)>, - user_id: &UserId, -) { - for (pdu_builder, room_id) in all_joined_rooms { - let state_lock = services.state.mutex.lock(room_id).await; - if let Err(e) = services - .timeline - .build_and_append_pdu(pdu_builder, user_id, room_id, &state_lock) - .await - { - warn!(%user_id, %room_id, "Failed to update/send new profile join membership update in room: {e}"); - } - } -} diff --git a/src/api/client/register.rs b/src/api/client/register.rs index 80c81055..e87ab36a 100644 --- a/src/api/client/register.rs +++ b/src/api/client/register.rs @@ -19,7 +19,7 @@ use ruma::{ use tuwunel_core::{Err, Error, Result, debug_info, error, info, is_equal_to, utils, warn}; use tuwunel_service::users::device::generate_refresh_token; -use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH, join_room_by_id_helper}; +use super::{DEVICE_ID_LENGTH, SESSION_ID_LENGTH}; use crate::Ruma; const RANDOM_USER_ID_LENGTH: usize = 10; @@ -555,17 +555,20 @@ pub(crate) async fn register_route( } if let Some(room_server_name) = room.server_name() { - match join_room_by_id_helper( - &services, - &user_id, - &room_id, - Some("Automatically joining this room upon registration".to_owned()), - &[services.globals.server_name().to_owned(), room_server_name.to_owned()], - None, - &body.appservice_info, - ) - .boxed() - .await + let state_lock = services.state.mutex.lock(&room_id).await; + + match services + .membership + .join( + &user_id, + &room_id, + Some("Automatically joining this room upon registration".to_owned()), + &[services.globals.server_name().to_owned(), room_server_name.to_owned()], + &body.appservice_info, + &state_lock, + ) + .boxed() + .await { | Err(e) => { // don't return this error so we don't fail registrations @@ -577,6 +580,8 @@ pub(crate) async fn register_route( info!("Automatically joined room {room} for user {user_id}"); }, } + + drop(state_lock); } } } diff --git a/src/api/client/room/create.rs b/src/api/client/room/create.rs index cd29c770..752917c3 100644 --- a/src/api/client/room/create.rs +++ b/src/api/client/room/create.rs @@ -35,7 +35,7 @@ use tuwunel_core::{ }; use tuwunel_service::{Services, appservice::RegistrationInfo, rooms::state::RoomMutexGuard}; -use crate::{Ruma, client::invite_helper}; +use crate::{Ruma, client::utils::invite_check}; /// # `POST /_matrix/client/v3/createRoom` /// @@ -332,30 +332,38 @@ pub(crate) async fn create_room_route( drop(next_count); drop(state_lock); - // 8. Events implied by invite (and TODO: invite_3pid) - for user_id in &body.invite { - if services - .users - .user_is_ignored(sender_user, user_id) + // if inviting anyone with room creation and invite check passes + if (!body.invite.is_empty() || !body.invite_3pid.is_empty()) + && invite_check(&services, sender_user, &room_id) .await - { - continue; - } else if services - .users - .user_is_ignored(user_id, sender_user) - .await - { - // silently drop the invite to the recipient if they've been ignored by the - // sender, pretend it worked - continue; - } + .is_ok() + { + // 8. Events implied by invite (and TODO: invite_3pid) + for user_id in &body.invite { + if services + .users + .user_is_ignored(sender_user, user_id) + .await + { + continue; + } else if services + .users + .user_is_ignored(user_id, sender_user) + .await + { + // silently drop the invite to the recipient if they've been ignored by the + // sender, pretend it worked + continue; + } - if let Err(e) = - invite_helper(&services, sender_user, user_id, &room_id, None, body.is_direct) + if let Err(e) = services + .membership + .invite(sender_user, user_id, &room_id, None, body.is_direct) .boxed() .await - { - warn!(%e, "Failed to send invite"); + { + warn!(%e, "Failed to send invite"); + } } } diff --git a/src/api/client/unstable.rs b/src/api/client/unstable.rs index 11d9dd85..29541308 100644 --- a/src/api/client/unstable.rs +++ b/src/api/client/unstable.rs @@ -18,7 +18,6 @@ use ruma::{ }; use tuwunel_core::{Err, Error, Result}; -use super::{update_avatar_url, update_displayname}; use crate::Ruma; /// # `GET /_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms` @@ -142,13 +141,14 @@ pub(crate) async fn set_profile_field_route( .collect() .await; - update_displayname( - &services, - &body.user_id, - Some(body.value.value().to_string()), - &all_joined_rooms, - ) - .await; + services + .users + .update_displayname( + &body.user_id, + Some(body.value.value().to_string()), + &all_joined_rooms, + ) + .await; } else if body.value.field_name() == ProfileFieldName::AvatarUrl { let mxc = ruma::OwnedMxcUri::from(body.value.value().to_string()); @@ -159,7 +159,10 @@ pub(crate) async fn set_profile_field_route( .collect() .await; - update_avatar_url(&services, &body.user_id, Some(mxc), None, &all_joined_rooms).await; + services + .users + .update_avatar_url(&body.user_id, Some(mxc), None, &all_joined_rooms) + .await; } else { services.users.set_profile_key( &body.user_id, @@ -202,7 +205,10 @@ pub(crate) async fn delete_profile_field_route( .collect() .await; - update_displayname(&services, &body.user_id, None, &all_joined_rooms).await; + services + .users + .update_displayname(&body.user_id, None, &all_joined_rooms) + .await; } else if body.field == ProfileFieldName::AvatarUrl { let all_joined_rooms: Vec = services .state_cache @@ -211,7 +217,10 @@ pub(crate) async fn delete_profile_field_route( .collect() .await; - update_avatar_url(&services, &body.user_id, None, None, &all_joined_rooms).await; + services + .users + .update_avatar_url(&body.user_id, None, None, &all_joined_rooms) + .await; } else { services .users diff --git a/src/api/client/utils.rs b/src/api/client/utils.rs new file mode 100644 index 00000000..2a41372e --- /dev/null +++ b/src/api/client/utils.rs @@ -0,0 +1,16 @@ +use ruma::{RoomId, UserId}; +use tuwunel_core::{Err, Result, warn}; +use tuwunel_service::Services; + +pub(crate) async fn invite_check( + services: &Services, + sender_user: &UserId, + room_id: &RoomId, +) -> Result { + if !services.users.is_admin(sender_user).await && services.config.block_non_admin_invites { + warn!("{sender_user} is not an admin and attempted to send an invite to {room_id}"); + return Err!(Request(Forbidden("Invites are not allowed on this server."))); + } + + Ok(()) +} diff --git a/src/service/deactivate/mod.rs b/src/service/deactivate/mod.rs new file mode 100644 index 00000000..ba4a5dfa --- /dev/null +++ b/src/service/deactivate/mod.rs @@ -0,0 +1,164 @@ +use std::sync::Arc; + +use futures::{FutureExt, StreamExt}; +use ruma::{ + OwnedRoomId, UserId, + events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent}, +}; +use tuwunel_core::{Event, Result, info, pdu::PduBuilder, utils::ReadyExt, warn}; + +pub struct Service { + services: Arc, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { services: args.services.clone() })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +impl Service { + /// Runs through all the deactivation steps: + /// + /// - Mark as deactivated + /// - Removing display name + /// - Removing avatar URL and blurhash + /// - Removing all profile data + /// - Leaving all rooms (and forgets all of them) + pub async fn full_deactivate(&self, user_id: &UserId) -> Result { + self.services + .users + .deactivate_account(user_id) + .await?; + + let all_joined_rooms: Vec = self + .services + .state_cache + .rooms_joined(user_id) + .map(Into::into) + .collect() + .await; + + self.services + .users + .update_displayname(user_id, None, &all_joined_rooms) + .await; + self.services + .users + .update_avatar_url(user_id, None, None, &all_joined_rooms) + .await; + + self.services + .users + .all_profile_keys(user_id) + .ready_for_each(|(profile_key, _)| { + self.services + .users + .set_profile_key(user_id, &profile_key, None); + }) + .await; + + for room_id in all_joined_rooms { + let state_lock = self.services.state.mutex.lock(&room_id).await; + + let room_power_levels = self + .services + .state_accessor + .get_power_levels(&room_id) + .await + .ok(); + + let user_can_change_self = room_power_levels + .as_ref() + .is_some_and(|power_levels| { + power_levels.user_can_change_user_power_level(user_id, user_id) + }); + + let user_can_demote_self = user_can_change_self + || self + .services + .state_accessor + .room_state_get(&room_id, &StateEventType::RoomCreate, "") + .await + .is_ok_and(|event| event.sender() == user_id); + + if user_can_demote_self { + let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels + .map(TryInto::try_into) + .transpose()? + .unwrap_or_default(); + + power_levels_content.users.remove(user_id); + + // ignore errors so deactivation doesn't fail + match self + .services + .timeline + .build_and_append_pdu( + PduBuilder::state(String::new(), &power_levels_content), + user_id, + &room_id, + &state_lock, + ) + .await + { + | Err(e) => { + warn!(%room_id, %user_id, "Failed to demote user's own power level: {e}"); + }, + | _ => { + info!("Demoted {user_id} in {room_id} as part of account deactivation"); + }, + } + } + } + + let rooms_joined = self + .services + .state_cache + .rooms_joined(user_id) + .map(ToOwned::to_owned); + + let rooms_invited = self + .services + .state_cache + .rooms_invited(user_id) + .map(|(r, _)| r); + + let rooms_knocked = self + .services + .state_cache + .rooms_knocked(user_id) + .map(|(r, _)| r); + + let all_rooms: Vec<_> = rooms_joined + .chain(rooms_invited) + .chain(rooms_knocked) + .collect() + .await; + + for room_id in all_rooms { + let state_lock = self.services.state.mutex.lock(&room_id).await; + + // ignore errors + if let Err(e) = self + .services + .membership + .leave(user_id, &room_id, None, &state_lock) + .boxed() + .await + { + warn!(%user_id, "Failed to leave {room_id} remotely: {e}"); + } + + drop(state_lock); + + self.services + .state_cache + .forget(&room_id, user_id); + } + + Ok(()) + } +} diff --git a/src/service/membership/ban.rs b/src/service/membership/ban.rs new file mode 100644 index 00000000..780bdd67 --- /dev/null +++ b/src/service/membership/ban.rs @@ -0,0 +1,44 @@ +use ruma::{ + RoomId, UserId, + events::room::member::{MembershipState, RoomMemberEventContent}, +}; +use tuwunel_core::{Result, implement, pdu::PduBuilder}; + +use super::Service; +use crate::rooms::timeline::RoomMutexGuard; + +#[implement(Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%sender_user, %room_id, %user_id) +)] +pub async fn ban( + &self, + room_id: &RoomId, + user_id: &UserId, + reason: Option<&String>, + sender_user: &UserId, + state_lock: &RoomMutexGuard, +) -> Result { + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Ban, + reason: reason.cloned(), + displayname: None, + avatar_url: None, + blurhash: None, + is_direct: None, + join_authorized_via_users_server: None, + third_party_invite: None, + }), + sender_user, + room_id, + state_lock, + ) + .await?; + + Ok(()) +} diff --git a/src/service/membership/invite.rs b/src/service/membership/invite.rs new file mode 100644 index 00000000..2c8c3485 --- /dev/null +++ b/src/service/membership/invite.rs @@ -0,0 +1,200 @@ +use futures::FutureExt; +use ruma::{ + OwnedServerName, RoomId, UserId, + api::federation::membership::create_invite, + events::room::member::{MembershipState, RoomMemberEventContent}, +}; +use tuwunel_core::{ + Err, Result, err, implement, matrix::event::gen_event_id_canonical_json, pdu::PduBuilder, +}; + +use super::Service; + +#[implement(Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%sender_user, %room_id, %user_id) +)] +pub async fn invite( + &self, + sender_user: &UserId, + user_id: &UserId, + room_id: &RoomId, + reason: Option<&String>, + is_direct: bool, +) -> Result { + if self.services.globals.user_is_local(user_id) { + self.local_invite(sender_user, user_id, room_id, reason, is_direct) + .boxed() + .await?; + } else { + self.remote_invite(sender_user, user_id, room_id, reason, is_direct) + .boxed() + .await?; + } + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(name = "remote", level = "debug", skip_all)] +async fn remote_invite( + &self, + sender_user: &UserId, + user_id: &UserId, + room_id: &RoomId, + reason: Option<&String>, + is_direct: bool, +) -> Result { + let (pdu, pdu_json, invite_room_state) = { + let state_lock = self.services.state.mutex.lock(room_id).await; + + let content = RoomMemberEventContent { + avatar_url: self.services.users.avatar_url(user_id).await.ok(), + is_direct: Some(is_direct), + reason: reason.cloned(), + ..RoomMemberEventContent::new(MembershipState::Invite) + }; + + let (pdu, pdu_json) = self + .services + .timeline + .create_hash_and_sign_event( + PduBuilder::state(user_id.to_string(), &content), + sender_user, + room_id, + &state_lock, + ) + .await?; + + let invite_room_state = self.services.state.summary_stripped(&pdu).await; + + drop(state_lock); + + (pdu, pdu_json, invite_room_state) + }; + + let room_version_id = self + .services + .state + .get_room_version(room_id) + .await?; + + let response = self + .services + .sending + .send_federation_request(user_id.server_name(), create_invite::v2::Request { + room_id: room_id.to_owned(), + event_id: (*pdu.event_id).to_owned(), + room_version: room_version_id.clone(), + event: self + .services + .sending + .convert_to_outgoing_federation_event(pdu_json.clone()) + .await, + invite_room_state: invite_room_state + .into_iter() + .map(Into::into) + .collect(), + via: self + .services + .state_cache + .servers_route_via(room_id) + .await + .ok(), + }) + .await?; + + // We do not add the event_id field to the pdu here because of signature and + // hashes checks + let (event_id, value) = gen_event_id_canonical_json(&response.event, &room_version_id) + .map_err(|e| { + err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) + })?; + + if pdu.event_id != event_id { + return Err!(Request(BadJson(warn!( + %pdu.event_id, %event_id, + "Server {} sent event with wrong event ID", + user_id.server_name() + )))); + } + + let origin: OwnedServerName = serde_json::from_value(serde_json::to_value( + value + .get("origin") + .ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?, + )?) + .map_err(|e| { + err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}")))) + })?; + + let pdu_id = self + .services + .event_handler + .handle_incoming_pdu(&origin, room_id, &event_id, value, true) + .await? + .ok_or_else(|| { + err!(Request(InvalidParam("Could not accept incoming PDU as timeline event."))) + })?; + + self.services + .sending + .send_pdu_room(room_id, &pdu_id) + .await?; + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(name = "local", level = "debug", skip_all)] +async fn local_invite( + &self, + sender_user: &UserId, + user_id: &UserId, + room_id: &RoomId, + reason: Option<&String>, + is_direct: bool, +) -> Result { + if !self + .services + .state_cache + .is_joined(sender_user, room_id) + .await + { + return Err!(Request(Forbidden( + "You must be joined in the room you are trying to invite from." + ))); + } + + let state_lock = self.services.state.mutex.lock(room_id).await; + + let content = RoomMemberEventContent { + displayname: self + .services + .users + .displayname(user_id) + .await + .ok(), + avatar_url: self.services.users.avatar_url(user_id).await.ok(), + blurhash: self.services.users.blurhash(user_id).await.ok(), + is_direct: Some(is_direct), + reason: reason.cloned(), + ..RoomMemberEventContent::new(MembershipState::Invite) + }; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &content), + sender_user, + room_id, + &state_lock, + ) + .await?; + + drop(state_lock); + + Ok(()) +} diff --git a/src/service/membership/join.rs b/src/service/membership/join.rs new file mode 100644 index 00000000..2e8a2aac --- /dev/null +++ b/src/service/membership/join.rs @@ -0,0 +1,864 @@ +use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc}; + +use futures::{FutureExt, StreamExt, pin_mut}; +use ruma::{ + CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, + UserId, + api::{client::error::ErrorKind, federation}, + canonical_json::to_canonical_value, + events::{ + StateEventType, + room::{ + join_rules::RoomJoinRulesEventContent, + member::{MembershipState, RoomMemberEventContent}, + }, + }, + room::{AllowRule, JoinRule}, +}; +use tuwunel_core::{ + Err, Result, debug, debug_info, debug_warn, err, error, implement, info, + matrix::{ + event::{gen_event_id, gen_event_id_canonical_json}, + room_version, + }, + pdu::{PduBuilder, PduEvent}, + state_res, trace, + utils::{self, IterStream, ReadyExt}, + warn, +}; + +use super::Service; +use crate::{ + appservice::RegistrationInfo, + rooms::{ + state::RoomMutexGuard, + state_compressor::{CompressedState, HashSetCompressStateEvent}, + }, +}; + +#[implement(Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%sender_user, %room_id) +)] +pub async fn join( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + appservice_info: &Option, + state_lock: &RoomMutexGuard, +) -> Result { + let user_is_guest = self + .services + .users + .is_deactivated(sender_user) + .await + .unwrap_or(false) + && appservice_info.is_none(); + + if user_is_guest + && !self + .services + .state_accessor + .guest_can_join(room_id) + .await + { + return Err!(Request(Forbidden("Guests are not allowed to join this room"))); + } + + if self + .services + .state_cache + .is_joined(sender_user, room_id) + .await + { + debug_warn!("{sender_user} is already joined in {room_id}"); + return Ok(()); + } + + if let Ok(membership) = self + .services + .state_accessor + .get_member(room_id, sender_user) + .await + { + if membership.membership == MembershipState::Ban { + debug_warn!("{sender_user} is banned from {room_id} but attempted to join"); + return Err!(Request(Forbidden("You are banned from the room."))); + } + } + + let server_in_room = self + .services + .state_cache + .server_in_room(self.services.globals.server_name(), room_id) + .await; + + let local_join = server_in_room + || servers.is_empty() + || (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0])); + + if local_join { + self.join_local(sender_user, room_id, reason, servers, state_lock) + .boxed() + .await?; + } else { + // Ask a remote server if we are not participating in this room + self.join_remote(sender_user, room_id, reason, servers, state_lock) + .boxed() + .await?; + } + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument( + name = "remote", + level = "debug", + skip_all, + fields(?servers) +)] +pub async fn join_remote( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + state_lock: &RoomMutexGuard, +) -> Result { + info!("Joining {room_id} over federation."); + + let (make_join_response, remote_server) = self + .make_join_request(sender_user, room_id, servers) + .await?; + + info!("make_join finished"); + + let Some(room_version_id) = make_join_response.room_version else { + return Err!(BadServerResponse("Remote room version is not supported by tuwunel")); + }; + + if !self + .services + .server + .supported_room_version(&room_version_id) + { + return Err!(BadServerResponse( + "Remote room version {room_version_id} is not supported by tuwunel" + )); + } + + let mut join_event_stub: CanonicalJsonObject = + serde_json::from_str(make_join_response.event.get()).map_err(|e| { + err!(BadServerResponse(warn!( + "Invalid make_join event json received from server: {e:?}" + ))) + })?; + + let join_authorized_via_users_server = { + use RoomVersionId::*; + if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) { + join_event_stub + .get("content") + .map(|s| { + s.as_object()? + .get("join_authorised_via_users_server")? + .as_str() + }) + .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()) + } else { + None + } + }; + + join_event_stub.insert( + "origin".to_owned(), + CanonicalJsonValue::String( + self.services + .globals + .server_name() + .as_str() + .to_owned(), + ), + ); + join_event_stub.insert( + "origin_server_ts".to_owned(), + CanonicalJsonValue::Integer( + utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + ), + ); + join_event_stub.insert( + "content".to_owned(), + to_canonical_value(RoomMemberEventContent { + displayname: self + .services + .users + .displayname(sender_user) + .await + .ok(), + avatar_url: self + .services + .users + .avatar_url(sender_user) + .await + .ok(), + blurhash: self + .services + .users + .blurhash(sender_user) + .await + .ok(), + reason, + join_authorized_via_users_server: join_authorized_via_users_server.clone(), + ..RoomMemberEventContent::new(MembershipState::Join) + }) + .expect("event is valid, we just created it"), + ); + + // We keep the "event_id" in the pdu only in v1 or + // v2 rooms + match room_version_id { + | RoomVersionId::V1 | RoomVersionId::V2 => {}, + | _ => { + join_event_stub.remove("event_id"); + }, + } + + // In order to create a compatible ref hash (EventID) the `hashes` field needs + // to be present + self.services + .server_keys + .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; + + // Generate event id + let event_id = gen_event_id(&join_event_stub, &room_version_id)?; + + // Add event_id back + join_event_stub + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); + + // It has enough fields to be called a proper event now + let mut join_event = join_event_stub; + + info!("Asking {remote_server} for send_join in room {room_id}"); + let send_join_request = federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.clone(), + omit_members: false, + pdu: self + .services + .sending + .convert_to_outgoing_federation_event(join_event.clone()) + .await, + }; + + // Once send_join hits the remote server it may start sending us events which + // have to be belayed until we process this response first. + let _federation_lock = self + .services + .event_handler + .mutex_federation + .lock(room_id) + .await; + + let send_join_response = match self + .services + .sending + .send_synapse_request(&remote_server, send_join_request) + .await + { + | Ok(response) => response, + | Err(e) => { + error!("send_join failed: {e}"); + return Err(e); + }, + }; + + info!("send_join finished"); + + if join_authorized_via_users_server.is_some() { + if let Some(signed_raw) = &send_join_response.room_state.event { + debug_info!( + "There is a signed event with join_authorized_via_users_server. This room is \ + probably using restricted joins. Adding signature to our event" + ); + + let (signed_event_id, signed_value) = + gen_event_id_canonical_json(signed_raw, &room_version_id).map_err(|e| { + err!(Request(BadJson(warn!( + "Could not convert event to canonical JSON: {e}" + )))) + })?; + + if signed_event_id != event_id { + return Err!(Request(BadJson(warn!( + %signed_event_id, %event_id, + "Server {remote_server} sent event with wrong event ID" + )))); + } + + match signed_value["signatures"] + .as_object() + .ok_or_else(|| { + err!(BadServerResponse(warn!( + "Server {remote_server} sent invalid signatures type" + ))) + }) + .and_then(|e| { + e.get(remote_server.as_str()).ok_or_else(|| { + err!(BadServerResponse(warn!( + "Server {remote_server} did not send its signature for a restricted \ + room" + ))) + }) + }) { + | Ok(signature) => { + join_event + .get_mut("signatures") + .expect("we created a valid pdu") + .as_object_mut() + .expect("we created a valid pdu") + .insert(remote_server.to_string(), signature.clone()); + }, + | Err(e) => { + warn!( + "Server {remote_server} sent invalid signature in send_join signatures \ + for event {signed_value:?}: {e:?}", + ); + }, + } + } + } + + self.services + .short + .get_or_create_shortroomid(room_id) + .await; + + info!("Parsing join event"); + let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone()) + .map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?; + + info!("Acquiring server signing keys for response events"); + let resp_events = &send_join_response.room_state; + let resp_state = &resp_events.state; + let resp_auth = &resp_events.auth_chain; + self.services + .server_keys + .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) + .await; + + info!("Going through send_join response room_state"); + let cork = self.services.db.cork_and_flush(); + let state = send_join_response + .room_state + .state + .iter() + .stream() + .then(|pdu| { + self.services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_id) + }) + .ready_filter_map(Result::ok) + .fold(HashMap::new(), async |mut state, (event_id, value)| { + let pdu = if value["type"] == "m.room.create" { + PduEvent::from_rid_val(room_id, &event_id, value.clone()) + } else { + PduEvent::from_id_val(&event_id, value.clone()) + }; + + let pdu = match pdu { + | Ok(pdu) => pdu, + | Err(e) => { + debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); + return state; + }, + }; + + self.services + .timeline + .add_pdu_outlier(&event_id, &value); + + if let Some(state_key) = &pdu.state_key { + let shortstatekey = self + .services + .short + .get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key) + .await; + + state.insert(shortstatekey, pdu.event_id.clone()); + } + + state + }) + .await; + + drop(cork); + + info!("Going through send_join response auth_chain"); + let cork = self.services.db.cork_and_flush(); + send_join_response + .room_state + .auth_chain + .iter() + .stream() + .then(|pdu| { + self.services + .server_keys + .validate_and_add_event_id_no_fetch(pdu, &room_version_id) + }) + .ready_filter_map(Result::ok) + .ready_for_each(|(event_id, value)| { + self.services + .timeline + .add_pdu_outlier(&event_id, &value); + }) + .await; + + drop(cork); + + debug!("Running send_join auth check"); + state_res::auth_check( + &room_version::rules(&room_version_id)?, + &parsed_join_pdu, + &async |event_id| self.services.timeline.get_pdu(&event_id).await, + &async |event_type, state_key| { + let shortstatekey = self + .services + .short + .get_shortstatekey(&event_type, state_key.as_str()) + .await?; + + let event_id = state.get(&shortstatekey).ok_or_else(|| { + err!(Request(NotFound("Missing fetch_state {shortstatekey:?}"))) + })?; + + self.services.timeline.get_pdu(event_id).await + }, + ) + .boxed() + .await?; + + info!("Compressing state from send_join"); + let compressed: CompressedState = self + .services + .state_compressor + .compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow()))) + .collect() + .await; + + debug!("Saving compressed state"); + let HashSetCompressStateEvent { + shortstatehash: statehash_before_join, + added, + removed, + } = self + .services + .state_compressor + .save_state(room_id, Arc::new(compressed)) + .await?; + + debug!("Forcing state for new room"); + self.services + .state + .force_state(room_id, statehash_before_join, added, removed, state_lock) + .await?; + + info!("Updating joined counts for new room"); + self.services + .state_cache + .update_joined_count(room_id) + .await; + + // We append to state before appending the pdu, so we don't have a moment in + // time with the pdu without it's state. This is okay because append_pdu can't + // fail. + let statehash_after_join = self + .services + .state + .append_to_state(&parsed_join_pdu) + .await?; + + info!("Appending new room join event"); + self.services + .timeline + .append_pdu( + &parsed_join_pdu, + join_event, + once(parsed_join_pdu.event_id.borrow()), + state_lock, + ) + .await?; + + info!("Setting final room state for new room"); + // We set the room state after inserting the pdu, so that we never have a moment + // in time where events in the current room state do not exist + self.services + .state + .set_room_state(room_id, statehash_after_join, state_lock); + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(name = "local", level = "debug", skip_all)] +pub async fn join_local( + &self, + sender_user: &UserId, + room_id: &RoomId, + reason: Option, + servers: &[OwnedServerName], + state_lock: &RoomMutexGuard, +) -> Result { + debug_info!("We can join locally"); + + let join_rules_event_content = self + .services + .state_accessor + .room_state_get_content::( + room_id, + &StateEventType::RoomJoinRules, + "", + ) + .await; + + let restriction_rooms = match join_rules_event_content { + | Ok(RoomJoinRulesEventContent { + join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted), + }) => restricted + .allow + .into_iter() + .filter_map(|a| match a { + | AllowRule::RoomMembership(r) => Some(r.room_id), + | _ => None, + }) + .collect(), + | _ => Vec::new(), + }; + + let join_authorized_via_users_server: Option = { + if restriction_rooms + .iter() + .stream() + .any(|restriction_room_id| { + self.services + .state_cache + .is_joined(sender_user, restriction_room_id) + }) + .await + { + let users = self + .services + .state_cache + .local_users_in_room(room_id) + .filter(|user| { + self.services.state_accessor.user_can_invite( + room_id, + user, + sender_user, + state_lock, + ) + }) + .map(ToOwned::to_owned); + + pin_mut!(users); + users.next().await + } else { + None + } + }; + + let content = RoomMemberEventContent { + displayname: self + .services + .users + .displayname(sender_user) + .await + .ok(), + avatar_url: self + .services + .users + .avatar_url(sender_user) + .await + .ok(), + blurhash: self + .services + .users + .blurhash(sender_user) + .await + .ok(), + reason: reason.clone(), + join_authorized_via_users_server, + ..RoomMemberEventContent::new(MembershipState::Join) + }; + + // Try normal join first + let Err(error) = self + .services + .timeline + .build_and_append_pdu( + PduBuilder::state(sender_user.to_string(), &content), + sender_user, + room_id, + state_lock, + ) + .await + else { + return Ok(()); + }; + + if restriction_rooms.is_empty() + && (servers.is_empty() + || servers.len() == 1 && self.services.globals.server_is_ours(&servers[0])) + { + return Err(error); + } + + warn!( + "We couldn't do the join locally, maybe federation can help to satisfy the restricted \ + join requirements" + ); + let Ok((make_join_response, remote_server)) = self + .make_join_request(sender_user, room_id, servers) + .await + else { + return Err(error); + }; + + let Some(room_version_id) = make_join_response.room_version else { + return Err!(BadServerResponse("Remote room version is not supported by tuwunel")); + }; + + if !self + .services + .server + .supported_room_version(&room_version_id) + { + return Err!(BadServerResponse( + "Remote room version {room_version_id} is not supported by tuwunel" + )); + } + + let mut join_event_stub: CanonicalJsonObject = + serde_json::from_str(make_join_response.event.get()).map_err(|e| { + err!(BadServerResponse("Invalid make_join event json received from server: {e:?}")) + })?; + + let join_authorized_via_users_server = join_event_stub + .get("content") + .map(|s| { + s.as_object()? + .get("join_authorised_via_users_server")? + .as_str() + }) + .and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok()); + + join_event_stub.insert( + "origin".to_owned(), + CanonicalJsonValue::String( + self.services + .globals + .server_name() + .as_str() + .to_owned(), + ), + ); + join_event_stub.insert( + "origin_server_ts".to_owned(), + CanonicalJsonValue::Integer( + utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + ), + ); + join_event_stub.insert( + "content".to_owned(), + to_canonical_value(RoomMemberEventContent { + displayname: self + .services + .users + .displayname(sender_user) + .await + .ok(), + avatar_url: self + .services + .users + .avatar_url(sender_user) + .await + .ok(), + blurhash: self + .services + .users + .blurhash(sender_user) + .await + .ok(), + reason, + join_authorized_via_users_server, + ..RoomMemberEventContent::new(MembershipState::Join) + }) + .expect("event is valid, we just created it"), + ); + + // We keep the "event_id" in the pdu only in v1 or + // v2 rooms + match room_version_id { + | RoomVersionId::V1 | RoomVersionId::V2 => {}, + | _ => { + join_event_stub.remove("event_id"); + }, + } + + // In order to create a compatible ref hash (EventID) the `hashes` field needs + // to be present + self.services + .server_keys + .hash_and_sign_event(&mut join_event_stub, &room_version_id)?; + + // Generate event id + let event_id = gen_event_id(&join_event_stub, &room_version_id)?; + + // Add event_id back + join_event_stub + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); + + // It has enough fields to be called a proper event now + let join_event = join_event_stub; + + let send_join_response = self + .services + .sending + .send_synapse_request( + &remote_server, + federation::membership::create_join_event::v2::Request { + room_id: room_id.to_owned(), + event_id: event_id.clone(), + omit_members: false, + pdu: self + .services + .sending + .convert_to_outgoing_federation_event(join_event.clone()) + .await, + }, + ) + .await?; + + if let Some(signed_raw) = send_join_response.room_state.event { + let (signed_event_id, signed_value) = + gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| { + err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}")))) + })?; + + if signed_event_id != event_id { + return Err!(Request(BadJson( + warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID") + ))); + } + + self.services + .event_handler + .handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true) + .boxed() + .await?; + } else { + return Err(error); + } + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument( + name = "make_join", + level = "debug", + skip_all, + fields(?servers) +)] +async fn make_join_request( + &self, + sender_user: &UserId, + room_id: &RoomId, + servers: &[OwnedServerName], +) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> { + let mut make_join_response_and_server = + Err!(BadServerResponse("No server available to assist in joining.")); + + let mut make_join_counter: usize = 0; + let mut incompatible_room_version_count: usize = 0; + + for remote_server in servers { + if self + .services + .globals + .server_is_ours(remote_server) + { + continue; + } + info!("Asking {remote_server} for make_join ({make_join_counter})"); + let make_join_response = self + .services + .sending + .send_federation_request( + remote_server, + federation::membership::prepare_join_event::v1::Request { + room_id: room_id.to_owned(), + user_id: sender_user.to_owned(), + ver: self + .services + .server + .supported_room_versions() + .collect(), + }, + ) + .await; + + trace!("make_join response: {make_join_response:?}"); + make_join_counter = make_join_counter.saturating_add(1); + + if let Err(ref e) = make_join_response { + if matches!( + e.kind(), + ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion + ) { + incompatible_room_version_count = + incompatible_room_version_count.saturating_add(1); + } + + if incompatible_room_version_count > 15 { + info!( + "15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \ + M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \ + room version {room_id}: {e}" + ); + make_join_response_and_server = + Err!(BadServerResponse("Room version is not supported by tuwunel")); + return make_join_response_and_server; + } + + if make_join_counter > 40 { + warn!( + "40 servers failed to provide valid make_join response, assuming no server \ + can assist in joining." + ); + make_join_response_and_server = + Err!(BadServerResponse("No server available to assist in joining.")); + + return make_join_response_and_server; + } + } + + make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone())); + + if make_join_response_and_server.is_ok() { + break; + } + } + + make_join_response_and_server +} diff --git a/src/service/membership/kick.rs b/src/service/membership/kick.rs new file mode 100644 index 00000000..048a7a9a --- /dev/null +++ b/src/service/membership/kick.rs @@ -0,0 +1,63 @@ +use ruma::{ + RoomId, UserId, + events::room::member::{MembershipState, RoomMemberEventContent}, +}; +use tuwunel_core::{Err, Result, implement, pdu::PduBuilder}; + +use super::Service; +use crate::rooms::timeline::RoomMutexGuard; + +#[implement(Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%sender_user, %room_id, %user_id) +)] +pub async fn kick( + &self, + room_id: &RoomId, + user_id: &UserId, + reason: Option<&String>, + sender_user: &UserId, + state_lock: &RoomMutexGuard, +) -> Result { + // kicking doesn't make sense if there is no membership + let Ok(event) = self + .services + .state_accessor + .get_member(room_id, user_id) + .await + else { + return Ok(()); + }; + + // this is required to prevent ban -> leave transitions + if !matches!( + event.membership, + MembershipState::Invite | MembershipState::Knock | MembershipState::Join, + ) { + return Err!(Request(Forbidden( + "Cannot kick a user who is not apart of the room (current membership: {})", + event.membership + ))); + } + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason: reason.cloned(), + is_direct: None, + join_authorized_via_users_server: None, + third_party_invite: None, + ..event + }), + sender_user, + room_id, + state_lock, + ) + .await?; + + Ok(()) +} diff --git a/src/service/membership/leave.rs b/src/service/membership/leave.rs new file mode 100644 index 00000000..d8c2da6e --- /dev/null +++ b/src/service/membership/leave.rs @@ -0,0 +1,340 @@ +use std::collections::HashSet; + +use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut}; +use ruma::{ + CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId, + api::federation, + events::{ + StateEventType, + room::member::{MembershipState, RoomMemberEventContent}, + }, +}; +use tuwunel_core::{ + Err, Result, debug_info, debug_warn, err, implement, + matrix::event::gen_event_id, + pdu::PduBuilder, + utils::{self, FutureBoolExt, future::ReadyEqExt}, + warn, +}; + +use super::Service; +use crate::rooms::timeline::RoomMutexGuard; + +#[implement(Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%room_id, %user_id) +)] +pub async fn leave( + &self, + user_id: &UserId, + room_id: &RoomId, + reason: Option, + state_lock: &RoomMutexGuard, +) -> Result { + let default_member_content = RoomMemberEventContent { + membership: MembershipState::Leave, + reason: reason.clone(), + join_authorized_via_users_server: None, + is_direct: None, + avatar_url: None, + displayname: None, + third_party_invite: None, + blurhash: None, + }; + + let is_banned = self.services.metadata.is_banned(room_id); + let is_disabled = self.services.metadata.is_disabled(room_id); + + pin_mut!(is_banned, is_disabled); + if is_banned.or(is_disabled).await { + // the room is banned/disabled, the room must be rejected locally since we + // cant/dont want to federate with this server + self.services + .state_cache + .update_membership( + room_id, + user_id, + default_member_content, + user_id, + None, + None, + true, + ) + .await?; + + return Ok(()); + } + + let dont_have_room = self + .services + .state_cache + .server_in_room(self.services.globals.server_name(), room_id) + .eq(&false); + + let not_knocked = self + .services + .state_cache + .is_knocked(user_id, room_id) + .eq(&false); + + // Ask a remote server if we don't have this room and are not knocking on it + if dont_have_room.and(not_knocked).await { + if let Err(e) = self.remote_leave(user_id, room_id).boxed().await { + warn!(%user_id, "Failed to leave room {room_id} remotely: {e}"); + // Don't tell the client about this error + } + + let last_state = self + .services + .state_cache + .invite_state(user_id, room_id) + .or_else(|_| { + self.services + .state_cache + .knock_state(user_id, room_id) + }) + .or_else(|_| { + self.services + .state_cache + .left_state(user_id, room_id) + }) + .await + .ok(); + + // We always drop the invite, we can't rely on other servers + self.services + .state_cache + .update_membership( + room_id, + user_id, + default_member_content, + user_id, + last_state, + None, + true, + ) + .await?; + } else { + let Ok(event) = self + .services + .state_accessor + .room_state_get_content::( + room_id, + &StateEventType::RoomMember, + user_id.as_str(), + ) + .await + else { + debug_warn!( + "Trying to leave a room you are not a member of, marking room as left locally." + ); + + return self + .services + .state_cache + .update_membership( + room_id, + user_id, + default_member_content, + user_id, + None, + None, + true, + ) + .await; + }; + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason, + join_authorized_via_users_server: None, + is_direct: None, + ..event + }), + user_id, + room_id, + state_lock, + ) + .await?; + } + + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(name = "remote", level = "debug", skip_all)] +pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result { + let mut make_leave_response_and_server = + Err!(BadServerResponse("No remote server available to assist in leaving {room_id}.")); + + let mut servers: HashSet = self + .services + .state_cache + .servers_invite_via(room_id) + .map(ToOwned::to_owned) + .collect() + .await; + + match self + .services + .state_cache + .invite_state(user_id, room_id) + .await + { + | Ok(invite_state) => { + servers.extend( + invite_state + .iter() + .filter_map(|event| event.get_field("sender").ok().flatten()) + .filter_map(|sender: &str| UserId::parse(sender).ok()) + .map(|user| user.server_name().to_owned()), + ); + }, + | _ => { + match self + .services + .state_cache + .knock_state(user_id, room_id) + .await + { + | Ok(knock_state) => { + servers.extend( + knock_state + .iter() + .filter_map(|event| event.get_field("sender").ok().flatten()) + .filter_map(|sender: &str| UserId::parse(sender).ok()) + .filter_map(|sender| { + if !self.services.globals.user_is_local(sender) { + Some(sender.server_name().to_owned()) + } else { + None + } + }), + ); + }, + | _ => {}, + } + }, + } + + if let Some(room_id_server_name) = room_id.server_name() { + servers.insert(room_id_server_name.to_owned()); + } + + debug_info!("servers in remote_leave_room: {servers:?}"); + + for remote_server in servers { + let make_leave_response = self + .services + .sending + .send_federation_request( + &remote_server, + federation::membership::prepare_leave_event::v1::Request { + room_id: room_id.to_owned(), + user_id: user_id.to_owned(), + }, + ) + .await; + + make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server)); + + if make_leave_response_and_server.is_ok() { + break; + } + } + + let (make_leave_response, remote_server) = make_leave_response_and_server?; + + let Some(room_version_id) = make_leave_response.room_version else { + return Err!(BadServerResponse(warn!( + "No room version was returned by {remote_server} for {room_id}, room version is \ + likely not supported by tuwunel" + ))); + }; + + if !self + .services + .server + .supported_room_version(&room_version_id) + { + return Err!(BadServerResponse(warn!( + "Remote room version {room_version_id} for {room_id} is not supported by conduwuit", + ))); + } + + let mut leave_event_stub = serde_json::from_str::( + make_leave_response.event.get(), + ) + .map_err(|e| { + err!(BadServerResponse(warn!( + "Invalid make_leave event json received from {remote_server} for {room_id}: {e:?}" + ))) + })?; + + // TODO: Is origin needed? + leave_event_stub.insert( + "origin".to_owned(), + CanonicalJsonValue::String( + self.services + .globals + .server_name() + .as_str() + .to_owned(), + ), + ); + leave_event_stub.insert( + "origin_server_ts".to_owned(), + CanonicalJsonValue::Integer( + utils::millis_since_unix_epoch() + .try_into() + .expect("Timestamp is valid js_int value"), + ), + ); + + // room v3 and above removed the "event_id" field from remote PDU format + match room_version_id { + | RoomVersionId::V1 | RoomVersionId::V2 => {}, + | _ => { + leave_event_stub.remove("event_id"); + }, + } + + // In order to create a compatible ref hash (EventID) the `hashes` field needs + // to be present + self.services + .server_keys + .hash_and_sign_event(&mut leave_event_stub, &room_version_id)?; + + // Generate event id + let event_id = gen_event_id(&leave_event_stub, &room_version_id)?; + + // Add event_id back + leave_event_stub + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into())); + + // It has enough fields to be called a proper event now + let leave_event = leave_event_stub; + + self.services + .sending + .send_federation_request( + &remote_server, + federation::membership::create_leave_event::v2::Request { + room_id: room_id.to_owned(), + event_id, + pdu: self + .services + .sending + .convert_to_outgoing_federation_event(leave_event.clone()) + .await, + }, + ) + .await?; + + Ok(()) +} diff --git a/src/service/membership/mod.rs b/src/service/membership/mod.rs new file mode 100644 index 00000000..c065ed37 --- /dev/null +++ b/src/service/membership/mod.rs @@ -0,0 +1,22 @@ +mod ban; +mod invite; +mod join; +mod kick; +mod leave; +mod unban; + +use std::sync::Arc; + +use tuwunel_core::Result; + +pub struct Service { + services: Arc, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { services: args.services.clone() })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} diff --git a/src/service/membership/unban.rs b/src/service/membership/unban.rs new file mode 100644 index 00000000..87293465 --- /dev/null +++ b/src/service/membership/unban.rs @@ -0,0 +1,57 @@ +use ruma::{ + RoomId, UserId, + events::room::member::{MembershipState, RoomMemberEventContent}, +}; +use tuwunel_core::{Err, Result, implement, pdu::PduBuilder}; + +use super::Service; +use crate::rooms::timeline::RoomMutexGuard; + +#[implement(Service)] +#[tracing::instrument( + name = "remote", + level = "debug", + skip_all, + fields(%sender_user, %room_id, %user_id), +)] +pub async fn unban( + &self, + room_id: &RoomId, + user_id: &UserId, + reason: Option<&String>, + sender_user: &UserId, + state_lock: &RoomMutexGuard, +) -> Result { + let current_member_content = self + .services + .state_accessor + .get_member(room_id, user_id) + .await + .unwrap_or_else(|_| RoomMemberEventContent::new(MembershipState::Leave)); + + if current_member_content.membership != MembershipState::Ban { + return Err!(Request(Forbidden( + "Cannot unban a user who is not banned (current membership: {})", + current_member_content.membership + ))); + } + + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + membership: MembershipState::Leave, + reason: reason.cloned(), + join_authorized_via_users_server: None, + third_party_invite: None, + is_direct: None, + ..current_member_content + }), + sender_user, + room_id, + state_lock, + ) + .await?; + + Ok(()) +} diff --git a/src/service/mod.rs b/src/service/mod.rs index d84ef29b..a3093d6a 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -11,11 +11,13 @@ pub mod admin; pub mod appservice; pub mod client; pub mod config; +pub mod deactivate; pub mod emergency; pub mod federation; pub mod globals; pub mod key_backups; pub mod media; +pub mod membership; pub mod presence; pub mod pusher; pub mod resolver; diff --git a/src/service/services.rs b/src/service/services.rs index de777f24..caf3ab44 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -9,9 +9,10 @@ use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace}; use tuwunel_database::Database; use crate::{ - account_data, admin, appservice, client, config, emergency, federation, globals, key_backups, + account_data, admin, appservice, client, config, deactivate, emergency, federation, globals, + key_backups, manager::Manager, - media, presence, pusher, resolver, rooms, sending, server_keys, + media, membership, presence, pusher, resolver, rooms, sending, server_keys, service::{Args, Service}, sync, transaction_ids, uiaa, users, }; @@ -55,6 +56,8 @@ pub struct Services { pub transaction_ids: Arc, pub uiaa: Arc, pub users: Arc, + pub membership: Arc, + pub deactivate: Arc, manager: Mutex>>, pub server: Arc, @@ -133,6 +136,8 @@ impl Services { transaction_ids: build!(transaction_ids::Service), uiaa: build!(uiaa::Service), users: build!(users::Service), + membership: build!(membership::Service), + deactivate: build!(deactivate::Service), manager: Mutex::new(None), server, @@ -184,7 +189,7 @@ impl Services { Ok(()) } - pub(crate) fn services(&self) -> [Arc; 38] { + pub(crate) fn services(&self) -> [Arc; 40] { [ self.account_data.clone(), self.admin.clone(), @@ -224,6 +229,8 @@ impl Services { self.transaction_ids.clone(), self.uiaa.clone(), self.users.clone(), + self.membership.clone(), + self.deactivate.clone(), ] } diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 0e6fa71a..1761b5de 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -5,15 +5,22 @@ mod profile; use std::sync::Arc; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join3}; use ruma::{ - OwnedMxcUri, OwnedUserId, UserId, + OwnedMxcUri, OwnedRoomId, OwnedUserId, UserId, api::client::filter::FilterDefinition, - events::{GlobalAccountDataEventType, ignored_user_list::IgnoredUserListEvent}, + events::{ + GlobalAccountDataEventType, + ignored_user_list::IgnoredUserListEvent, + room::member::{MembershipState, RoomMemberEventContent}, + }, }; use tuwunel_core::{ - Err, Result, debug_warn, err, is_equal_to, trace, - utils::{self, ReadyExt, stream::TryIgnore}, + Err, Result, debug_warn, err, is_equal_to, + pdu::PduBuilder, + trace, + utils::{self, IterStream, ReadyExt, TryFutureExtExt, stream::TryIgnore}, + warn, }; use tuwunel_database::{Deserialized, Json, Map}; @@ -439,4 +446,124 @@ impl Service { pub async fn auth_ldap(&self, _user_dn: &str, _password: &str) -> Result { Err!(FeatureDisabled("ldap")) } + + pub async fn update_displayname( + &self, + user_id: &UserId, + displayname: Option, + rooms: &[OwnedRoomId], + ) { + let (current_avatar_url, current_blurhash, current_displayname) = join3( + self.services.users.avatar_url(user_id).ok(), + self.services.users.blurhash(user_id).ok(), + self.services.users.displayname(user_id).ok(), + ) + .await; + + if displayname == current_displayname { + return; + } + + self.services + .users + .set_displayname(user_id, displayname.clone()); + + // Send a new join membership event into rooms + let avatar_url = ¤t_avatar_url; + let blurhash = ¤t_blurhash; + let displayname = &displayname; + let rooms: Vec<_> = rooms + .iter() + .try_stream() + .and_then(async |room_id: &OwnedRoomId| { + let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + displayname: displayname.clone(), + membership: MembershipState::Join, + avatar_url: avatar_url.clone(), + blurhash: blurhash.clone(), + join_authorized_via_users_server: None, + reason: None, + is_direct: None, + third_party_invite: None, + }); + + Ok((pdu, room_id)) + }) + .ignore_err() + .collect() + .await; + + self.update_all_rooms(user_id, rooms) + .boxed() + .await; + } + + pub async fn update_avatar_url( + &self, + user_id: &UserId, + avatar_url: Option, + blurhash: Option, + rooms: &[OwnedRoomId], + ) { + let (current_avatar_url, current_blurhash, current_displayname) = join3( + self.services.users.avatar_url(user_id).ok(), + self.services.users.blurhash(user_id).ok(), + self.services.users.displayname(user_id).ok(), + ) + .await; + + if current_avatar_url == avatar_url && current_blurhash == blurhash { + return; + } + + self.services + .users + .set_avatar_url(user_id, avatar_url.clone()); + self.services + .users + .set_blurhash(user_id, blurhash.clone()); + + // Send a new join membership event into rooms + let avatar_url = &avatar_url; + let blurhash = &blurhash; + let displayname = ¤t_displayname; + let rooms: Vec<_> = rooms + .iter() + .try_stream() + .and_then(async |room_id: &OwnedRoomId| { + let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { + avatar_url: avatar_url.clone(), + blurhash: blurhash.clone(), + membership: MembershipState::Join, + displayname: displayname.clone(), + join_authorized_via_users_server: None, + reason: None, + is_direct: None, + third_party_invite: None, + }); + + Ok((pdu, room_id)) + }) + .ignore_err() + .collect() + .await; + + self.update_all_rooms(user_id, rooms) + .boxed() + .await; + } + + async fn update_all_rooms(&self, user_id: &UserId, rooms: Vec<(PduBuilder, &OwnedRoomId)>) { + for (pdu_builder, room_id) in rooms { + let state_lock = self.services.state.mutex.lock(room_id).await; + if let Err(e) = self + .services + .timeline + .build_and_append_pdu(pdu_builder, user_id, room_id, &state_lock) + .await + { + warn!(%user_id, %room_id, "Failed to update/send new profile join membership update in room: {e}"); + } + } + } }