Room deletion (fixes #43)

This commit is contained in:
dasha_uwu
2025-08-22 20:21:05 +05:00
parent 7a3496869b
commit d05d3f710f
15 changed files with 447 additions and 7 deletions

View File

@@ -65,3 +65,17 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
self.write_str(&format!("{result}")).await
}
#[admin_command]
pub(super) async fn delete_room(&self, room_id: OwnedRoomId) -> Result {
if self.services.admin.is_admin_room(&room_id).await {
return Err!("Cannot delete admin room");
}
self.services.delete.delete_room(room_id).await?;
self.write_str("Successfully deleted the room from our database.")
.await?;
Ok(())
}

View File

@@ -56,4 +56,9 @@ pub(super) enum RoomCommand {
Exists {
room_id: OwnedRoomId,
},
/// - Delete room
DeleteRoom {
room_id: OwnedRoomId,
},
}

View File

@@ -0,0 +1,160 @@
use std::sync::Arc;
use futures::StreamExt;
use ruma::OwnedRoomId;
use tuwunel_core::{Result, debug, result::LogErr, utils::ReadyExt, warn};
pub struct Service {
services: Arc<crate::services::OnceServices>,
}
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self { services: args.services.clone() }))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
pub async fn delete_room(&self, room_id: OwnedRoomId) -> Result {
// ban the room locally so new users cannot join while we're in the process of
// deleting it
debug!("Banning room {}", &room_id);
self.services.metadata.ban_room(&room_id);
debug!("Making all users leave the room {room_id} and forgetting it");
let mut users = self
.services
.state_cache
.room_members(&room_id)
.ready_filter(|user| self.services.globals.user_is_local(user))
.boxed();
while let Some(user_id) = users.next().await {
debug!(
"Attempting leave for user {user_id} in room {room_id} (ignoring all errors, \
evicting admins too)",
);
if let Err(e) = self
.services
.membership
.remote_leave(user_id, &room_id)
.await
{
warn!("Failed to leave room: {e}");
}
self.services
.state_cache
.forget(&room_id, user_id);
}
debug!("Disabling incoming federation on room {}", &room_id);
self.services.metadata.disable_room(&room_id);
debug!("Deleting all our room aliases for the room");
self.services
.alias
.local_aliases_for_room(&room_id)
.for_each(async |local_alias| {
self.services
.alias
.remove_alias(local_alias, &self.services.globals.server_user)
.await
.log_err()
.ok();
})
.await;
debug!("Removing/unpublishing room from our room directory");
self.services.directory.set_not_public(&room_id);
debug!("Deleting room's threads from database");
self.services
.threads
.delete_all_rooms_threads(&room_id)
.await
.log_err()
.ok();
debug!("Deleting all the room's search token IDs from our database");
self.services
.search
.delete_all_search_tokenids_for_room(&room_id)
.await
.log_err()
.ok();
debug!("Deleting all room's forward extremities from our database");
self.services
.state
.delete_all_rooms_forward_extremities(&room_id)
.await
.log_err()
.ok();
debug!("Deleting all the room's event (PDU) references");
self.services
.pdu_metadata
.delete_all_referenced_for_room(&room_id)
.await
.log_err()
.ok();
debug!("Deleting all the room's member counts");
self.services
.state_cache
.delete_room_join_counts(&room_id)
.await
.log_err()
.ok();
debug!("Deleting all the room's private read receipts");
self.services
.read_receipt
.delete_all_read_receipts(&room_id)
.await
.log_err()
.ok();
debug!("Final stages of deleting the room");
debug!("Obtaining a mutex state lock for safety and future database operations");
let state_lock = self.services.state.mutex.lock(&room_id).await;
debug!("Deleting room state hash from our database");
self.services
.state
.delete_room_shortstatehash(&room_id, &state_lock)
.await
.log_err()
.ok();
debug!("Deleting PDUs");
self.services
.timeline
.delete_pdus(&room_id)
.await
.log_err()
.ok();
debug!("Deleting internal room ID from our database");
self.services
.short
.delete_shortroomid(&room_id)
.await
.log_err()
.ok();
// TODO: add option to keep a room banned (`--block` or `--ban`)
self.services.metadata.enable_room(&room_id);
self.services.metadata.unban_room(&room_id);
drop(state_lock);
debug!("Successfully deleted room {} from our database", &room_id);
Ok(())
}
}

View File

@@ -1,5 +1,6 @@
pub mod alias;
pub mod auth_chain;
pub mod delete;
pub mod directory;
pub mod event_handler;
pub mod lazy_loading;

View File

@@ -3,16 +3,18 @@ use std::{mem::size_of, sync::Arc};
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, UserId, api::Direction};
use tuwunel_core::{
Result,
arrayvec::ArrayVec,
matrix::{Event, PduCount},
result::LogErr,
trace,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
u64_from_u8,
},
};
use tuwunel_database::Map;
use tuwunel_database::{Interfix, Map};
use crate::rooms::{
short::{ShortEventId, ShortRoomId},
@@ -126,4 +128,20 @@ impl Data {
.await
.is_ok()
}
#[inline]
pub(super) async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.referencedevents
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.referencedevents.remove(key);
})
.await;
Ok(())
}
}

