Membership refactor

Introduce deactivate, membership services

Move membership and user deactivation functions from api crate into those services
This commit is contained in:
dasha_uwu
2025-08-25 19:12:27 +05:00
parent 6810604629
commit 8e9c6661b2
28 changed files with 2318 additions and 2091 deletions

View File

@@ -0,0 +1,164 @@
use std::sync::Arc;
use futures::{FutureExt, StreamExt};
use ruma::{
OwnedRoomId, UserId,
events::{StateEventType, room::power_levels::RoomPowerLevelsEventContent},
};
use tuwunel_core::{Event, Result, info, pdu::PduBuilder, utils::ReadyExt, warn};
/// Account-deactivation service: holds a handle to the shared service
/// registry so the deactivation steps can reach the users, rooms, state and
/// timeline services.
pub struct Service {
	services: Arc<crate::services::OnceServices>,
}
impl crate::Service for Service {
	/// Construct the service from the shared service registry.
	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
		Ok(Arc::new(Self { services: args.services.clone() }))
	}

	/// Service name derived from this module's path.
	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
	/// Runs through all the deactivation steps:
	///
	/// - Mark as deactivated
	/// - Removing display name
	/// - Removing avatar URL and blurhash
	/// - Removing all profile data
	/// - Leaving all rooms (and forgets all of them)
	pub async fn full_deactivate(&self, user_id: &UserId) -> Result {
		// Flag the account as deactivated first so the cleanup below runs
		// against an already-deactivated account.
		self.services
			.users
			.deactivate_account(user_id)
			.await?;

		// Snapshot joined rooms before mutating the profile; the display name
		// and avatar updates below are broadcast into these rooms.
		let all_joined_rooms: Vec<OwnedRoomId> = self
			.services
			.state_cache
			.rooms_joined(user_id)
			.map(Into::into)
			.collect()
			.await;

		self.services
			.users
			.update_displayname(user_id, None, &all_joined_rooms)
			.await;

		self.services
			.users
			.update_avatar_url(user_id, None, None, &all_joined_rooms)
			.await;

		// Clear every remaining custom profile key.
		self.services
			.users
			.all_profile_keys(user_id)
			.ready_for_each(|(profile_key, _)| {
				self.services
					.users
					.set_profile_key(user_id, &profile_key, None);
			})
			.await;

		// Best-effort: demote the user's own power level in each joined room
		// so the deactivated account leaves no elevated permissions behind.
		for room_id in all_joined_rooms {
			let state_lock = self.services.state.mutex.lock(&room_id).await;

			let room_power_levels = self
				.services
				.state_accessor
				.get_power_levels(&room_id)
				.await
				.ok();

			let user_can_change_self = room_power_levels
				.as_ref()
				.is_some_and(|power_levels| {
					power_levels.user_can_change_user_power_level(user_id, user_id)
				});

			// The room creator may demote themselves even without an explicit
			// power-levels grant.
			let user_can_demote_self = user_can_change_self
				|| self
					.services
					.state_accessor
					.room_state_get(&room_id, &StateEventType::RoomCreate, "")
					.await
					.is_ok_and(|event| event.sender() == user_id);

			if user_can_demote_self {
				let mut power_levels_content: RoomPowerLevelsEventContent = room_power_levels
					.map(TryInto::try_into)
					.transpose()?
					.unwrap_or_default();

				power_levels_content.users.remove(user_id);

				// ignore errors so deactivation doesn't fail
				match self
					.services
					.timeline
					.build_and_append_pdu(
						PduBuilder::state(String::new(), &power_levels_content),
						user_id,
						&room_id,
						&state_lock,
					)
					.await
				{
					| Err(e) => {
						warn!(%room_id, %user_id, "Failed to demote user's own power level: {e}");
					},
					| _ => {
						info!("Demoted {user_id} in {room_id} as part of account deactivation");
					},
				}
			}
		}

		// Leave (and forget) every room the user is joined to, invited to, or
		// knocking on; membership may have changed above, so re-query.
		let rooms_joined = self
			.services
			.state_cache
			.rooms_joined(user_id)
			.map(ToOwned::to_owned);

		let rooms_invited = self
			.services
			.state_cache
			.rooms_invited(user_id)
			.map(|(r, _)| r);

		let rooms_knocked = self
			.services
			.state_cache
			.rooms_knocked(user_id)
			.map(|(r, _)| r);

		let all_rooms: Vec<_> = rooms_joined
			.chain(rooms_invited)
			.chain(rooms_knocked)
			.collect()
			.await;

		for room_id in all_rooms {
			let state_lock = self.services.state.mutex.lock(&room_id).await;

			// ignore errors
			if let Err(e) = self
				.services
				.membership
				.leave(user_id, &room_id, None, &state_lock)
				.boxed()
				.await
			{
				warn!(%user_id, "Failed to leave {room_id} remotely: {e}");
			}

			drop(state_lock);

			self.services
				.state_cache
				.forget(&room_id, user_id);
		}

		Ok(())
	}
}

View File

@@ -0,0 +1,44 @@
use ruma::{
RoomId, UserId,
events::room::member::{MembershipState, RoomMemberEventContent},
};
use tuwunel_core::{Result, implement, pdu::PduBuilder};
use super::Service;
use crate::rooms::timeline::RoomMutexGuard;
#[implement(Service)]
#[tracing::instrument(
level = "debug",
skip_all,
fields(%sender_user, %room_id, %user_id)
)]
pub async fn ban(
&self,
room_id: &RoomId,
user_id: &UserId,
reason: Option<&String>,
sender_user: &UserId,
state_lock: &RoomMutexGuard,
) -> Result {
self.services
.timeline
.build_and_append_pdu(
PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
membership: MembershipState::Ban,
reason: reason.cloned(),
displayname: None,
avatar_url: None,
blurhash: None,
is_direct: None,
join_authorized_via_users_server: None,
third_party_invite: None,
}),
sender_user,
room_id,
state_lock,
)
.await?;
Ok(())
}

View File

