From d05d3f710fb68fd5413283d37cdf0580e720f36b Mon Sep 17 00:00:00 2001 From: dasha_uwu Date: Fri, 22 Aug 2025 20:21:05 +0500 Subject: [PATCH] Room deletion (fixes #43) --- src/admin/room/commands.rs | 14 +++ src/admin/room/mod.rs | 5 + src/service/rooms/delete/mod.rs | 160 +++++++++++++++++++++++++ src/service/rooms/mod.rs | 1 + src/service/rooms/pdu_metadata/data.rs | 20 +++- src/service/rooms/pdu_metadata/mod.rs | 7 ++ src/service/rooms/read_receipt/data.rs | 38 +++++- src/service/rooms/read_receipt/mod.rs | 4 + src/service/rooms/search/mod.rs | 20 +++- src/service/rooms/short/mod.rs | 18 ++- src/service/rooms/state/mod.rs | 28 +++++ src/service/rooms/state_cache/mod.rs | 87 ++++++++++++++ src/service/rooms/threads/mod.rs | 19 ++- src/service/rooms/timeline/mod.rs | 28 +++++ src/service/services.rs | 5 +- 15 files changed, 447 insertions(+), 7 deletions(-) create mode 100644 src/service/rooms/delete/mod.rs diff --git a/src/admin/room/commands.rs b/src/admin/room/commands.rs index 6a234211..63a9b7a5 100644 --- a/src/admin/room/commands.rs +++ b/src/admin/room/commands.rs @@ -65,3 +65,17 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result { self.write_str(&format!("{result}")).await } + +#[admin_command] +pub(super) async fn delete_room(&self, room_id: OwnedRoomId) -> Result { + if self.services.admin.is_admin_room(&room_id).await { + return Err!("Cannot delete admin room"); + } + + self.services.delete.delete_room(room_id).await?; + + self.write_str("Successfully deleted the room from our database.") + .await?; + + Ok(()) +} diff --git a/src/admin/room/mod.rs b/src/admin/room/mod.rs index 357c5741..c3980fc2 100644 --- a/src/admin/room/mod.rs +++ b/src/admin/room/mod.rs @@ -56,4 +56,9 @@ pub(super) enum RoomCommand { Exists { room_id: OwnedRoomId, }, + + /// - Delete room + DeleteRoom { + room_id: OwnedRoomId, + }, } diff --git a/src/service/rooms/delete/mod.rs b/src/service/rooms/delete/mod.rs new file mode 100644 index 00000000..3a951f71 --- /dev/null +++ b/src/service/rooms/delete/mod.rs @@ -0,0 +1,160 @@ +use std::sync::Arc; + +use futures::StreamExt; +use ruma::OwnedRoomId; +use tuwunel_core::{Result, debug, result::LogErr, utils::ReadyExt, warn}; + +pub struct Service { + services: Arc, +} + +impl crate::Service for Service { + fn build(args: crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { services: args.services.clone() })) + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +impl Service { + pub async fn delete_room(&self, room_id: OwnedRoomId) -> Result { + // ban the room locally so new users cannot join while we're in the process of + // deleting it + debug!("Banning room {}", &room_id); + self.services.metadata.ban_room(&room_id); + + debug!("Making all users leave the room {room_id} and forgetting it"); + let mut users = self + .services + .state_cache + .room_members(&room_id) + .ready_filter(|user| self.services.globals.user_is_local(user)) + .boxed(); + + while let Some(user_id) = users.next().await { + debug!( + "Attempting leave for user {user_id} in room {room_id} (ignoring all errors, \ + evicting admins too)", + ); + + if let Err(e) = self + .services + .membership + .remote_leave(user_id, &room_id) + .await + { + warn!("Failed to leave room: {e}"); + } + + self.services + .state_cache + .forget(&room_id, user_id); + } + + debug!("Disabling incoming federation on room {}", &room_id); + self.services.metadata.disable_room(&room_id); + + debug!("Deleting all our room aliases for the room"); + self.services + .alias + .local_aliases_for_room(&room_id) + .for_each(async |local_alias| { + self.services + .alias + .remove_alias(local_alias, &self.services.globals.server_user) + .await + .log_err() + .ok(); + }) + .await; + + debug!("Removing/unpublishing room from our room directory"); + self.services.directory.set_not_public(&room_id); + + debug!("Deleting room's threads from database"); + self.services + .threads + .delete_all_rooms_threads(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting all the room's search token IDs from our database"); + self.services + .search + .delete_all_search_tokenids_for_room(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting all room's forward extremities from our database"); + self.services + .state + .delete_all_rooms_forward_extremities(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting all the room's event (PDU) references"); + self.services + .pdu_metadata + .delete_all_referenced_for_room(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting all the room's member counts"); + self.services + .state_cache + .delete_room_join_counts(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting all the room's private read receipts"); + self.services + .read_receipt + .delete_all_read_receipts(&room_id) + .await + .log_err() + .ok(); + + debug!("Final stages of deleting the room"); + + debug!("Obtaining a mutex state lock for safety and future database operations"); + let state_lock = self.services.state.mutex.lock(&room_id).await; + + debug!("Deleting room state hash from our database"); + self.services + .state + .delete_room_shortstatehash(&room_id, &state_lock) + .await + .log_err() + .ok(); + + debug!("Deleting PDUs"); + self.services + .timeline + .delete_pdus(&room_id) + .await + .log_err() + .ok(); + + debug!("Deleting internal room ID from our database"); + self.services + .short + .delete_shortroomid(&room_id) + .await + .log_err() + .ok(); + + // TODO: add option to keep a room banned (`--block` or `--ban`) + self.services.metadata.enable_room(&room_id); + self.services.metadata.unban_room(&room_id); + + drop(state_lock); + + debug!("Successfully deleted room {} from our database", &room_id); + Ok(()) + } +} diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 51a1660a..1f8c5029 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -1,5 +1,6 @@ pub mod alias; pub mod auth_chain; +pub mod delete; pub mod directory; pub mod event_handler; pub mod lazy_loading; diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index fe033e6f..52365c4f 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,16 +3,18 @@ use std::{mem::size_of, sync::Arc}; use futures::{Stream, StreamExt}; use ruma::{EventId, RoomId, UserId, api::Direction}; use tuwunel_core::{ + Result, arrayvec::ArrayVec, matrix::{Event, PduCount}, result::LogErr, + trace, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, u64_from_u8, }, }; -use tuwunel_database::Map; +use tuwunel_database::{Interfix, Map}; use crate::rooms::{ short::{ShortEventId, ShortRoomId}, @@ -126,4 +128,20 @@ impl Data { .await .is_ok() } + + #[inline] + pub(super) async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.referencedevents + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.referencedevents.remove(key); + }) + .await; + + Ok(()) + } } diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 087f7e5f..7487975c 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -126,4 +126,11 @@ impl Service { pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { self.db.is_event_soft_failed(event_id).await } + + #[tracing::instrument(skip(self), level = "debug")] + pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result { + self.db + .delete_all_referenced_for_room(room_id) + .await + } } diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index f1b69a26..29242b5d 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -7,10 +7,10 @@ use ruma::{ serde::Raw, }; use tuwunel_core::{ - Result, + Result, trace, utils::{ReadyExt, stream::TryIgnore}, }; -use tuwunel_database::{Deserialized, Json, Map}; +use tuwunel_database::{Deserialized, Interfix, Json, Map}; pub(super) struct Data { roomuserid_privateread: Arc, @@ -120,4 +120,38 @@ impl Data { .deserialized() .unwrap_or(0) } + + #[inline] + pub(super) async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.roomuserid_privateread + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.roomuserid_privateread.remove(key); + }) + .await; + + self.roomuserid_lastprivatereadupdate + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.roomuserid_lastprivatereadupdate.remove(key); + }) + .await; + + self.readreceiptid_readreceipt + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.readreceiptid_readreceipt.remove(key); + }) + .await; + + Ok(()) + } } diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index d6c7cf25..209fd2d4 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -145,6 +145,10 @@ impl Service { .last_privateread_update(user_id, room_id) .await } + + pub async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result { + self.db.delete_all_read_receipts(room_id).await + } } #[must_use] diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index a128b7f3..6e007887 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -7,12 +7,13 @@ use tuwunel_core::{ arrayvec::ArrayVec, implement, matrix::event::{Event, Matches}, + trace, utils::{ ArrayVecExt, IterStream, ReadyExt, set, stream::{TryIgnore, WidebandExt}, }, }; -use tuwunel_database::{Map, keyval::Val}; +use tuwunel_database::{Interfix, Map, keyval::Val}; use crate::rooms::{ short::ShortRoomId, @@ -197,6 +198,23 @@ fn search_pdu_ids_query_word( .ready_take_while(move |key| key.starts_with(&prefix)) } +#[implement(Service)] +pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.db + .tokenids + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.db.tokenids.remove(key); + }) + .await; + + Ok(()) +} + /// Splits a string into tokens used as keys in the search inverted index /// /// This may be used to tokenize both message bodies (for indexing) or search diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index 185986fd..e32b3bad 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -4,7 +4,7 @@ use futures::{Stream, StreamExt}; use ruma::{EventId, RoomId, events::StateEventType}; use serde::Deserialize; pub use tuwunel_core::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey}; -use tuwunel_core::{Result, err, implement, matrix::StateKey, utils, utils::IterStream}; +use tuwunel_core::{Err, Result, err, implement, matrix::StateKey, utils, utils::IterStream}; use tuwunel_database::{Deserialized, Get, Map, Qry}; pub struct Service { @@ -258,3 +258,19 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId { *short }) } + +#[implement(Service)] +pub async fn delete_shortroomid(&self, room_id: &RoomId) -> Result { + if self + .db + .roomid_shortroomid + .exists(room_id) + .await + .is_ok() + { + self.db.roomid_shortroomid.remove(room_id); + Ok(()) + } else { + Err!(Database("not found")) + } +} diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 0594b43e..d915e71f 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -16,8 +16,10 @@ use tuwunel_core::{ matrix::{RoomVersionRules, StateKey, TypeStateKey, room_version}, result::{AndThenRef, FlatOk}, state_res::{StateMap, auth_types_for_event}, + trace, utils::{ IterStream, MutexMap, MutexMapGuard, ReadyExt, calculate_hash, + mutex_map::Guard, stream::{BroadbandExt, TryIgnore}, }, warn, @@ -523,4 +525,30 @@ impl Service { self.db.roomid_pduleaves.put_raw(key, event_id); } } + + pub(super) async fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.db + .roomid_pduleaves + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.db.roomid_pduleaves.remove(key); + }) + .await; + + Ok(()) + } + + pub(super) async fn delete_room_shortstatehash( + &self, + room_id: &RoomId, + _mutex_lock: &Guard, + ) -> Result { + self.db.roomid_shortstatehash.remove(room_id); + + Ok(()) + } } diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 5f804765..930b092c 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -15,6 +15,7 @@ use ruma::{ use tuwunel_core::{ Result, implement, result::LogErr, + trace, utils::{ReadyExt, stream::TryIgnore}, warn, }; @@ -562,3 +563,89 @@ pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool { .await .is_ok() } + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "trace")] +pub async fn delete_room_join_counts(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.db.roomid_knockedcount.remove(room_id); + + self.db.roomid_invitedcount.remove(room_id); + + self.db.roomid_inviteviaservers.remove(room_id); + + self.db.roomid_joinedcount.remove(room_id); + + self.db + .roomserverids + .keys_prefix(&prefix) + .ignore_err() + .ready_for_each(|key: (&RoomId, &ServerName)| { + trace!("Removing key: {key:?}"); + self.db.roomserverids.del(key); + + let reverse_key = (key.1, key.0); + trace!("Removing reverse key: {reverse_key:?}"); + self.db.serverroomids.del(reverse_key); + }) + .await; + + self.db + .roomuserid_invitecount + .keys_prefix(&prefix) + .ignore_err() + .ready_for_each(|key: (&RoomId, &UserId)| { + trace!("Removing key: {key:?}"); + self.db.roomuserid_invitecount.del(key); + + let reverse_key = (key.1, key.0); + trace!("Removing reverse key: {reverse_key:?}"); + self.db.userroomid_invitestate.del(reverse_key); + }) + .await; + + self.db + .roomuserid_joined + .keys_prefix(&prefix) + .ignore_err() + .ready_for_each(|key: (&RoomId, &UserId)| { + trace!("Removing key: {key:?}"); + self.db.roomuserid_joined.del(key); + + let reverse_key = (key.1, key.0); + trace!("Removing reverse key: {reverse_key:?}"); + self.db.userroomid_joined.del(reverse_key); + }) + .await; + + self.db + .roomuserid_knockedcount + .keys_prefix(&prefix) + .ignore_err() + .ready_for_each(|key: (&RoomId, &UserId)| { + trace!("Removing key: {key:?}"); + self.db.roomuserid_knockedcount.del(key); + + let reverse_key = (key.1, key.0); + trace!("Removing reverse key: {reverse_key:?}"); + self.db.userroomid_knockedstate.del(reverse_key); + }) + .await; + + self.db + .roomuserid_leftcount + .keys_prefix(&prefix) + .ignore_err() + .ready_for_each(|key: (&RoomId, &UserId)| { + trace!("Removing key: {key:?}"); + self.db.roomuserid_leftcount.del(key); + + let reverse_key = (key.1, key.0); + trace!("Removing reverse key: {reverse_key:?}"); + self.db.userroomid_leftstate.del(reverse_key); + }) + .await; + + Ok(()) +} diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index d121e682..7ddece26 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -9,12 +9,13 @@ use serde_json::json; use tuwunel_core::{ Event, Result, err, matrix::pdu::{PduCount, PduEvent, PduId, RawPduId}, + trace, utils::{ ReadyExt, stream::{TryIgnore, WidebandExt}, }, }; -use tuwunel_database::{Deserialized, Map}; +use tuwunel_database::{Deserialized, Interfix, Map}; pub struct Service { db: Data, @@ -191,4 +192,20 @@ impl Service { .await .deserialized() } + + pub(super) async fn delete_all_rooms_threads(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.db + .threadid_userids + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!("Removing key: {key:?}"); + self.db.threadid_userids.remove(key); + }) + .await; + + Ok(()) + } } diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 872ddb2f..85a0ab23 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -24,6 +24,7 @@ pub use tuwunel_core::matrix::pdu::{PduId, RawPduId}; use tuwunel_core::{ Err, Result, at, err, implement, matrix::pdu::{PduCount, PduEvent}, + trace, utils::{ MutexMap, MutexMapGuard, result::{LogErr, NotFound}, @@ -413,3 +414,30 @@ pub async fn get_pdu_id(&self, event_id: &EventId) -> Result { .await .map(|handle| RawPduId::from(&*handle)) } + +#[implement(Service)] +pub async fn delete_pdus(&self, room_id: &RoomId) -> Result { + self.count_to_id(room_id, PduCount::min(), Direction::Forward) + .map_ok(move |current| { + let prefix = current.shortroomid(); + self.db + .pduid_pdu + .raw_stream_from(¤t) + .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) + .ready_try_for_each(|(key, value)| { + trace!("Removing PDU {key:?}"); + self.db.pduid_pdu.remove(key); + let pdu = serde_json::from_slice::(value)?; + + let event_id = &pdu.event_id; + let room_id2 = &pdu.room_id; + trace!("Removed {event_id} {room_id2}"); + self.db.eventid_pduid.remove(event_id); + self.db.eventid_outlierpdu.remove(event_id); + Ok(()) + }) + }) + .try_flatten() + .await?; + Ok(()) +} diff --git a/src/service/services.rs b/src/service/services.rs index caf3ab44..a2d009f3 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -32,6 +32,7 @@ pub struct Services { pub resolver: Arc, pub alias: Arc, pub auth_chain: Arc, + pub delete: Arc, pub directory: Arc, pub event_handler: Arc, pub lazy_loading: Arc, @@ -112,6 +113,7 @@ impl Services { pusher: build!(pusher::Service), alias: build!(rooms::alias::Service), auth_chain: build!(rooms::auth_chain::Service), + delete: build!(rooms::delete::Service), directory: build!(rooms::directory::Service), event_handler: build!(rooms::event_handler::Service), lazy_loading: build!(rooms::lazy_loading::Service), @@ -189,7 +191,7 @@ impl Services { Ok(()) } - pub(crate) fn services(&self) -> [Arc; 40] { + pub(crate) fn services(&self) -> [Arc; 41] { [ self.account_data.clone(), self.admin.clone(), @@ -205,6 +207,7 @@ impl Services { self.pusher.clone(), self.alias.clone(), self.auth_chain.clone(), + self.delete.clone(), self.directory.clone(), self.event_handler.clone(), self.lazy_loading.clone(),