From 0d782095ad90ac944926d98e1b0220ee38d1009c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 25 Nov 2025 16:40:42 +0000 Subject: [PATCH] Refactor room upgrade endpoint; rollback on failure. Signed-off-by: Jason Volk --- src/api/client/room/upgrade.rs | 450 +++++++++++-------- src/service/rooms/state_accessor/user_can.rs | 38 +- 2 files changed, 312 insertions(+), 176 deletions(-) diff --git a/src/api/client/room/upgrade.rs b/src/api/client/room/upgrade.rs index d9e54412..fb0b51fe 100644 --- a/src/api/client/room/upgrade.rs +++ b/src/api/client/room/upgrade.rs @@ -1,13 +1,14 @@ use std::cmp::max; use axum::extract::State; -use futures::StreamExt; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use ruma::{ - CanonicalJsonObject, RoomId, RoomVersionId, - api::client::room::upgrade_room, + CanonicalJsonObject, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId, + api::client::room::upgrade_room::v3, events::{ StateEventType, TimelineEventType, room::{ + create::PreviousRoom, member::{MembershipState, RoomMemberEventContent}, power_levels::RoomPowerLevelsEventContent, tombstone::RoomTombstoneEventContent, @@ -18,25 +19,40 @@ use ruma::{ }; use serde_json::{json, value::to_raw_value}; use tuwunel_core::{ - Err, Result, err, + Err, Result, err, error, implement, info, matrix::{Event, StateKey, pdu::PduBuilder, room_version}, + utils::{ + future::TryExtExt, + stream::{IterStream, ReadyExt, WidebandExt}, + }, }; +use tuwunel_service::{Services, rooms::timeline::RoomMutexGuard}; use crate::Ruma; -/// Recommended transferable state events list from the spec -const TRANSFERABLE_STATE_EVENTS: &[StateEventType; 9] = &[ - StateEventType::RoomAvatar, +//TODO: Upgrade Ruma +const RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES: &[StateEventType; 9] = &[ + StateEventType::RoomServerAcl, StateEventType::RoomEncryption, + StateEventType::RoomName, + StateEventType::RoomAvatar, + StateEventType::RoomTopic, StateEventType::RoomGuestAccess, StateEventType::RoomHistoryVisibility, StateEventType::RoomJoinRules, - StateEventType::RoomName, StateEventType::RoomPowerLevels, - StateEventType::RoomServerAcl, - StateEventType::RoomTopic, ]; +#[derive(Clone, Copy, Debug)] +struct RoomUpgradeContext<'a> { + services: &'a Services, + sender_user: &'a UserId, + new_room_id: &'a RoomId, + new_state_lock: &'a RoomMutexGuard, + old_room_id: &'a RoomId, + old_state_lock: &'a RoomMutexGuard, +} + /// # `POST /_matrix/client/r0/rooms/{roomId}/upgrade` /// /// Upgrades the room. @@ -47,117 +63,137 @@ const TRANSFERABLE_STATE_EVENTS: &[StateEventType; 9] = &[ /// - Transfers some state events /// - Moves local aliases /// - Modifies old room power levels to prevent users from speaking +#[tracing::instrument(level = "debug")] pub(crate) async fn upgrade_room_route( State(services): State, - body: Ruma, -) -> Result { - debug_assert!( - TRANSFERABLE_STATE_EVENTS.is_sorted(), - "TRANSFERABLE_STATE_EVENTS is not sorted" - ); - + body: Ruma, +) -> Result { let sender_user = body.sender_user(); + let new_version = &body.new_version; + let version_rules = room_version::rules(new_version)?; if !services .server - .supported_room_version(&body.new_version) + .supported_room_version(new_version) { return Err!(Request(UnsupportedRoomVersion( "This server does not support that room version.", ))); } - if matches!(body.new_version, RoomVersionId::V12) { + if matches!(new_version, RoomVersionId::V12) { return Err!(Request(UnsupportedRoomVersion( "Upgrading to version 12 is still under development.", ))); } - let room_version_rules = room_version::rules(&body.new_version)?; - let room_id_format = &room_version_rules.room_id_format; - assert!(*room_id_format == RoomIdFormatVersion::V1, "TODO"); + let old_room_id = &body.room_id; + let old_state_lock = services.state.mutex.lock(old_room_id).await; + + if !services + .state_accessor + .user_can_tombstone(old_room_id, sender_user, &old_state_lock) + .await + { + return Err!(Request(Forbidden("You are not permitted to upgrade the room."))); + } + + let latest_event = services + .timeline + .latest_pdu_in_room(old_room_id) + .await + .ok(); + + let predecessor = PreviousRoom { + room_id: old_room_id.to_owned(), + event_id: latest_event + .as_ref() + .map(Event::event_id) + .map(ToOwned::to_owned), + }; + + let id_format = version_rules.room_id_format; + let (replacement_room, state_lock) = match id_format { + | RoomIdFormatVersion::V1 => upgrade_room_create_legacy(&services, &body, predecessor), + | _ => unimplemented!("Unexpected format {id_format:?} for room {new_version}"), + } + .inspect_err(|e| error!(?body, "Upgrade creation event failed: {e}")) + .await?; + + let context = RoomUpgradeContext { + services: &services, + sender_user, + new_room_id: &replacement_room, + new_state_lock: &state_lock, + old_room_id: &body.room_id, + old_state_lock: &old_state_lock, + }; + + if let Err(e) = context.transfer_room().await { + error!(?e, ?context, "Room upgrade failed. Cleaning up incomplete room..."); + + if let Err(e) = services + .delete + .delete_room(&replacement_room, false, state_lock) + .await + { + error!("Additional errors while deleting incomplete room: {e}"); + } + + return Err(e); + } + + info!( + old_room_id = %context.old_room_id, + new_room_id = %context.new_room_id, + "Room upgraded", + ); + + Ok(v3::Response { replacement_room }) +} + +#[tracing::instrument(level = "info")] +async fn upgrade_room_create_legacy( + services: &Services, + body: &Ruma, + predecessor: PreviousRoom, +) -> Result<(OwnedRoomId, RoomMutexGuard)> { + let sender_user = body.sender_user(); + let old_room_id = &body.room_id; // Create a replacement room - let replacement_room = RoomId::new_v1(services.globals.server_name()); - + let new_room_id = RoomId::new_v1(services.globals.server_name()); + let state_lock = services.state.mutex.lock(&new_room_id).await; let _short_id = services .short - .get_or_create_shortroomid(&replacement_room) + .get_or_create_shortroomid(&new_room_id) .await; - let state_lock = services.state.mutex.lock(&body.room_id).await; - - // Send a m.room.tombstone event to the old room to indicate that it is not - // intended to be used any further Fail if the sender does not have the required - // permissions - let tombstone_event_id = services - .timeline - .build_and_append_pdu( - PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent { - body: "This room has been replaced".to_owned(), - replacement_room: replacement_room.clone(), - }), - sender_user, - &body.room_id, - &state_lock, - ) - .await?; - - // Change lock to replacement room - drop(state_lock); - let state_lock = services.state.mutex.lock(&replacement_room).await; - // Get the old room creation event let mut create_event_content: CanonicalJsonObject = services .state_accessor - .room_state_get_content(&body.room_id, &StateEventType::RoomCreate, "") + .room_state_get_content(old_room_id, &StateEventType::RoomCreate, "") .await .map_err(|_| err!(Database("Found room without m.room.create event.")))?; - // Use the m.room.tombstone event as the predecessor - let predecessor = Some(ruma::events::room::create::PreviousRoom::new( - body.room_id.clone(), - Some(tombstone_event_id), - )); - // Send a m.room.create event containing a predecessor field and the applicable - // room_version + // room_version. "creator" key no longer exists in V11+ rooms. { use RoomVersionId::*; match body.new_version { - | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { - create_event_content.insert( - "creator".into(), - json!(&sender_user).try_into().map_err(|e| { - err!(Request(BadJson(error!("Error forming creation event: {e}")))) - })?, - ); - }, - | _ => { - // "creator" key no longer exists in V11+ rooms - create_event_content.remove("creator"); - }, + | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => + create_event_content.insert("creator".into(), json!(&sender_user).try_into()?), + | _ => create_event_content.remove("creator"), } - } + }; - create_event_content.insert( - "room_version".into(), - json!(&body.new_version) - .try_into() - .map_err(|_| err!(Request(BadJson("Error forming creation event"))))?, - ); - create_event_content.insert( - "predecessor".into(), - json!(predecessor) - .try_into() - .map_err(|_| err!(Request(BadJson("Error forming creation event"))))?, - ); + create_event_content.insert("predecessor".into(), json!(predecessor).try_into()?); + create_event_content.insert("room_version".into(), json!(&body.new_version).try_into()?); // Validate creation event content - if serde_json::from_str::(to_raw_value(&create_event_content)?.get()) - .is_err() - { - return Err!(Request(BadJson("Error forming creation event"))); + let raw_content = to_raw_value(&create_event_content)?; + if let Err(e) = serde_json::from_str::(raw_content.get()) { + return Err!(Request(BadJson("Error forming creation event: {e}"))); } services @@ -166,124 +202,188 @@ pub(crate) async fn upgrade_room_route( PduBuilder { event_type: TimelineEventType::RoomCreate, content: to_raw_value(&create_event_content)?, - unsigned: None, state_key: Some(StateKey::new()), - redacts: None, - timestamp: None, + ..Default::default() }, sender_user, - &replacement_room, + &new_room_id, &state_lock, ) .await?; - // Join the new room - services + Ok((new_room_id, state_lock)) +} + +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn transfer_room(&self) -> Result { + self.move_joined_member().await?; + + self.move_state_events().await?; + + self.move_local_aliases().await?; + + self.tombstone_old_room().await?; + + // After commitment to the tombstone above no more errors can propagate. + self.lockdown_old_room() + .await + .inspect_err(|e| error!(?self, "Failed to lockdown old room: {e}")) + .ok(); + + Ok(()) +} + +// Join the new room +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn move_joined_member(&self) -> Result { + let old_content: RoomMemberEventContent = self + .services + .state_accessor + .room_state_get_content( + self.old_room_id, + &StateEventType::RoomMember, + self.sender_user.as_str(), + ) + .inspect_err(|e| error!(?self, "Missing room member event: {e}")) + .await?; + + self.services .timeline .build_and_append_pdu( - PduBuilder { - event_type: TimelineEventType::RoomMember, - content: to_raw_value(&RoomMemberEventContent { - membership: MembershipState::Join, - displayname: services.users.displayname(sender_user).await.ok(), - avatar_url: services.users.avatar_url(sender_user).await.ok(), - is_direct: None, - third_party_invite: None, - blurhash: services.users.blurhash(sender_user).await.ok(), - reason: None, - join_authorized_via_users_server: None, - })?, - unsigned: None, - state_key: Some(sender_user.as_str().into()), - redacts: None, - timestamp: None, - }, - sender_user, - &replacement_room, - &state_lock, + PduBuilder::state(self.sender_user.as_str(), &RoomMemberEventContent { + membership: MembershipState::Join, + ..old_content + }), + self.sender_user, + self.new_room_id, + self.new_state_lock, ) - .await?; + .await +} - // Replicate transferable state events to the new room - for event_type in TRANSFERABLE_STATE_EVENTS { - let event_content = match services - .state_accessor - .room_state_get(&body.room_id, event_type, "") - .await - { - | Ok(v) => v.content().to_owned(), - | Err(_) => continue, // Skipping missing events. - }; +// Replicate transferable state events to the new room +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn move_state_events(&self) -> Result { + RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES + .iter() + .rev() + .stream() + .wide_filter_map(|event_type| { + self.services + .state_accessor + .room_state_get(self.old_room_id, event_type, "") + .ok() + }) + .map(Ok) + .try_for_each(async |event| { + let builder = PduBuilder { + event_type: event.kind().clone(), + content: to_raw_value(event.content())?, + state_key: Some(StateKey::new()), + ..Default::default() + }; - services - .timeline - .build_and_append_pdu( - PduBuilder { - event_type: event_type.to_string().into(), - content: event_content, - state_key: Some(StateKey::new()), - ..Default::default() - }, - sender_user, - &replacement_room, - &state_lock, - ) - .await?; - } + self.services + .timeline + .build_and_append_pdu( + builder, + self.sender_user, + self.new_room_id, + self.new_state_lock, + ) + .inspect_err(|e| { + error!(?event, ?self, "Failed to transfer state on upgrade: {e}"); + }) + .map_ok(|_| ()) + .await + }) + .await +} - // Moves any local aliases to the new room - let mut local_aliases = services +// Moves any local aliases to the new room +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn move_local_aliases(&self) -> Result { + self.services .alias - .local_aliases_for_room(&body.room_id) - .boxed(); + .local_aliases_for_room(self.old_room_id) + .filter_map(|alias| { + self.services + .alias + .remove_alias(alias, self.sender_user) + .inspect_err(move |e| error!(?alias, ?self, "Failed to remove alias: {e}")) + .map_ok(move |()| alias) + .ok() + }) + .ready_for_each(|alias| { + self.services + .alias + .set_alias(alias, self.new_room_id, self.sender_user) + .inspect_err(|e| error!(?self, "Failed to add alias: {e}")) + .ok(); + }) + .map(Ok) + .await +} - while let Some(alias) = local_aliases.next().await { - services - .alias - .remove_alias(alias, sender_user) - .await?; - - services - .alias - .set_alias(alias, &replacement_room, sender_user)?; - } +// Send a m.room.tombstone event to the old room to indicate that it is not +// intended to be used any further Fail if the sender does not have the required +// permissions. +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn tombstone_old_room(&self) -> Result { + self.services + .timeline + .build_and_append_pdu( + PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent { + body: "This room has been upgraded".to_owned(), + replacement_room: self.new_room_id.to_owned(), + }), + self.sender_user, + self.old_room_id, + self.old_state_lock, + ) + .await +} +// Modify the power levels in the old room to prevent sending of events and +// inviting new users. Though a Result is returned, the callsite above treats it +// as infallible because the tombstone represents the commitment. +#[implement(RoomUpgradeContext, params = "<'_>")] +#[tracing::instrument(level = "debug")] +async fn lockdown_old_room(&self) -> Result { // Get the old room power levels - let power_levels_event_content: RoomPowerLevelsEventContent = services + let old_content: RoomPowerLevelsEventContent = self + .services .state_accessor - .room_state_get_content(&body.room_id, &StateEventType::RoomPowerLevels, "") + .room_state_get_content(self.old_room_id, &StateEventType::RoomPowerLevels, "") .await .map_err(|_| err!(Database("Found room without m.room.power_levels event.")))?; - // Setting events_default and invite to the greater of 50 and users_default + 1 - let new_level = max( - int!(50), - power_levels_event_content - .users_default - .checked_add(int!(1)) - .ok_or_else(|| { - err!(Request(BadJson("users_default power levels event content is not valid"))) - })?, - ); + let old_users_default = old_content + .users_default + .checked_add(int!(1)) + .ok_or_else(|| { + err!(Request(BadJson("users_default power levels event content is not valid"))) + })?; - // Modify the power levels in the old room to prevent sending of events and - // inviting new users - services + // Setting events_default and invite to the greater of 50 and users_default + 1 + let new_level = max(int!(50), old_users_default); + + self.services .timeline .build_and_append_pdu( PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent { events_default: new_level, invite: new_level, - ..power_levels_event_content + ..old_content }), - sender_user, - &body.room_id, - &state_lock, + self.sender_user, + self.old_room_id, + self.old_state_lock, ) - .await?; - - drop(state_lock); - - // Return the replacement room id - Ok(upgrade_room::v3::Response { replacement_room }) + .await } diff --git a/src/service/rooms/state_accessor/user_can.rs b/src/service/rooms/state_accessor/user_can.rs index 7e02863e..900a678f 100644 --- a/src/service/rooms/state_accessor/user_can.rs +++ b/src/service/rooms/state_accessor/user_can.rs @@ -5,10 +5,15 @@ use ruma::{ room::{ history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, member::{MembershipState, RoomMemberEventContent}, + tombstone::RoomTombstoneEventContent, }, }, }; -use tuwunel_core::{Err, Result, implement, matrix::Event, pdu::PduBuilder}; +use tuwunel_core::{ + Err, Result, implement, + matrix::{Event, StateKey}, + pdu::PduBuilder, +}; use crate::rooms::state::RoomMutexGuard; @@ -182,3 +187,34 @@ pub async fn user_can_invite( .await .is_ok() } + +#[implement(super::Service)] +pub async fn user_can_tombstone( + &self, + room_id: &RoomId, + user_id: &UserId, + state_lock: &RoomMutexGuard, +) -> bool { + if !self + .services + .state_cache + .is_joined(user_id, room_id) + .await + { + return false; + } + + self.services + .timeline + .create_hash_and_sign_event( + PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent { + replacement_room: room_id.into(), // placeholder, + body: "Not a valid m.room.tombstone.".into(), + }), + user_id, + room_id, + state_lock, + ) + .await + .is_ok() +}