@@ -0,0 +1,200 @@
use futures::FutureExt;
use ruma::{
OwnedServerName, RoomId, UserId,
api::federation::membership::create_invite,
events::room::member::{MembershipState, RoomMemberEventContent},
};
use tuwunel_core::{
Err, Result, err, implement, matrix::event::gen_event_id_canonical_json, pdu::PduBuilder,
};
use super::Service;
#[implement(Service)]
#[tracing::instrument(
	level = "debug",
	skip_all,
	fields(%sender_user, %room_id, %user_id)
)]
/// Invite `user_id` into `room_id` on behalf of `sender_user`.
///
/// Dispatches to the purely local path when the invitee is one of our own
/// users, and to the federation path otherwise.
pub async fn invite(
	&self,
	sender_user: &UserId,
	user_id: &UserId,
	room_id: &RoomId,
	reason: Option<&String>,
	is_direct: bool,
) -> Result {
	let invitee_is_local = self.services.globals.user_is_local(user_id);

	if invitee_is_local {
		self.local_invite(sender_user, user_id, room_id, reason, is_direct)
			.boxed()
			.await
	} else {
		self.remote_invite(sender_user, user_id, room_id, reason, is_direct)
			.boxed()
			.await
	}
}
#[implement(Service)]
#[tracing::instrument(name = "remote", level = "debug", skip_all)]
/// Invite a user on another homeserver: create and sign the invite event
/// locally, have the remote server countersign it via federation `/invite`,
/// then accept the countersigned event into the room's timeline.
async fn remote_invite(
	&self,
	sender_user: &UserId,
	user_id: &UserId,
	room_id: &RoomId,
	reason: Option<&String>,
	is_direct: bool,
) -> Result {
	let (pdu, pdu_json, invite_room_state) = {
		let state_lock = self.services.state.mutex.lock(room_id).await;

		let content = RoomMemberEventContent {
			avatar_url: self.services.users.avatar_url(user_id).await.ok(),
			is_direct: Some(is_direct),
			reason: reason.cloned(),
			..RoomMemberEventContent::new(MembershipState::Invite)
		};

		let (pdu, pdu_json) = self
			.services
			.timeline
			.create_hash_and_sign_event(
				PduBuilder::state(user_id.to_string(), &content),
				sender_user,
				room_id,
				&state_lock,
			)
			.await?;

		// Stripped state is bundled so the invitee's server can preview the room.
		let invite_room_state = self.services.state.summary_stripped(&pdu).await;

		drop(state_lock);

		(pdu, pdu_json, invite_room_state)
	};

	let room_version_id = self
		.services
		.state
		.get_room_version(room_id)
		.await?;

	let response = self
		.services
		.sending
		.send_federation_request(user_id.server_name(), create_invite::v2::Request {
			room_id: room_id.to_owned(),
			event_id: (*pdu.event_id).to_owned(),
			room_version: room_version_id.clone(),
			event: self
				.services
				.sending
				.convert_to_outgoing_federation_event(pdu_json.clone())
				.await,
			invite_room_state: invite_room_state
				.into_iter()
				.map(Into::into)
				.collect(),
			via: self
				.services
				.state_cache
				.servers_route_via(room_id)
				.await
				.ok(),
		})
		.await?;

	// We do not add the event_id field to the pdu here because of signature and
	// hashes checks
	let (event_id, value) = gen_event_id_canonical_json(&response.event, &room_version_id)
		.map_err(|e| {
			err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
		})?;

	// The countersigned event must still be the event we created.
	if pdu.event_id != event_id {
		return Err!(Request(BadJson(warn!(
			%pdu.event_id, %event_id,
			"Server {} sent event with wrong event ID",
			user_id.server_name()
		))));
	}

	let origin: OwnedServerName = serde_json::from_value(serde_json::to_value(
		value
			.get("origin")
			.ok_or_else(|| err!(Request(BadJson("Event missing origin field."))))?,
	)?)
	.map_err(|e| {
		err!(Request(BadJson(warn!("Origin field in event is not a valid server name: {e}"))))
	})?;

	// Run the countersigned event through the regular incoming-PDU pipeline.
	let pdu_id = self
		.services
		.event_handler
		.handle_incoming_pdu(&origin, room_id, &event_id, value, true)
		.await?
		.ok_or_else(|| {
			err!(Request(InvalidParam("Could not accept incoming PDU as timeline event.")))
		})?;

	self.services
		.sending
		.send_pdu_room(room_id, &pdu_id)
		.await?;

	Ok(())
}
#[implement(Service)]
#[tracing::instrument(name = "local", level = "debug", skip_all)]
/// Invite one of our own users into `room_id` by appending the invite
/// membership event directly; no federation round-trip is needed.
async fn local_invite(
	&self,
	sender_user: &UserId,
	user_id: &UserId,
	room_id: &RoomId,
	reason: Option<&String>,
	is_direct: bool,
) -> Result {
	// Only members of a room may invite others into it.
	let sender_is_joined = self
		.services
		.state_cache
		.is_joined(sender_user, room_id)
		.await;

	if !sender_is_joined {
		return Err!(Request(Forbidden(
			"You must be joined in the room you are trying to invite from."
		)));
	}

	let state_lock = self.services.state.mutex.lock(room_id).await;

	// Prefill the invitee's profile so clients can render the invite.
	let users = &self.services.users;
	let content = RoomMemberEventContent {
		displayname: users.displayname(user_id).await.ok(),
		avatar_url: users.avatar_url(user_id).await.ok(),
		blurhash: users.blurhash(user_id).await.ok(),
		is_direct: Some(is_direct),
		reason: reason.cloned(),
		..RoomMemberEventContent::new(MembershipState::Invite)
	};

	self.services
		.timeline
		.build_and_append_pdu(
			PduBuilder::state(user_id.to_string(), &content),
			sender_user,
			room_id,
			&state_lock,
		)
		.await?;

	drop(state_lock);
	Ok(())
}

View File

