Implement MSC3706 two-step join.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -2,6 +2,7 @@ use std::{
|
|||||||
borrow::Borrow,
|
borrow::Borrow,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
iter::once,
|
iter::once,
|
||||||
|
mem::take,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -175,11 +176,10 @@ pub async fn join_remote(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Asking {remote_server} for send_join in room {room_id}");
|
|
||||||
let send_join_request = federation::membership::create_join_event::v2::Request {
|
let send_join_request = federation::membership::create_join_event::v2::Request {
|
||||||
room_id: room_id.to_owned(),
|
room_id: room_id.to_owned(),
|
||||||
event_id: event_id.clone(),
|
event_id: event_id.clone(),
|
||||||
omit_members: false,
|
omit_members: true,
|
||||||
pdu: self
|
pdu: self
|
||||||
.services
|
.services
|
||||||
.federation
|
.federation
|
||||||
@@ -196,23 +196,60 @@ pub async fn join_remote(
|
|||||||
.lock(room_id)
|
.lock(room_id)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let send_join_response = match self
|
info!("Asking {remote_server} for fast_join in room {room_id}");
|
||||||
|
let mut response = match self
|
||||||
.services
|
.services
|
||||||
.federation
|
.federation
|
||||||
.execute(&remote_server, send_join_request)
|
.execute(&remote_server, send_join_request)
|
||||||
.await
|
.await
|
||||||
|
.inspect_err(|e| error!("send_join failed: {e}"))
|
||||||
{
|
{
|
||||||
| Ok(response) => response,
|
| Err(e) => return Err(e),
|
||||||
| Err(e) => {
|
| Ok(response) => response.room_state,
|
||||||
error!("send_join failed: {e}");
|
|
||||||
return Err(e);
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("send_join finished");
|
info!(
|
||||||
|
fast_join = response.members_omitted,
|
||||||
|
auth_chain = response.auth_chain.len(),
|
||||||
|
state = response.state.len(),
|
||||||
|
servers = response
|
||||||
|
.servers_in_room
|
||||||
|
.as_ref()
|
||||||
|
.map(Vec::len)
|
||||||
|
.unwrap_or(0),
|
||||||
|
"send_join finished"
|
||||||
|
);
|
||||||
|
|
||||||
|
if response.members_omitted {
|
||||||
|
use federation::event::get_room_state::v1::{Request, Response};
|
||||||
|
|
||||||
|
info!("Asking {remote_server} for state in room {room_id}");
|
||||||
|
match self
|
||||||
|
.services
|
||||||
|
.federation
|
||||||
|
.execute(&remote_server, Request {
|
||||||
|
room_id: room_id.to_owned(),
|
||||||
|
event_id: event_id.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| error!("state failed: {e}"))
|
||||||
|
{
|
||||||
|
| Err(e) => return Err(e),
|
||||||
|
| Ok(Response { mut auth_chain, mut pdus }) => {
|
||||||
|
response.auth_chain = take(&mut auth_chain);
|
||||||
|
response.state = take(&mut pdus);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
auth_chain = response.auth_chain.len(),
|
||||||
|
state = response.state.len(),
|
||||||
|
"state finished"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if join_authorized_via_users_server.is_some() {
|
if join_authorized_via_users_server.is_some() {
|
||||||
if let Some(signed_raw) = &send_join_response.room_state.event {
|
if let Some(signed_raw) = &response.event {
|
||||||
debug_info!(
|
debug_info!(
|
||||||
"There is a signed event with join_authorized_via_users_server. This room is \
|
"There is a signed event with join_authorized_via_users_server. This room is \
|
||||||
probably using restricted joins. Adding signature to our event"
|
probably using restricted joins. Adding signature to our event"
|
||||||
@@ -275,9 +312,8 @@ pub async fn join_remote(
|
|||||||
from_incoming_federation(room_id, &event_id, &mut join_event, &room_version_rules)?;
|
from_incoming_federation(room_id, &event_id, &mut join_event, &room_version_rules)?;
|
||||||
|
|
||||||
info!("Acquiring server signing keys for response events");
|
info!("Acquiring server signing keys for response events");
|
||||||
let resp_events = &send_join_response.room_state;
|
let resp_state = &response.state;
|
||||||
let resp_state = &resp_events.state;
|
let resp_auth = &response.auth_chain;
|
||||||
let resp_auth = &resp_events.auth_chain;
|
|
||||||
self.services
|
self.services
|
||||||
.server_keys
|
.server_keys
|
||||||
.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
|
.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
|
||||||
@@ -285,8 +321,7 @@ pub async fn join_remote(
|
|||||||
|
|
||||||
info!("Going through send_join response room_state");
|
info!("Going through send_join response room_state");
|
||||||
let cork = self.services.db.cork_and_flush();
|
let cork = self.services.db.cork_and_flush();
|
||||||
let state = send_join_response
|
let state = response
|
||||||
.room_state
|
|
||||||
.state
|
.state
|
||||||
.iter()
|
.iter()
|
||||||
.stream()
|
.stream()
|
||||||
@@ -328,8 +363,7 @@ pub async fn join_remote(
|
|||||||
|
|
||||||
info!("Going through send_join response auth_chain");
|
info!("Going through send_join response auth_chain");
|
||||||
let cork = self.services.db.cork_and_flush();
|
let cork = self.services.db.cork_and_flush();
|
||||||
send_join_response
|
response
|
||||||
.room_state
|
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.iter()
|
.iter()
|
||||||
.stream()
|
.stream()
|
||||||
@@ -592,7 +626,7 @@ pub async fn join_local(
|
|||||||
let send_join_request = federation::membership::create_join_event::v2::Request {
|
let send_join_request = federation::membership::create_join_event::v2::Request {
|
||||||
room_id: room_id.to_owned(),
|
room_id: room_id.to_owned(),
|
||||||
event_id: event_id.clone(),
|
event_id: event_id.clone(),
|
||||||
omit_members: false,
|
omit_members: true,
|
||||||
pdu: self
|
pdu: self
|
||||||
.services
|
.services
|
||||||
.federation
|
.federation
|
||||||
|
|||||||
Reference in New Issue
Block a user