Refactor room upgrade endpoint; rollback on failure.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-25 16:40:42 +00:00
parent f6b95ff1c4
commit 0d782095ad
2 changed files with 312 additions and 176 deletions

View File

@@ -1,13 +1,14 @@
use std::cmp::max; use std::cmp::max;
use axum::extract::State; use axum::extract::State;
use futures::StreamExt; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{ use ruma::{
CanonicalJsonObject, RoomId, RoomVersionId, CanonicalJsonObject, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
api::client::room::upgrade_room, api::client::room::upgrade_room::v3,
events::{ events::{
StateEventType, TimelineEventType, StateEventType, TimelineEventType,
room::{ room::{
create::PreviousRoom,
member::{MembershipState, RoomMemberEventContent}, member::{MembershipState, RoomMemberEventContent},
power_levels::RoomPowerLevelsEventContent, power_levels::RoomPowerLevelsEventContent,
tombstone::RoomTombstoneEventContent, tombstone::RoomTombstoneEventContent,
@@ -18,25 +19,40 @@ use ruma::{
}; };
use serde_json::{json, value::to_raw_value}; use serde_json::{json, value::to_raw_value};
use tuwunel_core::{ use tuwunel_core::{
Err, Result, err, Err, Result, err, error, implement, info,
matrix::{Event, StateKey, pdu::PduBuilder, room_version}, matrix::{Event, StateKey, pdu::PduBuilder, room_version},
utils::{
future::TryExtExt,
stream::{IterStream, ReadyExt, WidebandExt},
},
}; };
use tuwunel_service::{Services, rooms::timeline::RoomMutexGuard};
use crate::Ruma; use crate::Ruma;
/// Recommended transferable state events list from the spec //TODO: Upgrade Ruma
const TRANSFERABLE_STATE_EVENTS: &[StateEventType; 9] = &[ const RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES: &[StateEventType; 9] = &[
StateEventType::RoomAvatar, StateEventType::RoomServerAcl,
StateEventType::RoomEncryption, StateEventType::RoomEncryption,
StateEventType::RoomName,
StateEventType::RoomAvatar,
StateEventType::RoomTopic,
StateEventType::RoomGuestAccess, StateEventType::RoomGuestAccess,
StateEventType::RoomHistoryVisibility, StateEventType::RoomHistoryVisibility,
StateEventType::RoomJoinRules, StateEventType::RoomJoinRules,
StateEventType::RoomName,
StateEventType::RoomPowerLevels, StateEventType::RoomPowerLevels,
StateEventType::RoomServerAcl,
StateEventType::RoomTopic,
]; ];
#[derive(Clone, Copy, Debug)]
struct RoomUpgradeContext<'a> {
services: &'a Services,
sender_user: &'a UserId,
new_room_id: &'a RoomId,
new_state_lock: &'a RoomMutexGuard,
old_room_id: &'a RoomId,
old_state_lock: &'a RoomMutexGuard,
}
/// # `POST /_matrix/client/r0/rooms/{roomId}/upgrade` /// # `POST /_matrix/client/r0/rooms/{roomId}/upgrade`
/// ///
/// Upgrades the room. /// Upgrades the room.
@@ -47,117 +63,137 @@ const TRANSFERABLE_STATE_EVENTS: &[StateEventType; 9] = &[
/// - Transfers some state events /// - Transfers some state events
/// - Moves local aliases /// - Moves local aliases
/// - Modifies old room power levels to prevent users from speaking /// - Modifies old room power levels to prevent users from speaking
#[tracing::instrument(level = "debug")]
pub(crate) async fn upgrade_room_route( pub(crate) async fn upgrade_room_route(
State(services): State<crate::State>, State(services): State<crate::State>,
body: Ruma<upgrade_room::v3::Request>, body: Ruma<v3::Request>,
) -> Result<upgrade_room::v3::Response> { ) -> Result<v3::Response> {
debug_assert!(
TRANSFERABLE_STATE_EVENTS.is_sorted(),
"TRANSFERABLE_STATE_EVENTS is not sorted"
);
let sender_user = body.sender_user(); let sender_user = body.sender_user();
let new_version = &body.new_version;
let version_rules = room_version::rules(new_version)?;
if !services if !services
.server .server
.supported_room_version(&body.new_version) .supported_room_version(new_version)
{ {
return Err!(Request(UnsupportedRoomVersion( return Err!(Request(UnsupportedRoomVersion(
"This server does not support that room version.", "This server does not support that room version.",
))); )));
} }
if matches!(body.new_version, RoomVersionId::V12) { if matches!(new_version, RoomVersionId::V12) {
return Err!(Request(UnsupportedRoomVersion( return Err!(Request(UnsupportedRoomVersion(
"Upgrading to version 12 is still under development.", "Upgrading to version 12 is still under development.",
))); )));
} }
let room_version_rules = room_version::rules(&body.new_version)?; let old_room_id = &body.room_id;
let room_id_format = &room_version_rules.room_id_format; let old_state_lock = services.state.mutex.lock(old_room_id).await;
assert!(*room_id_format == RoomIdFormatVersion::V1, "TODO");
if !services
.state_accessor
.user_can_tombstone(old_room_id, sender_user, &old_state_lock)
.await
{
return Err!(Request(Forbidden("You are not permitted to upgrade the room.")));
}
let latest_event = services
.timeline
.latest_pdu_in_room(old_room_id)
.await
.ok();
let predecessor = PreviousRoom {
room_id: old_room_id.to_owned(),
event_id: latest_event
.as_ref()
.map(Event::event_id)
.map(ToOwned::to_owned),
};
let id_format = version_rules.room_id_format;
let (replacement_room, state_lock) = match id_format {
| RoomIdFormatVersion::V1 => upgrade_room_create_legacy(&services, &body, predecessor),
| _ => unimplemented!("Unexpected format {id_format:?} for room {new_version}"),
}
.inspect_err(|e| error!(?body, "Upgrade creation event failed: {e}"))
.await?;
let context = RoomUpgradeContext {
services: &services,
sender_user,
new_room_id: &replacement_room,
new_state_lock: &state_lock,
old_room_id: &body.room_id,
old_state_lock: &old_state_lock,
};
if let Err(e) = context.transfer_room().await {
error!(?e, ?context, "Room upgrade failed. Cleaning up incomplete room...");
if let Err(e) = services
.delete
.delete_room(&replacement_room, false, state_lock)
.await
{
error!("Additional errors while deleting incomplete room: {e}");
}
return Err(e);
}
info!(
old_room_id = %context.old_room_id,
new_room_id = %context.new_room_id,
"Room upgraded",
);
Ok(v3::Response { replacement_room })
}
#[tracing::instrument(level = "info")]
async fn upgrade_room_create_legacy(
services: &Services,
body: &Ruma<v3::Request>,
predecessor: PreviousRoom,
) -> Result<(OwnedRoomId, RoomMutexGuard)> {
let sender_user = body.sender_user();
let old_room_id = &body.room_id;
// Create a replacement room // Create a replacement room
let replacement_room = RoomId::new_v1(services.globals.server_name()); let new_room_id = RoomId::new_v1(services.globals.server_name());
let state_lock = services.state.mutex.lock(&new_room_id).await;
let _short_id = services let _short_id = services
.short .short
.get_or_create_shortroomid(&replacement_room) .get_or_create_shortroomid(&new_room_id)
.await; .await;
let state_lock = services.state.mutex.lock(&body.room_id).await;
// Send a m.room.tombstone event to the old room to indicate that it is not
// intended to be used any further Fail if the sender does not have the required
// permissions
let tombstone_event_id = services
.timeline
.build_and_append_pdu(
PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
body: "This room has been replaced".to_owned(),
replacement_room: replacement_room.clone(),
}),
sender_user,
&body.room_id,
&state_lock,
)
.await?;
// Change lock to replacement room
drop(state_lock);
let state_lock = services.state.mutex.lock(&replacement_room).await;
// Get the old room creation event // Get the old room creation event
let mut create_event_content: CanonicalJsonObject = services let mut create_event_content: CanonicalJsonObject = services
.state_accessor .state_accessor
.room_state_get_content(&body.room_id, &StateEventType::RoomCreate, "") .room_state_get_content(old_room_id, &StateEventType::RoomCreate, "")
.await .await
.map_err(|_| err!(Database("Found room without m.room.create event.")))?; .map_err(|_| err!(Database("Found room without m.room.create event.")))?;
// Use the m.room.tombstone event as the predecessor
let predecessor = Some(ruma::events::room::create::PreviousRoom::new(
body.room_id.clone(),
Some(tombstone_event_id),
));
// Send a m.room.create event containing a predecessor field and the applicable // Send a m.room.create event containing a predecessor field and the applicable
// room_version // room_version. "creator" key no longer exists in V11+ rooms.
{ {
use RoomVersionId::*; use RoomVersionId::*;
match body.new_version { match body.new_version {
| V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 => { | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 =>
create_event_content.insert( create_event_content.insert("creator".into(), json!(&sender_user).try_into()?),
"creator".into(), | _ => create_event_content.remove("creator"),
json!(&sender_user).try_into().map_err(|e| {
err!(Request(BadJson(error!("Error forming creation event: {e}"))))
})?,
);
},
| _ => {
// "creator" key no longer exists in V11+ rooms
create_event_content.remove("creator");
},
} }
} };
create_event_content.insert( create_event_content.insert("predecessor".into(), json!(predecessor).try_into()?);
"room_version".into(), create_event_content.insert("room_version".into(), json!(&body.new_version).try_into()?);
json!(&body.new_version)
.try_into()
.map_err(|_| err!(Request(BadJson("Error forming creation event"))))?,
);
create_event_content.insert(
"predecessor".into(),
json!(predecessor)
.try_into()
.map_err(|_| err!(Request(BadJson("Error forming creation event"))))?,
);
// Validate creation event content // Validate creation event content
if serde_json::from_str::<CanonicalJsonObject>(to_raw_value(&create_event_content)?.get()) let raw_content = to_raw_value(&create_event_content)?;
.is_err() if let Err(e) = serde_json::from_str::<CanonicalJsonObject>(raw_content.get()) {
{ return Err!(Request(BadJson("Error forming creation event: {e}")));
return Err!(Request(BadJson("Error forming creation event")));
} }
services services
@@ -166,124 +202,188 @@ pub(crate) async fn upgrade_room_route(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCreate, event_type: TimelineEventType::RoomCreate,
content: to_raw_value(&create_event_content)?, content: to_raw_value(&create_event_content)?,
unsigned: None,
state_key: Some(StateKey::new()), state_key: Some(StateKey::new()),
redacts: None, ..Default::default()
timestamp: None,
}, },
sender_user, sender_user,
&replacement_room, &new_room_id,
&state_lock, &state_lock,
) )
.await?; .await?;
// Join the new room Ok((new_room_id, state_lock))
services }
#[implement(RoomUpgradeContext, params = "<'_>")]
#[tracing::instrument(level = "debug")]
async fn transfer_room(&self) -> Result {
self.move_joined_member().await?;
self.move_state_events().await?;
self.move_local_aliases().await?;
self.tombstone_old_room().await?;
// After commitment to the tombstone above no more errors can propagate.
self.lockdown_old_room()
.await
.inspect_err(|e| error!(?self, "Failed to lockdown old room: {e}"))
.ok();
Ok(())
}
// Join the new room
#[implement(RoomUpgradeContext, params = "<'_>")]
#[tracing::instrument(level = "debug")]
async fn move_joined_member(&self) -> Result<OwnedEventId> {
let old_content: RoomMemberEventContent = self
.services
.state_accessor
.room_state_get_content(
self.old_room_id,
&StateEventType::RoomMember,
self.sender_user.as_str(),
)
.inspect_err(|e| error!(?self, "Missing room member event: {e}"))
.await?;
self.services
.timeline .timeline
.build_and_append_pdu( .build_and_append_pdu(
PduBuilder { PduBuilder::state(self.sender_user.as_str(), &RoomMemberEventContent {
event_type: TimelineEventType::RoomMember, membership: MembershipState::Join,
content: to_raw_value(&RoomMemberEventContent { ..old_content
membership: MembershipState::Join, }),
displayname: services.users.displayname(sender_user).await.ok(), self.sender_user,
avatar_url: services.users.avatar_url(sender_user).await.ok(), self.new_room_id,
is_direct: None, self.new_state_lock,
third_party_invite: None,
blurhash: services.users.blurhash(sender_user).await.ok(),
reason: None,
join_authorized_via_users_server: None,
})?,
unsigned: None,
state_key: Some(sender_user.as_str().into()),
redacts: None,
timestamp: None,
},
sender_user,
&replacement_room,
&state_lock,
) )
.await?; .await
}
// Replicate transferable state events to the new room // Replicate transferable state events to the new room
for event_type in TRANSFERABLE_STATE_EVENTS { #[implement(RoomUpgradeContext, params = "<'_>")]
let event_content = match services #[tracing::instrument(level = "debug")]
.state_accessor async fn move_state_events(&self) -> Result {
.room_state_get(&body.room_id, event_type, "") RECOMMENDED_TRANSFERABLE_STATE_EVENT_TYPES
.await .iter()
{ .rev()
| Ok(v) => v.content().to_owned(), .stream()
| Err(_) => continue, // Skipping missing events. .wide_filter_map(|event_type| {
}; self.services
.state_accessor
.room_state_get(self.old_room_id, event_type, "")
.ok()
})
.map(Ok)
.try_for_each(async |event| {
let builder = PduBuilder {
event_type: event.kind().clone(),
content: to_raw_value(event.content())?,
state_key: Some(StateKey::new()),
..Default::default()
};
services self.services
.timeline .timeline
.build_and_append_pdu( .build_and_append_pdu(
PduBuilder { builder,
event_type: event_type.to_string().into(), self.sender_user,
content: event_content, self.new_room_id,
state_key: Some(StateKey::new()), self.new_state_lock,
..Default::default() )
}, .inspect_err(|e| {
sender_user, error!(?event, ?self, "Failed to transfer state on upgrade: {e}");
&replacement_room, })
&state_lock, .map_ok(|_| ())
) .await
.await?; })
} .await
}
// Moves any local aliases to the new room // Moves any local aliases to the new room
let mut local_aliases = services #[implement(RoomUpgradeContext, params = "<'_>")]
#[tracing::instrument(level = "debug")]
async fn move_local_aliases(&self) -> Result {
self.services
.alias .alias
.local_aliases_for_room(&body.room_id) .local_aliases_for_room(self.old_room_id)
.boxed(); .filter_map(|alias| {
self.services
.alias
.remove_alias(alias, self.sender_user)
.inspect_err(move |e| error!(?alias, ?self, "Failed to remove alias: {e}"))
.map_ok(move |()| alias)
.ok()
})
.ready_for_each(|alias| {
self.services
.alias
.set_alias(alias, self.new_room_id, self.sender_user)
.inspect_err(|e| error!(?self, "Failed to add alias: {e}"))
.ok();
})
.map(Ok)
.await
}
while let Some(alias) = local_aliases.next().await { // Send a m.room.tombstone event to the old room to indicate that it is not
services // intended to be used any further Fail if the sender does not have the required
.alias // permissions.
.remove_alias(alias, sender_user) #[implement(RoomUpgradeContext, params = "<'_>")]
.await?; #[tracing::instrument(level = "debug")]
async fn tombstone_old_room(&self) -> Result<OwnedEventId> {
services self.services
.alias .timeline
.set_alias(alias, &replacement_room, sender_user)?; .build_and_append_pdu(
} PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
body: "This room has been upgraded".to_owned(),
replacement_room: self.new_room_id.to_owned(),
}),
self.sender_user,
self.old_room_id,
self.old_state_lock,
)
.await
}
// Modify the power levels in the old room to prevent sending of events and
// inviting new users. Though a Result is returned, the callsite above treats it
// as infallible because the tombstone represents the commitment.
#[implement(RoomUpgradeContext, params = "<'_>")]
#[tracing::instrument(level = "debug")]
async fn lockdown_old_room(&self) -> Result<OwnedEventId> {
// Get the old room power levels // Get the old room power levels
let power_levels_event_content: RoomPowerLevelsEventContent = services let old_content: RoomPowerLevelsEventContent = self
.services
.state_accessor .state_accessor
.room_state_get_content(&body.room_id, &StateEventType::RoomPowerLevels, "") .room_state_get_content(self.old_room_id, &StateEventType::RoomPowerLevels, "")
.await .await
.map_err(|_| err!(Database("Found room without m.room.power_levels event.")))?; .map_err(|_| err!(Database("Found room without m.room.power_levels event.")))?;
// Setting events_default and invite to the greater of 50 and users_default + 1 let old_users_default = old_content
let new_level = max( .users_default
int!(50), .checked_add(int!(1))
power_levels_event_content .ok_or_else(|| {
.users_default err!(Request(BadJson("users_default power levels event content is not valid")))
.checked_add(int!(1)) })?;
.ok_or_else(|| {
err!(Request(BadJson("users_default power levels event content is not valid")))
})?,
);
// Modify the power levels in the old room to prevent sending of events and // Setting events_default and invite to the greater of 50 and users_default + 1
// inviting new users let new_level = max(int!(50), old_users_default);
services
self.services
.timeline .timeline
.build_and_append_pdu( .build_and_append_pdu(
PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent { PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent {
events_default: new_level, events_default: new_level,
invite: new_level, invite: new_level,
..power_levels_event_content ..old_content
}), }),
sender_user, self.sender_user,
&body.room_id, self.old_room_id,
&state_lock, self.old_state_lock,
) )
.await?; .await
drop(state_lock);
// Return the replacement room id
Ok(upgrade_room::v3::Response { replacement_room })
} }

View File

@@ -5,10 +5,15 @@ use ruma::{
room::{ room::{
history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
member::{MembershipState, RoomMemberEventContent}, member::{MembershipState, RoomMemberEventContent},
tombstone::RoomTombstoneEventContent,
}, },
}, },
}; };
use tuwunel_core::{Err, Result, implement, matrix::Event, pdu::PduBuilder}; use tuwunel_core::{
Err, Result, implement,
matrix::{Event, StateKey},
pdu::PduBuilder,
};
use crate::rooms::state::RoomMutexGuard; use crate::rooms::state::RoomMutexGuard;
@@ -182,3 +187,34 @@ pub async fn user_can_invite(
.await .await
.is_ok() .is_ok()
} }
#[implement(super::Service)]
pub async fn user_can_tombstone(
&self,
room_id: &RoomId,
user_id: &UserId,
state_lock: &RoomMutexGuard,
) -> bool {
if !self
.services
.state_cache
.is_joined(user_id, room_id)
.await
{
return false;
}
self.services
.timeline
.create_hash_and_sign_event(
PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
replacement_room: room_id.into(), // placeholder,
body: "Not a valid m.room.tombstone.".into(),
}),
user_id,
room_id,
state_lock,
)
.await
.is_ok()
}