@@ -0,0 +1,864 @@
use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc};
use futures::{FutureExt, StreamExt, pin_mut};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomVersionId,
UserId,
api::{client::error::ErrorKind, federation},
canonical_json::to_canonical_value,
events::{
StateEventType,
room::{
join_rules::RoomJoinRulesEventContent,
member::{MembershipState, RoomMemberEventContent},
},
},
room::{AllowRule, JoinRule},
};
use tuwunel_core::{
Err, Result, debug, debug_info, debug_warn, err, error, implement, info,
matrix::{
event::{gen_event_id, gen_event_id_canonical_json},
room_version,
},
pdu::{PduBuilder, PduEvent},
state_res, trace,
utils::{self, IterStream, ReadyExt},
warn,
};
use super::Service;
use crate::{
appservice::RegistrationInfo,
rooms::{
state::RoomMutexGuard,
state_compressor::{CompressedState, HashSetCompressStateEvent},
},
};
#[implement(Service)]
#[tracing::instrument(
	level = "debug",
	skip_all,
	fields(%sender_user, %room_id)
)]
/// Join `sender_user` into `room_id`, locally when we already participate in
/// the room and over federation through `servers` otherwise.
pub async fn join(
	&self,
	sender_user: &UserId,
	room_id: &RoomId,
	reason: Option<String>,
	servers: &[OwnedServerName],
	appservice_info: &Option<RegistrationInfo>,
	state_lock: &RoomMutexGuard,
) -> Result {
	// NOTE(review): a deactivated, non-appservice account is treated as a
	// guest here — confirm this matches the intended guest-account model.
	let user_is_guest = self
		.services
		.users
		.is_deactivated(sender_user)
		.await
		.unwrap_or(false)
		&& appservice_info.is_none();

	if user_is_guest
		&& !self
			.services
			.state_accessor
			.guest_can_join(room_id)
			.await
	{
		return Err!(Request(Forbidden("Guests are not allowed to join this room")));
	}

	// Joining an already-joined room is a no-op.
	if self
		.services
		.state_cache
		.is_joined(sender_user, room_id)
		.await
	{
		debug_warn!("{sender_user} is already joined in {room_id}");
		return Ok(());
	}

	// A banned user may not rejoin until unbanned.
	if let Ok(membership) = self
		.services
		.state_accessor
		.get_member(room_id, sender_user)
		.await
	{
		if membership.membership == MembershipState::Ban {
			debug_warn!("{sender_user} is banned from {room_id} but attempted to join");
			return Err!(Request(Forbidden("You are banned from the room.")));
		}
	}

	let server_in_room = self
		.services
		.state_cache
		.server_in_room(self.services.globals.server_name(), room_id)
		.await;

	// Join locally when we already participate in the room or when no usable
	// remote server was supplied (empty list, or only ourselves).
	let local_join = server_in_room
		|| servers.is_empty()
		|| (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));

	if local_join {
		self.join_local(sender_user, room_id, reason, servers, state_lock)
			.boxed()
			.await?;
	} else {
		// Ask a remote server if we are not participating in this room
		self.join_remote(sender_user, room_id, reason, servers, state_lock)
			.boxed()
			.await?;
	}

	Ok(())
}
#[implement(Service)]
#[tracing::instrument(
	name = "remote",
	level = "debug",
	skip_all,
	fields(?servers)
)]
/// Join a room we do not participate in: perform the federation
/// make_join/send_join handshake against one of `servers`, then ingest the
/// returned room state and append our join event locally.
pub async fn join_remote(
	&self,
	sender_user: &UserId,
	room_id: &RoomId,
	reason: Option<String>,
	servers: &[OwnedServerName],
	state_lock: &RoomMutexGuard,
) -> Result {
	info!("Joining {room_id} over federation.");

	let (make_join_response, remote_server) = self
		.make_join_request(sender_user, room_id, servers)
		.await?;

	info!("make_join finished");

	let Some(room_version_id) = make_join_response.room_version else {
		return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
	};

	if !self
		.services
		.server
		.supported_room_version(&room_version_id)
	{
		return Err!(BadServerResponse(
			"Remote room version {room_version_id} is not supported by tuwunel"
		));
	}

	let mut join_event_stub: CanonicalJsonObject =
		serde_json::from_str(make_join_response.event.get()).map_err(|e| {
			err!(BadServerResponse(warn!(
				"Invalid make_join event json received from server: {e:?}"
			)))
		})?;

	// Restricted-join authorisation only exists in room versions >= 8; note
	// the spec spells the JSON key "authorised" (British spelling).
	let join_authorized_via_users_server = {
		use RoomVersionId::*;
		if !matches!(room_version_id, V1 | V2 | V3 | V4 | V5 | V6 | V7) {
			join_event_stub
				.get("content")
				.map(|s| {
					s.as_object()?
						.get("join_authorised_via_users_server")?
						.as_str()
				})
				.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok())
		} else {
			None
		}
	};

	join_event_stub.insert(
		"origin".to_owned(),
		CanonicalJsonValue::String(
			self.services
				.globals
				.server_name()
				.as_str()
				.to_owned(),
		),
	);
	join_event_stub.insert(
		"origin_server_ts".to_owned(),
		CanonicalJsonValue::Integer(
			utils::millis_since_unix_epoch()
				.try_into()
				.expect("Timestamp is valid js_int value"),
		),
	);
	join_event_stub.insert(
		"content".to_owned(),
		to_canonical_value(RoomMemberEventContent {
			displayname: self
				.services
				.users
				.displayname(sender_user)
				.await
				.ok(),
			avatar_url: self
				.services
				.users
				.avatar_url(sender_user)
				.await
				.ok(),
			blurhash: self
				.services
				.users
				.blurhash(sender_user)
				.await
				.ok(),
			reason,
			join_authorized_via_users_server: join_authorized_via_users_server.clone(),
			..RoomMemberEventContent::new(MembershipState::Join)
		})
		.expect("event is valid, we just created it"),
	);

	// We keep the "event_id" in the pdu only in v1 or
	// v2 rooms
	match room_version_id {
		| RoomVersionId::V1 | RoomVersionId::V2 => {},
		| _ => {
			join_event_stub.remove("event_id");
		},
	}

	// In order to create a compatible ref hash (EventID) the `hashes` field needs
	// to be present
	self.services
		.server_keys
		.hash_and_sign_event(&mut join_event_stub, &room_version_id)?;

	// Generate event id
	let event_id = gen_event_id(&join_event_stub, &room_version_id)?;

	// Add event_id back
	join_event_stub
		.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));

	// It has enough fields to be called a proper event now
	let mut join_event = join_event_stub;

	info!("Asking {remote_server} for send_join in room {room_id}");
	let send_join_request = federation::membership::create_join_event::v2::Request {
		room_id: room_id.to_owned(),
		event_id: event_id.clone(),
		omit_members: false,
		pdu: self
			.services
			.sending
			.convert_to_outgoing_federation_event(join_event.clone())
			.await,
	};

	// Once send_join hits the remote server it may start sending us events which
	// have to be delayed until we process this response first.
	let _federation_lock = self
		.services
		.event_handler
		.mutex_federation
		.lock(room_id)
		.await;

	let send_join_response = match self
		.services
		.sending
		.send_synapse_request(&remote_server, send_join_request)
		.await
	{
		| Ok(response) => response,
		| Err(e) => {
			error!("send_join failed: {e}");
			return Err(e);
		},
	};

	info!("send_join finished");

	// For restricted joins the resident server countersigns our join event;
	// merge its signature into ours before ingesting.
	if join_authorized_via_users_server.is_some() {
		if let Some(signed_raw) = &send_join_response.room_state.event {
			debug_info!(
				"There is a signed event with join_authorized_via_users_server. This room is \
				 probably using restricted joins. Adding signature to our event"
			);

			let (signed_event_id, signed_value) =
				gen_event_id_canonical_json(signed_raw, &room_version_id).map_err(|e| {
					err!(Request(BadJson(warn!(
						"Could not convert event to canonical JSON: {e}"
					))))
				})?;

			if signed_event_id != event_id {
				return Err!(Request(BadJson(warn!(
					%signed_event_id, %event_id,
					"Server {remote_server} sent event with wrong event ID"
				))));
			}

			match signed_value["signatures"]
				.as_object()
				.ok_or_else(|| {
					err!(BadServerResponse(warn!(
						"Server {remote_server} sent invalid signatures type"
					)))
				})
				.and_then(|e| {
					e.get(remote_server.as_str()).ok_or_else(|| {
						err!(BadServerResponse(warn!(
							"Server {remote_server} did not send its signature for a restricted \
							 room"
						)))
					})
				}) {
				| Ok(signature) => {
					join_event
						.get_mut("signatures")
						.expect("we created a valid pdu")
						.as_object_mut()
						.expect("we created a valid pdu")
						.insert(remote_server.to_string(), signature.clone());
				},
				| Err(e) => {
					// A missing countersignature is tolerated; the join may
					// still be accepted.
					warn!(
						"Server {remote_server} sent invalid signature in send_join signatures \
						 for event {signed_value:?}: {e:?}",
					);
				},
			}
		}
	}

	self.services
		.short
		.get_or_create_shortroomid(room_id)
		.await;

	info!("Parsing join event");
	let parsed_join_pdu = PduEvent::from_id_val(&event_id, join_event.clone())
		.map_err(|e| err!(BadServerResponse("Invalid join event PDU: {e:?}")))?;

	info!("Acquiring server signing keys for response events");
	let resp_events = &send_join_response.room_state;
	let resp_state = &resp_events.state;
	let resp_auth = &resp_events.auth_chain;
	self.services
		.server_keys
		.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
		.await;

	// Validate each state event, store it as an outlier, and build the
	// shortstatekey -> event_id map for the room state at the join.
	info!("Going through send_join response room_state");
	let cork = self.services.db.cork_and_flush();
	let state = send_join_response
		.room_state
		.state
		.iter()
		.stream()
		.then(|pdu| {
			self.services
				.server_keys
				.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
		})
		.ready_filter_map(Result::ok)
		.fold(HashMap::new(), async |mut state, (event_id, value)| {
			let pdu = if value["type"] == "m.room.create" {
				PduEvent::from_rid_val(room_id, &event_id, value.clone())
			} else {
				PduEvent::from_id_val(&event_id, value.clone())
			};

			let pdu = match pdu {
				| Ok(pdu) => pdu,
				| Err(e) => {
					debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}");
					return state;
				},
			};

			self.services
				.timeline
				.add_pdu_outlier(&event_id, &value);
			if let Some(state_key) = &pdu.state_key {
				let shortstatekey = self
					.services
					.short
					.get_or_create_shortstatekey(&pdu.kind.to_string().into(), state_key)
					.await;

				state.insert(shortstatekey, pdu.event_id.clone());
			}

			state
		})
		.await;

	drop(cork);

	info!("Going through send_join response auth_chain");
	let cork = self.services.db.cork_and_flush();
	send_join_response
		.room_state
		.auth_chain
		.iter()
		.stream()
		.then(|pdu| {
			self.services
				.server_keys
				.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
		})
		.ready_filter_map(Result::ok)
		.ready_for_each(|(event_id, value)| {
			self.services
				.timeline
				.add_pdu_outlier(&event_id, &value);
		})
		.await;

	drop(cork);

	debug!("Running send_join auth check");
	state_res::auth_check(
		&room_version::rules(&room_version_id)?,
		&parsed_join_pdu,
		&async |event_id| self.services.timeline.get_pdu(&event_id).await,
		&async |event_type, state_key| {
			let shortstatekey = self
				.services
				.short
				.get_shortstatekey(&event_type, state_key.as_str())
				.await?;

			let event_id = state.get(&shortstatekey).ok_or_else(|| {
				err!(Request(NotFound("Missing fetch_state {shortstatekey:?}")))
			})?;

			self.services.timeline.get_pdu(event_id).await
		},
	)
	.boxed()
	.await?;

	info!("Compressing state from send_join");
	let compressed: CompressedState = self
		.services
		.state_compressor
		.compress_state_events(state.iter().map(|(ssk, eid)| (ssk, eid.borrow())))
		.collect()
		.await;

	debug!("Saving compressed state");
	let HashSetCompressStateEvent {
		shortstatehash: statehash_before_join,
		added,
		removed,
	} = self
		.services
		.state_compressor
		.save_state(room_id, Arc::new(compressed))
		.await?;

	debug!("Forcing state for new room");
	self.services
		.state
		.force_state(room_id, statehash_before_join, added, removed, state_lock)
		.await?;

	info!("Updating joined counts for new room");
	self.services
		.state_cache
		.update_joined_count(room_id)
		.await;

	// We append to state before appending the pdu, so we don't have a moment in
	// time with the pdu without its state. This is okay because append_pdu can't
	// fail.
	let statehash_after_join = self
		.services
		.state
		.append_to_state(&parsed_join_pdu)
		.await?;

	info!("Appending new room join event");
	self.services
		.timeline
		.append_pdu(
			&parsed_join_pdu,
			join_event,
			once(parsed_join_pdu.event_id.borrow()),
			state_lock,
		)
		.await?;

	info!("Setting final room state for new room");
	// We set the room state after inserting the pdu, so that we never have a moment
	// in time where events in the current room state do not exist
	self.services
		.state
		.set_room_state(room_id, statehash_after_join, state_lock);

	Ok(())
}
#[implement(Service)]
#[tracing::instrument(name = "local", level = "debug", skip_all)]
/// Join a room we already participate in by appending the membership event
/// ourselves; if the local attempt is rejected and the room uses restricted
/// joins, fall back to a federated make_join/send_join handshake.
pub async fn join_local(
	&self,
	sender_user: &UserId,
	room_id: &RoomId,
	reason: Option<String>,
	servers: &[OwnedServerName],
	state_lock: &RoomMutexGuard,
) -> Result {
	debug_info!("We can join locally");

	let join_rules_event_content = self
		.services
		.state_accessor
		.room_state_get_content::<RoomJoinRulesEventContent>(
			room_id,
			&StateEventType::RoomJoinRules,
			"",
		)
		.await;

	// Rooms through which a restricted join rule can authorise membership.
	let restriction_rooms = match join_rules_event_content {
		| Ok(RoomJoinRulesEventContent {
			join_rule: JoinRule::Restricted(restricted) | JoinRule::KnockRestricted(restricted),
		}) => restricted
			.allow
			.into_iter()
			.filter_map(|a| match a {
				| AllowRule::RoomMembership(r) => Some(r.room_id),
				| _ => None,
			})
			.collect(),
		| _ => Vec::new(),
	};

	// If the user qualifies via a restriction room, pick the first local
	// member with invite power as the authorising user.
	let join_authorized_via_users_server: Option<OwnedUserId> = {
		if restriction_rooms
			.iter()
			.stream()
			.any(|restriction_room_id| {
				self.services
					.state_cache
					.is_joined(sender_user, restriction_room_id)
			})
			.await
		{
			let users = self
				.services
				.state_cache
				.local_users_in_room(room_id)
				.filter(|user| {
					self.services.state_accessor.user_can_invite(
						room_id,
						user,
						sender_user,
						state_lock,
					)
				})
				.map(ToOwned::to_owned);

			pin_mut!(users);
			users.next().await
		} else {
			None
		}
	};

	let content = RoomMemberEventContent {
		displayname: self
			.services
			.users
			.displayname(sender_user)
			.await
			.ok(),
		avatar_url: self
			.services
			.users
			.avatar_url(sender_user)
			.await
			.ok(),
		blurhash: self
			.services
			.users
			.blurhash(sender_user)
			.await
			.ok(),
		reason: reason.clone(),
		join_authorized_via_users_server,
		..RoomMemberEventContent::new(MembershipState::Join)
	};

	// Try normal join first
	let Err(error) = self
		.services
		.timeline
		.build_and_append_pdu(
			PduBuilder::state(sender_user.to_string(), &content),
			sender_user,
			room_id,
			state_lock,
		)
		.await
	else {
		return Ok(());
	};

	// Without restriction rooms or usable remote servers, federation cannot
	// help; surface the original local failure.
	if restriction_rooms.is_empty()
		&& (servers.is_empty()
			|| servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]))
	{
		return Err(error);
	}

	warn!(
		"We couldn't do the join locally, maybe federation can help to satisfy the restricted \
		 join requirements"
	);
	let Ok((make_join_response, remote_server)) = self
		.make_join_request(sender_user, room_id, servers)
		.await
	else {
		return Err(error);
	};

	let Some(room_version_id) = make_join_response.room_version else {
		return Err!(BadServerResponse("Remote room version is not supported by tuwunel"));
	};

	if !self
		.services
		.server
		.supported_room_version(&room_version_id)
	{
		return Err!(BadServerResponse(
			"Remote room version {room_version_id} is not supported by tuwunel"
		));
	}

	let mut join_event_stub: CanonicalJsonObject =
		serde_json::from_str(make_join_response.event.get()).map_err(|e| {
			err!(BadServerResponse("Invalid make_join event json received from server: {e:?}"))
		})?;

	// The remote template may name an authorising user for the restricted
	// join; note the spec's British spelling of the JSON key.
	let join_authorized_via_users_server = join_event_stub
		.get("content")
		.map(|s| {
			s.as_object()?
				.get("join_authorised_via_users_server")?
				.as_str()
		})
		.and_then(|s| OwnedUserId::try_from(s.unwrap_or_default()).ok());

	join_event_stub.insert(
		"origin".to_owned(),
		CanonicalJsonValue::String(
			self.services
				.globals
				.server_name()
				.as_str()
				.to_owned(),
		),
	);
	join_event_stub.insert(
		"origin_server_ts".to_owned(),
		CanonicalJsonValue::Integer(
			utils::millis_since_unix_epoch()
				.try_into()
				.expect("Timestamp is valid js_int value"),
		),
	);
	join_event_stub.insert(
		"content".to_owned(),
		to_canonical_value(RoomMemberEventContent {
			displayname: self
				.services
				.users
				.displayname(sender_user)
				.await
				.ok(),
			avatar_url: self
				.services
				.users
				.avatar_url(sender_user)
				.await
				.ok(),
			blurhash: self
				.services
				.users
				.blurhash(sender_user)
				.await
				.ok(),
			reason,
			join_authorized_via_users_server,
			..RoomMemberEventContent::new(MembershipState::Join)
		})
		.expect("event is valid, we just created it"),
	);

	// We keep the "event_id" in the pdu only in v1 or
	// v2 rooms
	match room_version_id {
		| RoomVersionId::V1 | RoomVersionId::V2 => {},
		| _ => {
			join_event_stub.remove("event_id");
		},
	}

	// In order to create a compatible ref hash (EventID) the `hashes` field needs
	// to be present
	self.services
		.server_keys
		.hash_and_sign_event(&mut join_event_stub, &room_version_id)?;

	// Generate event id
	let event_id = gen_event_id(&join_event_stub, &room_version_id)?;

	// Add event_id back
	join_event_stub
		.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));

	// It has enough fields to be called a proper event now
	let join_event = join_event_stub;

	let send_join_response = self
		.services
		.sending
		.send_synapse_request(
			&remote_server,
			federation::membership::create_join_event::v2::Request {
				room_id: room_id.to_owned(),
				event_id: event_id.clone(),
				omit_members: false,
				pdu: self
					.services
					.sending
					.convert_to_outgoing_federation_event(join_event.clone())
					.await,
			},
		)
		.await?;

	if let Some(signed_raw) = send_join_response.room_state.event {
		let (signed_event_id, signed_value) =
			gen_event_id_canonical_json(&signed_raw, &room_version_id).map_err(|e| {
				err!(Request(BadJson(warn!("Could not convert event to canonical JSON: {e}"))))
			})?;

		if signed_event_id != event_id {
			return Err!(Request(BadJson(
				warn!(%signed_event_id, %event_id, "Server {remote_server} sent event with wrong event ID")
			)));
		}

		self.services
			.event_handler
			.handle_incoming_pdu(&remote_server, room_id, &signed_event_id, signed_value, true)
			.boxed()
			.await?;
	} else {
		// The remote did not countersign our event; report the original
		// local failure instead.
		return Err(error);
	}

	Ok(())
}
#[implement(Service)]
#[tracing::instrument(
	name = "make_join",
	level = "debug",
	skip_all,
	fields(?servers)
)]
/// Ask the candidate `servers` in turn for a make_join template, returning
/// the first usable response together with the server that produced it.
///
/// Gives up after 15 incompatible-room-version answers or 40 failed servers.
async fn make_join_request(
	&self,
	sender_user: &UserId,
	room_id: &RoomId,
	servers: &[OwnedServerName],
) -> Result<(federation::membership::prepare_join_event::v1::Response, OwnedServerName)> {
	let mut make_join_response_and_server =
		Err!(BadServerResponse("No server available to assist in joining."));

	let mut make_join_counter: usize = 0;
	let mut incompatible_room_version_count: usize = 0;

	for remote_server in servers {
		// Never ask ourselves over federation.
		if self
			.services
			.globals
			.server_is_ours(remote_server)
		{
			continue;
		}
		info!("Asking {remote_server} for make_join ({make_join_counter})");
		let make_join_response = self
			.services
			.sending
			.send_federation_request(
				remote_server,
				federation::membership::prepare_join_event::v1::Request {
					room_id: room_id.to_owned(),
					user_id: sender_user.to_owned(),
					ver: self
						.services
						.server
						.supported_room_versions()
						.collect(),
				},
			)
			.await;

		trace!("make_join response: {make_join_response:?}");
		make_join_counter = make_join_counter.saturating_add(1);

		if let Err(ref e) = make_join_response {
			if matches!(
				e.kind(),
				ErrorKind::IncompatibleRoomVersion { .. } | ErrorKind::UnsupportedRoomVersion
			) {
				incompatible_room_version_count =
					incompatible_room_version_count.saturating_add(1);
			}

			if incompatible_room_version_count > 15 {
				info!(
					"15 servers have responded with M_INCOMPATIBLE_ROOM_VERSION or \
					 M_UNSUPPORTED_ROOM_VERSION, assuming that tuwunel does not support the \
					 room version {room_id}: {e}"
				);
				make_join_response_and_server =
					Err!(BadServerResponse("Room version is not supported by tuwunel"));
				return make_join_response_and_server;
			}

			if make_join_counter > 40 {
				warn!(
					"40 servers failed to provide valid make_join response, assuming no server \
					 can assist in joining."
				);
				make_join_response_and_server =
					Err!(BadServerResponse("No server available to assist in joining."));
				return make_join_response_and_server;
			}
		}

		make_join_response_and_server = make_join_response.map(|r| (r, remote_server.clone()));

		if make_join_response_and_server.is_ok() {
			break;
		}
	}

	make_join_response_and_server
}

