Fix deletion of local leave state to allow sync for clients.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -67,7 +67,7 @@ pub(super) async fn exists(&self, room_id: OwnedRoomId) -> Result {
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn delete_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
pub(super) async fn delete_room(&self, room_id: OwnedRoomId, force: bool) -> Result {
|
||||
if self.services.admin.is_admin_room(&room_id).await {
|
||||
return Err!("Cannot delete admin room");
|
||||
}
|
||||
@@ -76,7 +76,7 @@ pub(super) async fn delete_room(&self, room_id: OwnedRoomId) -> Result {
|
||||
|
||||
self.services
|
||||
.delete
|
||||
.delete_room(&room_id, state_lock)
|
||||
.delete_room(&room_id, force, state_lock)
|
||||
.await?;
|
||||
|
||||
self.write_str("Successfully deleted the room from our database.")
|
||||
|
||||
@@ -60,5 +60,8 @@ pub(super) enum RoomCommand {
|
||||
/// - Delete room
|
||||
DeleteRoom {
|
||||
room_id: OwnedRoomId,
|
||||
|
||||
#[arg(short, long)]
|
||||
force: bool,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
|
||||
if let Err(e) = self
|
||||
.services
|
||||
.membership
|
||||
.leave(user_id, &room_id, None, &state_lock)
|
||||
.leave(user_id, &room_id, None, false, &state_lock)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
@@ -331,7 +331,7 @@ async fn ban_list_of_rooms(&self) -> Result {
|
||||
if let Err(e) = self
|
||||
.services
|
||||
.membership
|
||||
.leave(user_id, &room_id, None, &state_lock)
|
||||
.leave(user_id, &room_id, None, false, &state_lock)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -665,7 +665,7 @@ pub(super) async fn force_leave_room(
|
||||
|
||||
self.services
|
||||
.membership
|
||||
.leave(&user_id, &room_id, None, &state_lock)
|
||||
.leave(&user_id, &room_id, None, false, &state_lock)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ pub(crate) async fn leave_room_route(
|
||||
|
||||
services
|
||||
.membership
|
||||
.leave(body.sender_user(), &body.room_id, body.reason.clone(), &state_lock)
|
||||
.leave(body.sender_user(), &body.room_id, body.reason.clone(), false, &state_lock)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ impl Service {
|
||||
if let Err(e) = self
|
||||
.services
|
||||
.membership
|
||||
.leave(user_id, &room_id, None, &state_lock)
|
||||
.leave(user_id, &room_id, None, false, &state_lock)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -30,6 +30,7 @@ pub async fn leave(
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
reason: Option<String>,
|
||||
remote_leave_now: bool,
|
||||
state_lock: &RoomMutexGuard,
|
||||
) -> Result {
|
||||
let default_member_content = RoomMemberEventContent {
|
||||
@@ -79,7 +80,7 @@ pub async fn leave(
|
||||
.eq(&false);
|
||||
|
||||
// Ask a remote server if we don't have this room and are not knocking on it
|
||||
if dont_have_room.and(not_knocked).await {
|
||||
if remote_leave_now || dont_have_room.and(not_knocked).await {
|
||||
if let Err(e) = self.remote_leave(user_id, room_id).boxed().await {
|
||||
warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
|
||||
// Don't tell the client about this error
|
||||
@@ -167,7 +168,7 @@ pub async fn leave(
|
||||
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(name = "remote", level = "debug", skip_all)]
|
||||
pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||
async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||
let mut make_leave_response_and_server =
|
||||
Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));
|
||||
|
||||
@@ -175,6 +176,7 @@ pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||
.services
|
||||
.state_cache
|
||||
.servers_invite_via(room_id)
|
||||
.chain(self.services.state_cache.room_servers(room_id))
|
||||
.map(ToOwned::to_owned)
|
||||
.collect()
|
||||
.await;
|
||||
@@ -221,13 +223,17 @@ pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
||||
},
|
||||
}
|
||||
|
||||
servers.insert(user_id.server_name().to_owned());
|
||||
if let Some(room_id_server_name) = room_id.server_name() {
|
||||
servers.insert(room_id_server_name.to_owned());
|
||||
}
|
||||
|
||||
debug_info!("servers in remote_leave_room: {servers:?}");
|
||||
|
||||
for remote_server in servers {
|
||||
for remote_server in servers
|
||||
.into_iter()
|
||||
.filter(|server| !self.services.globals.server_is_ours(server))
|
||||
{
|
||||
let make_leave_response = self
|
||||
.services
|
||||
.sending
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::{FutureExt, StreamExt, pin_mut};
|
||||
use ruma::RoomId;
|
||||
use tokio::time::sleep;
|
||||
use tuwunel_core::{
|
||||
Result, debug,
|
||||
result::LogErr,
|
||||
@@ -54,30 +53,20 @@ impl Service {
|
||||
|
||||
debug!(?room_id, "Preparing to delete room...");
|
||||
|
||||
// Some arbitrary delay has to account for the leave event being synced to the
|
||||
// client or they'll never be updated on their leave. This can be removed once
|
||||
// a tombstone solution is implemented instead.
|
||||
sleep(Duration::from_millis(2500)).await;
|
||||
|
||||
self.services
|
||||
.delete
|
||||
.delete_room(room_id, state_lock)
|
||||
.delete_room(room_id, false, state_lock)
|
||||
.boxed()
|
||||
.await
|
||||
.expect("unhandled error during room deletion");
|
||||
}
|
||||
|
||||
pub async fn delete_room(&self, room_id: &RoomId, state_lock: RoomMutexGuard) -> Result {
|
||||
// ban the room locally so new users cannot join while we're in the process of
|
||||
// deleting it
|
||||
debug!("Banning room {room_id} prior to deletion.");
|
||||
self.services.metadata.ban_room(room_id);
|
||||
|
||||
// This might have to be dropped here to prevent deadlock, but the goal should
|
||||
// be to hold it all the way through. For now the room is banned under lock at
|
||||
// least.
|
||||
drop(state_lock);
|
||||
|
||||
pub async fn delete_room(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
force: bool,
|
||||
state_lock: RoomMutexGuard,
|
||||
) -> Result {
|
||||
debug!("Making all users leave the room {room_id} and forgetting it");
|
||||
let mut users = self
|
||||
.services
|
||||
@@ -95,18 +84,14 @@ impl Service {
|
||||
if let Err(e) = self
|
||||
.services
|
||||
.membership
|
||||
.remote_leave(user_id, room_id)
|
||||
.leave(user_id, room_id, Some("Room Deleted".into()), true, &state_lock)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
warn!("Failed to leave room: {e}");
|
||||
}
|
||||
|
||||
self.services.state_cache.forget(room_id, user_id);
|
||||
}
|
||||
|
||||
debug!("Disabling incoming federation on room {room_id}");
|
||||
self.services.metadata.disable_room(room_id);
|
||||
|
||||
debug!("Deleting all our room aliases for the room");
|
||||
self.services
|
||||
.alias
|
||||
@@ -159,7 +144,7 @@ impl Service {
|
||||
debug!("Deleting all the room's member counts");
|
||||
self.services
|
||||
.state_cache
|
||||
.delete_room_join_counts(room_id)
|
||||
.delete_room_join_counts(room_id, force)
|
||||
.await
|
||||
.log_err()
|
||||
.ok();
|
||||
@@ -174,9 +159,6 @@ impl Service {
|
||||
|
||||
debug!("Final stages of deleting the room");
|
||||
|
||||
debug!("Obtaining a mutex state lock for safety and future database operations");
|
||||
let state_lock = self.services.state.mutex.lock(room_id).await;
|
||||
|
||||
debug!("Deleting room state hash from our database");
|
||||
self.services
|
||||
.state
|
||||
@@ -201,12 +183,6 @@ impl Service {
|
||||
.log_err()
|
||||
.ok();
|
||||
|
||||
// TODO: add option to keep a room banned (`--block` or `--ban`)
|
||||
self.services.metadata.enable_room(room_id);
|
||||
self.services.metadata.unban_room(room_id);
|
||||
|
||||
drop(state_lock);
|
||||
|
||||
debug!("Successfully deleted room {room_id} from our database");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -577,7 +577,7 @@ pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
|
||||
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(skip(self), level = "trace")]
|
||||
pub async fn delete_room_join_counts(&self, room_id: &RoomId) -> Result {
|
||||
pub async fn delete_room_join_counts(&self, room_id: &RoomId, force: bool) -> Result {
|
||||
let prefix = (room_id, Interfix);
|
||||
|
||||
self.db.roomid_knockedcount.remove(room_id);
|
||||
@@ -648,6 +648,9 @@ pub async fn delete_room_join_counts(&self, room_id: &RoomId) -> Result {
|
||||
.roomuserid_leftcount
|
||||
.keys_prefix(&prefix)
|
||||
.ignore_err()
|
||||
.ready_filter(|(_, user_id): &(&RoomId, &UserId)| {
|
||||
force || !self.services.globals.user_is_local(user_id)
|
||||
})
|
||||
.ready_for_each(|key: (&RoomId, &UserId)| {
|
||||
trace!("Removing key: {key:?}");
|
||||
self.db.roomuserid_leftcount.del(key);
|
||||
|
||||
Reference in New Issue
Block a user