View File

@@ -126,4 +126,11 @@ impl Service {
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db.is_event_soft_failed(event_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result {
self.db
.delete_all_referenced_for_room(room_id)
.await
}
}

View File

@@ -7,10 +7,10 @@ use ruma::{
serde::Raw,
};
use tuwunel_core::{
Result,
Result, trace,
utils::{ReadyExt, stream::TryIgnore},
};
use tuwunel_database::{Deserialized, Json, Map};
use tuwunel_database::{Deserialized, Interfix, Json, Map};
pub(super) struct Data {
roomuserid_privateread: Arc<Map>,
@@ -120,4 +120,38 @@ impl Data {
.deserialized()
.unwrap_or(0)
}
#[inline]
pub(super) async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.roomuserid_privateread
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.roomuserid_privateread.remove(key);
})
.await;
self.roomuserid_lastprivatereadupdate
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.roomuserid_lastprivatereadupdate.remove(key);
})
.await;
self.readreceiptid_readreceipt
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.readreceiptid_readreceipt.remove(key);
})
.await;
Ok(())
}
}

View File

@@ -145,6 +145,10 @@ impl Service {
.last_privateread_update(user_id, room_id)
.await
}
pub async fn delete_all_read_receipts(&self, room_id: &RoomId) -> Result {
self.db.delete_all_read_receipts(room_id).await
}
}
#[must_use]

View File

@@ -7,12 +7,13 @@ use tuwunel_core::{
arrayvec::ArrayVec,
implement,
matrix::event::{Event, Matches},
trace,
utils::{
ArrayVecExt, IterStream, ReadyExt, set,
stream::{TryIgnore, WidebandExt},
},
};
use tuwunel_database::{Map, keyval::Val};
use tuwunel_database::{Interfix, Map, keyval::Val};
use crate::rooms::{
short::ShortRoomId,
@@ -197,6 +198,23 @@ fn search_pdu_ids_query_word(
.ready_take_while(move |key| key.starts_with(&prefix))
}
#[implement(Service)]
pub async fn delete_all_search_tokenids_for_room(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.db
.tokenids
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.db.tokenids.remove(key);
})
.await;
Ok(())
}
/// Splits a string into tokens used as keys in the search inverted index
///
/// This may be used to tokenize both message bodies (for indexing) or search

View File

@@ -4,7 +4,7 @@ use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, events::StateEventType};
use serde::Deserialize;
pub use tuwunel_core::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
use tuwunel_core::{Result, err, implement, matrix::StateKey, utils, utils::IterStream};
use tuwunel_core::{Err, Result, err, implement, matrix::StateKey, utils, utils::IterStream};
use tuwunel_database::{Deserialized, Get, Map, Qry};
pub struct Service {
@@ -258,3 +258,19 @@ pub async fn get_or_create_shortroomid(&self, room_id: &RoomId) -> ShortRoomId {
*short
})
}
#[implement(Service)]
pub async fn delete_shortroomid(&self, room_id: &RoomId) -> Result {
if self
.db
.roomid_shortroomid
.exists(room_id)
.await
.is_ok()
{
self.db.roomid_shortroomid.remove(room_id);
Ok(())
} else {
Err!(Database("not found"))
}
}

View File

@@ -16,8 +16,10 @@ use tuwunel_core::{
matrix::{RoomVersionRules, StateKey, TypeStateKey, room_version},
result::{AndThenRef, FlatOk},
state_res::{StateMap, auth_types_for_event},
trace,
utils::{
IterStream, MutexMap, MutexMapGuard, ReadyExt, calculate_hash,
mutex_map::Guard,
stream::{BroadbandExt, TryIgnore},
},
warn,
@@ -523,4 +525,30 @@ impl Service {
self.db.roomid_pduleaves.put_raw(key, event_id);
}
}
pub(super) async fn delete_all_rooms_forward_extremities(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.db
.roomid_pduleaves
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.db.roomid_pduleaves.remove(key);
})
.await;
Ok(())
}
pub(super) async fn delete_room_shortstatehash(
&self,
room_id: &RoomId,
_mutex_lock: &Guard<OwnedRoomId, ()>,
) -> Result {
self.db.roomid_shortstatehash.remove(room_id);
Ok(())
}
}

View File