View File

@@ -0,0 +1,63 @@
use ruma::{
RoomId, UserId,
events::room::member::{MembershipState, RoomMemberEventContent},
};
use tuwunel_core::{Err, Result, implement, pdu::PduBuilder};
use super::Service;
use crate::rooms::timeline::RoomMutexGuard;
#[implement(Service)]
#[tracing::instrument(
	level = "debug",
	skip_all,
	fields(%sender_user, %room_id, %user_id)
)]
/// Kick `user_id` from `room_id` on behalf of `sender_user` by appending an
/// `m.room.member` event with membership `leave`.
///
/// Kicking a user with no membership event is a no-op; kicking a user whose
/// membership is not invite/knock/join is rejected so a kick cannot silently
/// lift a ban (ban -> leave transition).
pub async fn kick(
	&self,
	room_id: &RoomId,
	user_id: &UserId,
	reason: Option<&String>,
	sender_user: &UserId,
	state_lock: &RoomMutexGuard,
) -> Result {
	// kicking doesn't make sense if there is no membership
	let Ok(event) = self
		.services
		.state_accessor
		.get_member(room_id, user_id)
		.await
	else {
		return Ok(());
	};

	// this is required to prevent ban -> leave transitions
	if !matches!(
		event.membership,
		MembershipState::Invite | MembershipState::Knock | MembershipState::Join,
	) {
		// Fixed message typo: "apart of" -> "a part of".
		return Err!(Request(Forbidden(
			"Cannot kick a user who is not a part of the room (current membership: {})",
			event.membership
		)));
	}

	// Carry over the prior member content (e.g. displayname/avatar) while
	// clearing authorisation-related fields.
	self.services
		.timeline
		.build_and_append_pdu(
			PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
				membership: MembershipState::Leave,
				reason: reason.cloned(),
				is_direct: None,
				join_authorized_via_users_server: None,
				third_party_invite: None,
				..event
			}),
			sender_user,
			room_id,
			state_lock,
		)
		.await?;

	Ok(())
}

