Refactor join, alias services
Split knock, user register from api into services Fix autojoin not working with v12 rooms Fix 'm.login.registration_token/validity' for reloaded registration tokens Change join servers order Move autojoin for ldap
This commit is contained in:
@@ -1,9 +1,14 @@
|
||||
use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc};
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::{HashMap, HashSet},
|
||||
iter::once,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, pin_mut};
|
||||
use ruma::{
|
||||
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomVersionId,
|
||||
UserId,
|
||||
CanonicalJsonObject, CanonicalJsonValue, OwnedServerName, OwnedUserId, RoomId, RoomOrAliasId,
|
||||
RoomVersionId, UserId,
|
||||
api::{client::error::ErrorKind, federation},
|
||||
canonical_json::to_canonical_value,
|
||||
events::{
|
||||
@@ -20,13 +25,13 @@ use tuwunel_core::{
|
||||
matrix::{event::gen_event_id_canonical_json, room_version},
|
||||
pdu::{PduBuilder, format::from_incoming_federation},
|
||||
state_res, trace,
|
||||
utils::{self, IterStream, ReadyExt},
|
||||
utils::{self, IterStream, ReadyExt, shuffle},
|
||||
warn,
|
||||
};
|
||||
|
||||
use super::Service;
|
||||
use crate::{
|
||||
appservice::RegistrationInfo,
|
||||
Services,
|
||||
rooms::{
|
||||
state::RoomMutexGuard,
|
||||
state_compressor::{CompressedState, HashSetCompressStateEvent},
|
||||
@@ -39,22 +44,27 @@ use crate::{
|
||||
skip_all,
|
||||
fields(%sender_user, %room_id)
|
||||
)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn join(
|
||||
&self,
|
||||
sender_user: &UserId,
|
||||
room_id: &RoomId,
|
||||
orig_room_id: Option<&RoomOrAliasId>,
|
||||
reason: Option<String>,
|
||||
servers: &[OwnedServerName],
|
||||
appservice_info: &Option<RegistrationInfo>,
|
||||
is_appservice: bool,
|
||||
state_lock: &RoomMutexGuard,
|
||||
) -> Result {
|
||||
let servers =
|
||||
get_servers_for_room(&self.services, sender_user, room_id, orig_room_id, servers).await?;
|
||||
|
||||
let user_is_guest = self
|
||||
.services
|
||||
.users
|
||||
.is_deactivated(sender_user)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
&& appservice_info.is_none();
|
||||
&& !is_appservice;
|
||||
|
||||
if user_is_guest
|
||||
&& !self
|
||||
@@ -99,12 +109,12 @@ pub async fn join(
|
||||
|| (servers.len() == 1 && self.services.globals.server_is_ours(&servers[0]));
|
||||
|
||||
if local_join {
|
||||
self.join_local(sender_user, room_id, reason, servers, state_lock)
|
||||
self.join_local(sender_user, room_id, reason, &servers, state_lock)
|
||||
.boxed()
|
||||
.await?;
|
||||
} else {
|
||||
// Ask a remote server if we are not participating in this room
|
||||
self.join_remote(sender_user, room_id, reason, servers, state_lock)
|
||||
self.join_remote(sender_user, room_id, reason, &servers, state_lock)
|
||||
.boxed()
|
||||
.await?;
|
||||
}
|
||||
@@ -838,3 +848,79 @@ async fn make_join_request(
|
||||
|
||||
make_join_response_and_server
|
||||
}
|
||||
|
||||
pub(super) async fn get_servers_for_room(
|
||||
services: &Services,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
orig_room_id: Option<&RoomOrAliasId>,
|
||||
via: &[OwnedServerName],
|
||||
) -> Result<Vec<OwnedServerName>> {
|
||||
// add invited vias
|
||||
let mut additional_servers = services
|
||||
.state_cache
|
||||
.servers_invite_via(room_id)
|
||||
.map(ToOwned::to_owned)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// add invite senders' servers
|
||||
additional_servers.extend(
|
||||
services
|
||||
.state_cache
|
||||
.invite_state(user_id, room_id)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.iter()
|
||||
.filter_map(|event| event.get_field("sender").ok().flatten())
|
||||
.filter_map(|sender: &str| UserId::parse(sender).ok())
|
||||
.map(|user| user.server_name().to_owned()),
|
||||
);
|
||||
|
||||
let mut servers = Vec::from(via);
|
||||
shuffle(&mut servers);
|
||||
|
||||
if let Some(server_name) = room_id.server_name() {
|
||||
servers.insert(0, server_name.to_owned());
|
||||
}
|
||||
|
||||
if let Some(orig_room_id) = orig_room_id
|
||||
&& let Some(orig_server_name) = orig_room_id.server_name()
|
||||
{
|
||||
servers.insert(0, orig_server_name.to_owned());
|
||||
}
|
||||
|
||||
shuffle(&mut additional_servers);
|
||||
|
||||
servers.extend_from_slice(&additional_servers);
|
||||
|
||||
// 1. (room alias server)?
|
||||
// 2. (room id server)?
|
||||
// 3. shuffle [via query + resolve servers]?
|
||||
// 4. shuffle [invited via, inviters servers]?
|
||||
|
||||
info!("{servers:?}");
|
||||
|
||||
// dedup preserving order
|
||||
let mut set = HashSet::new();
|
||||
servers.retain(|x| set.insert(x.clone()));
|
||||
|
||||
info!("{servers:?}");
|
||||
|
||||
// sort deprioritized servers last
|
||||
if !servers.is_empty() {
|
||||
for i in 0..servers.len() {
|
||||
if services
|
||||
.server
|
||||
.config
|
||||
.deprioritize_joins_through_servers
|
||||
.is_match(servers[i].host())
|
||||
{
|
||||
let server = servers.remove(i);
|
||||
servers.push(server);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(servers)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user