@@ -15,6 +15,7 @@ use ruma::{
use tuwunel_core::{
Result, implement,
result::LogErr,
trace,
utils::{ReadyExt, stream::TryIgnore},
warn,
};
@@ -562,3 +563,89 @@ pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
.await
.is_ok()
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "trace")]
pub async fn delete_room_join_counts(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.db.roomid_knockedcount.remove(room_id);
self.db.roomid_invitedcount.remove(room_id);
self.db.roomid_inviteviaservers.remove(room_id);
self.db.roomid_joinedcount.remove(room_id);
self.db
.roomserverids
.keys_prefix(&prefix)
.ignore_err()
.ready_for_each(|key: (&RoomId, &ServerName)| {
trace!("Removing key: {key:?}");
self.db.roomserverids.del(key);
let reverse_key = (key.1, key.0);
trace!("Removing reverse key: {reverse_key:?}");
self.db.serverroomids.del(reverse_key);
})
.await;
self.db
.roomuserid_invitecount
.keys_prefix(&prefix)
.ignore_err()
.ready_for_each(|key: (&RoomId, &UserId)| {
trace!("Removing key: {key:?}");
self.db.roomuserid_invitecount.del(key);
let reverse_key = (key.1, key.0);
trace!("Removing reverse key: {reverse_key:?}");
self.db.userroomid_invitestate.del(reverse_key);
})
.await;
self.db
.roomuserid_joined
.keys_prefix(&prefix)
.ignore_err()
.ready_for_each(|key: (&RoomId, &UserId)| {
trace!("Removing key: {key:?}");
self.db.roomuserid_joined.del(key);
let reverse_key = (key.1, key.0);
trace!("Removing reverse key: {reverse_key:?}");
self.db.userroomid_joined.del(reverse_key);
})
.await;
self.db
.roomuserid_knockedcount
.keys_prefix(&prefix)
.ignore_err()
.ready_for_each(|key: (&RoomId, &UserId)| {
trace!("Removing key: {key:?}");
self.db.roomuserid_knockedcount.del(key);
let reverse_key = (key.1, key.0);
trace!("Removing reverse key: {reverse_key:?}");
self.db.userroomid_knockedstate.del(reverse_key);
})
.await;
self.db
.roomuserid_leftcount
.keys_prefix(&prefix)
.ignore_err()
.ready_for_each(|key: (&RoomId, &UserId)| {
trace!("Removing key: {key:?}");
self.db.roomuserid_leftcount.del(key);
let reverse_key = (key.1, key.0);
trace!("Removing reverse key: {reverse_key:?}");
self.db.userroomid_leftstate.del(reverse_key);
})
.await;
Ok(())
}

View File

@@ -9,12 +9,13 @@ use serde_json::json;
use tuwunel_core::{
Event, Result, err,
matrix::pdu::{PduCount, PduEvent, PduId, RawPduId},
trace,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
},
};
use tuwunel_database::{Deserialized, Map};
use tuwunel_database::{Deserialized, Interfix, Map};
pub struct Service {
db: Data,
@@ -191,4 +192,20 @@ impl Service {
.await
.deserialized()
}
pub(super) async fn delete_all_rooms_threads(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.db
.threadid_userids
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.db.threadid_userids.remove(key);
})
.await;
Ok(())
}
}

View File

@@ -24,6 +24,7 @@ pub use tuwunel_core::matrix::pdu::{PduId, RawPduId};
use tuwunel_core::{
Err, Result, at, err, implement,
matrix::pdu::{PduCount, PduEvent},
trace,
utils::{
MutexMap, MutexMapGuard,
result::{LogErr, NotFound},
@@ -413,3 +414,30 @@ pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
.await
.map(|handle| RawPduId::from(&*handle))
}
#[implement(Service)]
pub async fn delete_pdus(&self, room_id: &RoomId) -> Result {
self.count_to_id(room_id, PduCount::min(), Direction::Forward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.db
.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_try_for_each(|(key, value)| {
trace!("Removing PDU {key:?}");
self.db.pduid_pdu.remove(key);
let pdu = serde_json::from_slice::<PduEvent>(value)?;
let event_id = &pdu.event_id;
let room_id2 = &pdu.room_id;
trace!("Removed {event_id} {room_id2}");
self.db.eventid_pduid.remove(event_id);
self.db.eventid_outlierpdu.remove(event_id);
Ok(())
})
})
.try_flatten()
.await?;
Ok(())
}

View File

@@ -32,6 +32,7 @@ pub struct Services {
pub resolver: Arc<resolver::Service>,
pub alias: Arc<rooms::alias::Service>,
pub auth_chain: Arc<rooms::auth_chain::Service>,
pub delete: Arc<rooms::delete::Service>,
pub directory: Arc<rooms::directory::Service>,
pub event_handler: Arc<rooms::event_handler::Service>,
pub lazy_loading: Arc<rooms::lazy_loading::Service>,
@@ -112,6 +113,7 @@ impl Services {
pusher: build!(pusher::Service),
alias: build!(rooms::alias::Service),
auth_chain: build!(rooms::auth_chain::Service),
delete: build!(rooms::delete::Service),
directory: build!(rooms::directory::Service),
event_handler: build!(rooms::event_handler::Service),
lazy_loading: build!(rooms::lazy_loading::Service),
@@ -189,7 +191,7 @@ impl Services {
Ok(())
}
pub(crate) fn services(&self) -> [Arc<dyn Service>; 40] {
pub(crate) fn services(&self) -> [Arc<dyn Service>; 41] {
[
self.account_data.clone(),
self.admin.clone(),
@@ -205,6 +207,7 @@ impl Services {
self.pusher.clone(),
self.alias.clone(),
self.auth_chain.clone(),
self.delete.clone(),
self.directory.clone(),
self.event_handler.clone(),
self.lazy_loading.clone(),