View File

@@ -0,0 +1,340 @@
use std::collections::HashSet;
use futures::{FutureExt, StreamExt, TryFutureExt, pin_mut};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, RoomId, RoomVersionId, UserId,
api::federation,
events::{
StateEventType,
room::member::{MembershipState, RoomMemberEventContent},
},
};
use tuwunel_core::{
Err, Result, debug_info, debug_warn, err, implement,
matrix::event::gen_event_id,
pdu::PduBuilder,
utils::{self, FutureBoolExt, future::ReadyEqExt},
warn,
};
use super::Service;
use crate::rooms::timeline::RoomMutexGuard;
#[implement(Service)]
#[tracing::instrument(
	level = "debug",
	skip_all,
	fields(%room_id, %user_id)
)]
/// Leave `room_id` as `user_id`: append a real leave event when we
/// participate in the room, attempt a remote leave otherwise, and in the
/// worst case just mark the membership as left in our local caches.
pub async fn leave(
	&self,
	user_id: &UserId,
	room_id: &RoomId,
	reason: Option<String>,
	state_lock: &RoomMutexGuard,
) -> Result {
	// Bare leave-membership content used whenever we only update local
	// membership caches instead of appending a real leave event.
	let default_member_content = RoomMemberEventContent {
		membership: MembershipState::Leave,
		reason: reason.clone(),
		join_authorized_via_users_server: None,
		is_direct: None,
		avatar_url: None,
		displayname: None,
		third_party_invite: None,
		blurhash: None,
	};

	let is_banned = self.services.metadata.is_banned(room_id);
	let is_disabled = self.services.metadata.is_disabled(room_id);
	pin_mut!(is_banned, is_disabled);

	if is_banned.or(is_disabled).await {
		// the room is banned/disabled, the room must be rejected locally since we
		// cant/dont want to federate with this server
		self.services
			.state_cache
			.update_membership(
				room_id,
				user_id,
				default_member_content,
				user_id,
				None,
				None,
				true,
			)
			.await?;

		return Ok(());
	}

	let dont_have_room = self
		.services
		.state_cache
		.server_in_room(self.services.globals.server_name(), room_id)
		.eq(&false);

	let not_knocked = self
		.services
		.state_cache
		.is_knocked(user_id, room_id)
		.eq(&false);

	// Ask a remote server if we don't have this room and are not knocking on it
	if dont_have_room.and(not_knocked).await {
		if let Err(e) = self.remote_leave(user_id, room_id).boxed().await {
			warn!(%user_id, "Failed to leave room {room_id} remotely: {e}");
			// Don't tell the client about this error
		}

		// Keep whatever stripped state we had for this room so clients can
		// still render the left room.
		let last_state = self
			.services
			.state_cache
			.invite_state(user_id, room_id)
			.or_else(|_| {
				self.services
					.state_cache
					.knock_state(user_id, room_id)
			})
			.or_else(|_| {
				self.services
					.state_cache
					.left_state(user_id, room_id)
			})
			.await
			.ok();

		// We always drop the invite, we can't rely on other servers
		self.services
			.state_cache
			.update_membership(
				room_id,
				user_id,
				default_member_content,
				user_id,
				last_state,
				None,
				true,
			)
			.await?;
	} else {
		let Ok(event) = self
			.services
			.state_accessor
			.room_state_get_content::<RoomMemberEventContent>(
				room_id,
				&StateEventType::RoomMember,
				user_id.as_str(),
			)
			.await
		else {
			debug_warn!(
				"Trying to leave a room you are not a member of, marking room as left locally."
			);

			return self
				.services
				.state_cache
				.update_membership(
					room_id,
					user_id,
					default_member_content,
					user_id,
					None,
					None,
					true,
				)
				.await;
		};

		// Append a real leave event, carrying over the prior member content
		// (e.g. displayname/avatar) while clearing authorisation fields.
		self.services
			.timeline
			.build_and_append_pdu(
				PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
					membership: MembershipState::Leave,
					reason,
					join_authorized_via_users_server: None,
					is_direct: None,
					..event
				}),
				user_id,
				room_id,
				state_lock,
			)
			.await?;
	}

	Ok(())
}
/// Ask a remote server to assist `user_id` in leaving `room_id`.
///
/// Candidate servers are gathered from `servers_invite_via`, the senders of
/// any stored invite or knock state, and the room ID's own server name. Each
/// is tried in turn with a federation `make_leave` request; the first usable
/// response is signed locally and submitted back via `send_leave`.
///
/// # Errors
/// Fails when no candidate server returns a usable `make_leave` response,
/// when the advertised room version is missing or unsupported, or when
/// parsing, signing, or sending the leave event fails.
#[implement(Service)]
#[tracing::instrument(name = "remote", level = "debug", skip_all)]
pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
	// Seed with the error we return if every candidate server fails.
	let mut make_leave_response_and_server =
		Err!(BadServerResponse("No remote server available to assist in leaving {room_id}."));

	let mut servers: HashSet<OwnedServerName> = self
		.services
		.state_cache
		.servers_invite_via(room_id)
		.map(ToOwned::to_owned)
		.collect()
		.await;

	// Prefer the inviter's server; fall back to knock-state senders.
	match self
		.services
		.state_cache
		.invite_state(user_id, room_id)
		.await
	{
		| Ok(invite_state) => {
			servers.extend(
				invite_state
					.iter()
					.filter_map(|event| event.get_field("sender").ok().flatten())
					.filter_map(|sender: &str| UserId::parse(sender).ok())
					.map(|user| user.server_name().to_owned()),
			);
		},
		| _ => {
			match self
				.services
				.state_cache
				.knock_state(user_id, room_id)
				.await
			{
				| Ok(knock_state) => {
					servers.extend(
						knock_state
							.iter()
							.filter_map(|event| event.get_field("sender").ok().flatten())
							.filter_map(|sender: &str| UserId::parse(sender).ok())
							.filter_map(|sender| {
								// Our own server cannot assist a remote leave.
								if !self.services.globals.user_is_local(sender) {
									Some(sender.server_name().to_owned())
								} else {
									None
								}
							}),
					);
				},
				| _ => {},
			}
		},
	}

	if let Some(room_id_server_name) = room_id.server_name() {
		servers.insert(room_id_server_name.to_owned());
	}

	debug_info!("servers in remote_leave_room: {servers:?}");

	// Try each candidate until one returns a make_leave template.
	for remote_server in servers {
		let make_leave_response = self
			.services
			.sending
			.send_federation_request(
				&remote_server,
				federation::membership::prepare_leave_event::v1::Request {
					room_id: room_id.to_owned(),
					user_id: user_id.to_owned(),
				},
			)
			.await;

		make_leave_response_and_server = make_leave_response.map(|r| (r, remote_server));

		if make_leave_response_and_server.is_ok() {
			break;
		}
	}

	let (make_leave_response, remote_server) = make_leave_response_and_server?;

	let Some(room_version_id) = make_leave_response.room_version else {
		return Err!(BadServerResponse(warn!(
			"No room version was returned by {remote_server} for {room_id}, room version is \
			 likely not supported by tuwunel"
		)));
	};

	if !self
		.services
		.server
		.supported_room_version(&room_version_id)
	{
		// NOTE: message previously said "conduwuit"; corrected to match the
		// project name used by the sibling error above.
		return Err!(BadServerResponse(warn!(
			"Remote room version {room_version_id} for {room_id} is not supported by tuwunel",
		)));
	}

	let mut leave_event_stub = serde_json::from_str::<CanonicalJsonObject>(
		make_leave_response.event.get(),
	)
	.map_err(|e| {
		err!(BadServerResponse(warn!(
			"Invalid make_leave event json received from {remote_server} for {room_id}: {e:?}"
		)))
	})?;

	// TODO: Is origin needed?
	leave_event_stub.insert(
		"origin".to_owned(),
		CanonicalJsonValue::String(
			self.services
				.globals
				.server_name()
				.as_str()
				.to_owned(),
		),
	);
	leave_event_stub.insert(
		"origin_server_ts".to_owned(),
		CanonicalJsonValue::Integer(
			utils::millis_since_unix_epoch()
				.try_into()
				.expect("Timestamp is valid js_int value"),
		),
	);

	// room v3 and above removed the "event_id" field from remote PDU format
	match room_version_id {
		| RoomVersionId::V1 | RoomVersionId::V2 => {},
		| _ => {
			leave_event_stub.remove("event_id");
		},
	}

	// In order to create a compatible ref hash (EventID) the `hashes` field needs
	// to be present
	self.services
		.server_keys
		.hash_and_sign_event(&mut leave_event_stub, &room_version_id)?;

	// Generate event id
	let event_id = gen_event_id(&leave_event_stub, &room_version_id)?;

	// Add event_id back
	leave_event_stub
		.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.clone().into()));

	// It has enough fields to be called a proper event now
	let leave_event = leave_event_stub;

	self.services
		.sending
		.send_federation_request(
			&remote_server,
			federation::membership::create_leave_event::v2::Request {
				room_id: room_id.to_owned(),
				event_id,
				pdu: self
					.services
					.sending
					.convert_to_outgoing_federation_event(leave_event.clone())
					.await,
			},
		)
		.await?;

	Ok(())
}

View File

@@ -0,0 +1,22 @@
mod ban;
mod invite;
mod join;
mod kick;
mod leave;
mod unban;
use std::sync::Arc;
use tuwunel_core::Result;
/// Aggregate service for room membership operations; the actual
/// ban/invite/join/kick/leave/unban logic lives in this module's submodules.
pub struct Service {
	/// Shared handle to the service registry; membership operations use it
	/// to reach sibling services (state_cache, timeline, sending, ...).
	services: Arc<crate::services::OnceServices>,
}

impl crate::Service for Service {
	/// Construct the service by cloning the shared registry handle.
	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
		Ok(Arc::new(Self { services: args.services.clone() }))
	}

	/// Service name derived from this module's path.
	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}

View File

@@ -0,0 +1,57 @@
use ruma::{
RoomId, UserId,
events::room::member::{MembershipState, RoomMemberEventContent},
};
use tuwunel_core::{Err, Result, implement, pdu::PduBuilder};
use super::Service;
use crate::rooms::timeline::RoomMutexGuard;
/// Lift a ban on `user_id` in `room_id` by appending a Leave membership
/// event authored by `sender_user`.
///
/// `state_lock` must be the held mutex guard for this room's state.
///
/// # Errors
/// Returns `Forbidden` when the target's current membership is not `Ban`,
/// or propagates any failure from appending the PDU.
#[implement(Service)]
#[tracing::instrument(
	// Fixed span name: this was "remote", copy-pasted from remote_leave.
	name = "unban",
	level = "debug",
	skip_all,
	fields(%sender_user, %room_id, %user_id),
)]
pub async fn unban(
	&self,
	room_id: &RoomId,
	user_id: &UserId,
	reason: Option<&String>,
	sender_user: &UserId,
	state_lock: &RoomMutexGuard,
) -> Result {
	// A missing member event defaults to Leave, which fails the Ban check
	// below with the correct error.
	let current_member_content = self
		.services
		.state_accessor
		.get_member(room_id, user_id)
		.await
		.unwrap_or_else(|_| RoomMemberEventContent::new(MembershipState::Leave));

	if current_member_content.membership != MembershipState::Ban {
		return Err!(Request(Forbidden(
			"Cannot unban a user who is not banned (current membership: {})",
			current_member_content.membership
		)));
	}

	self.services
		.timeline
		.build_and_append_pdu(
			// Unban is expressed as a Leave event; clear the fields that
			// must not carry over from the ban event's content.
			PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
				membership: MembershipState::Leave,
				reason: reason.cloned(),
				join_authorized_via_users_server: None,
				third_party_invite: None,
				is_direct: None,
				..current_member_content
			}),
			sender_user,
			room_id,
			state_lock,
		)
		.await?;

	Ok(())
}

View File

@@ -11,11 +11,13 @@ pub mod admin;
pub mod appservice;
pub mod client;
pub mod config;
pub mod deactivate;
pub mod emergency;
pub mod federation;
pub mod globals;
pub mod key_backups;
pub mod media;
pub mod membership;
pub mod presence;
pub mod pusher;
pub mod resolver;

View File

@@ -9,9 +9,10 @@ use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace};
use tuwunel_database::Database;
use crate::{
account_data, admin, appservice, client, config, emergency, federation, globals, key_backups,
account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
key_backups,
manager::Manager,
media, presence, pusher, resolver, rooms, sending, server_keys,
media, membership, presence, pusher, resolver, rooms, sending, server_keys,
service::{Args, Service},
sync, transaction_ids, uiaa, users,
};
@@ -55,6 +56,8 @@ pub struct Services {
pub transaction_ids: Arc<transaction_ids::Service>,
pub uiaa: Arc<uiaa::Service>,
pub users: Arc<users::Service>,
pub membership: Arc<membership::Service>,
pub deactivate: Arc<deactivate::Service>,
manager: Mutex<Option<Arc<Manager>>>,
pub server: Arc<Server>,
@@ -133,6 +136,8 @@ impl Services {
transaction_ids: build!(transaction_ids::Service),
uiaa: build!(uiaa::Service),
users: build!(users::Service),
membership: build!(membership::Service),
deactivate: build!(deactivate::Service),
manager: Mutex::new(None),
server,
@@ -184,7 +189,7 @@ impl Services {
Ok(())
}
pub(crate) fn services(&self) -> [Arc<dyn Service>; 38] {
pub(crate) fn services(&self) -> [Arc<dyn Service>; 40] {
[
self.account_data.clone(),
self.admin.clone(),
@@ -224,6 +229,8 @@ impl Services {
self.transaction_ids.clone(),
self.uiaa.clone(),
self.users.clone(),
self.membership.clone(),
self.deactivate.clone(),
]
}

View File

@@ -5,15 +5,22 @@ mod profile;
use std::sync::Arc;
use futures::{Stream, StreamExt, TryFutureExt};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join3};
use ruma::{
OwnedMxcUri, OwnedUserId, UserId,
OwnedMxcUri, OwnedRoomId, OwnedUserId, UserId,
api::client::filter::FilterDefinition,
events::{GlobalAccountDataEventType, ignored_user_list::IgnoredUserListEvent},
events::{
GlobalAccountDataEventType,
ignored_user_list::IgnoredUserListEvent,
room::member::{MembershipState, RoomMemberEventContent},
},
};
use tuwunel_core::{
Err, Result, debug_warn, err, is_equal_to, trace,
utils::{self, ReadyExt, stream::TryIgnore},
Err, Result, debug_warn, err, is_equal_to,
pdu::PduBuilder,
trace,
utils::{self, IterStream, ReadyExt, TryFutureExtExt, stream::TryIgnore},
warn,
};
use tuwunel_database::{Deserialized, Json, Map};
@@ -439,4 +446,124 @@ impl Service {
pub async fn auth_ldap(&self, _user_dn: &str, _password: &str) -> Result {
Err!(FeatureDisabled("ldap"))
}
pub async fn update_displayname(
&self,
user_id: &UserId,
displayname: Option<String>,
rooms: &[OwnedRoomId],
) {
let (current_avatar_url, current_blurhash, current_displayname) = join3(
self.services.users.avatar_url(user_id).ok(),
self.services.users.blurhash(user_id).ok(),
self.services.users.displayname(user_id).ok(),
)
.await;
if displayname == current_displayname {
return;
}
self.services
.users
.set_displayname(user_id, displayname.clone());
// Send a new join membership event into rooms
let avatar_url = &current_avatar_url;
let blurhash = &current_blurhash;
let displayname = &displayname;
let rooms: Vec<_> = rooms
.iter()
.try_stream()
.and_then(async |room_id: &OwnedRoomId| {
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
displayname: displayname.clone(),
membership: MembershipState::Join,
avatar_url: avatar_url.clone(),
blurhash: blurhash.clone(),
join_authorized_via_users_server: None,
reason: None,
is_direct: None,
third_party_invite: None,
});
Ok((pdu, room_id))
})
.ignore_err()
.collect()
.await;
self.update_all_rooms(user_id, rooms)
.boxed()
.await;
}
pub async fn update_avatar_url(
&self,
user_id: &UserId,
avatar_url: Option<OwnedMxcUri>,
blurhash: Option<String>,
rooms: &[OwnedRoomId],
) {
let (current_avatar_url, current_blurhash, current_displayname) = join3(
self.services.users.avatar_url(user_id).ok(),
self.services.users.blurhash(user_id).ok(),
self.services.users.displayname(user_id).ok(),
)
.await;
if current_avatar_url == avatar_url && current_blurhash == blurhash {
return;
}
self.services
.users
.set_avatar_url(user_id, avatar_url.clone());
self.services
.users
.set_blurhash(user_id, blurhash.clone());
// Send a new join membership event into rooms
let avatar_url = &avatar_url;
let blurhash = &blurhash;
let displayname = &current_displayname;
let rooms: Vec<_> = rooms
.iter()
.try_stream()
.and_then(async |room_id: &OwnedRoomId| {
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
avatar_url: avatar_url.clone(),
blurhash: blurhash.clone(),
membership: MembershipState::Join,
displayname: displayname.clone(),
join_authorized_via_users_server: None,
reason: None,
is_direct: None,
third_party_invite: None,
});
Ok((pdu, room_id))
})
.ignore_err()
.collect()
.await;
self.update_all_rooms(user_id, rooms)
.boxed()
.await;
}
/// Append each prepared membership PDU to its room, taking the room state
/// mutex around every append. Failures are logged and skipped so one bad
/// room does not block the others.
async fn update_all_rooms(&self, user_id: &UserId, rooms: Vec<(PduBuilder, &OwnedRoomId)>) {
	for (builder, room) in rooms {
		let lock = self.services.state.mutex.lock(room).await;

		let appended = self
			.services
			.timeline
			.build_and_append_pdu(builder, user_id, room, &lock)
			.await;

		match appended {
			| Ok(_) => {},
			| Err(e) => {
				warn!(%user_id, room_id = %room, "Failed to update/send new profile join membership update in room: {e}");
			},
		}
	}
}
}