Add an infolog progress message during batch notary request.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-12-29 22:31:18 +00:00
parent 63bdeb79c9
commit b412aafaf8
2 changed files with 54 additions and 25 deletions

View File

@@ -31,7 +31,7 @@ use tuwunel_core::{
matrix::{event::gen_event_id_canonical_json, room_version}, matrix::{event::gen_event_id_canonical_json, room_version},
pdu::{PduBuilder, format::from_incoming_federation}, pdu::{PduBuilder, format::from_incoming_federation},
state_res, trace, state_res, trace,
utils::{self, IterStream, ReadyExt, future::TryExtExt, shuffle}, utils::{self, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle},
warn, warn,
}; };
@@ -302,24 +302,32 @@ pub async fn join_remote(
} }
} }
self.services let shortroomid = self
.services
.short .short
.get_or_create_shortroomid(room_id) .get_or_create_shortroomid(room_id)
.await; .await;
info!("Parsing join event"); info!(
%room_id,
%shortroomid,
"Initialized room. Parsing join event..."
);
let parsed_join_pdu = let parsed_join_pdu =
from_incoming_federation(room_id, &event_id, &mut join_event, &room_version_rules)?; from_incoming_federation(room_id, &event_id, &mut join_event, &room_version_rules)?;
info!("Acquiring server signing keys for response events");
let resp_state = &response.state; let resp_state = &response.state;
let resp_auth = &response.auth_chain; let resp_auth = &response.auth_chain;
info!(
events = resp_state.len().expected_add(resp_auth.len()),
"Acquiring server signing keys for response events..."
);
self.services self.services
.server_keys .server_keys
.acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter())) .acquire_events_pubkeys(resp_auth.iter().chain(resp_state.iter()))
.await; .await;
info!("Going through send_join response room_state"); info!(events = response.state.len(), "Going through send_join response room_state...");
let cork = self.services.db.cork_and_flush(); let cork = self.services.db.cork_and_flush();
let state = response let state = response
.state .state
@@ -361,7 +369,10 @@ pub async fn join_remote(
drop(cork); drop(cork);
info!("Going through send_join response auth_chain"); info!(
events = response.auth_chain.len(),
"Going through send_join response auth_chain..."
);
let cork = self.services.db.cork_and_flush(); let cork = self.services.db.cork_and_flush();
response response
.auth_chain .auth_chain
@@ -392,7 +403,7 @@ pub async fn join_remote(
drop(cork); drop(cork);
debug!("Running send_join auth check"); debug!("Running send_join auth check...");
state_res::auth_check( state_res::auth_check(
&room_version_rules, &room_version_rules,
&parsed_join_pdu, &parsed_join_pdu,
@@ -415,7 +426,7 @@ pub async fn join_remote(
.boxed() .boxed()
.await?; .await?;
info!("Compressing state from send_join"); info!(events = state.len(), "Compressing state from send_join...");
let compressed: CompressedState = self let compressed: CompressedState = self
.services .services
.state_compressor .state_compressor
@@ -423,7 +434,7 @@ pub async fn join_remote(
.collect() .collect()
.await; .await;
debug!("Saving compressed state"); debug!("Saving compressed state...");
let HashSetCompressStateEvent { let HashSetCompressStateEvent {
shortstatehash: statehash_before_join, shortstatehash: statehash_before_join,
added, added,
@@ -434,13 +445,15 @@ pub async fn join_remote(
.save_state(room_id, Arc::new(compressed)) .save_state(room_id, Arc::new(compressed))
.await?; .await?;
debug!("Forcing state for new room"); debug!(
state_hash = ?statehash_before_join,
"Forcing state for new room..."
);
self.services self.services
.state .state
.force_state(room_id, statehash_before_join, added, removed, state_lock) .force_state(room_id, statehash_before_join, added, removed, state_lock)
.await?; .await?;
info!("Updating joined counts for new room");
self.services self.services
.state_cache .state_cache
.update_joined_count(room_id) .update_joined_count(room_id)
@@ -455,7 +468,11 @@ pub async fn join_remote(
.append_to_state(&parsed_join_pdu) .append_to_state(&parsed_join_pdu)
.await?; .await?;
info!("Appending new room join event"); info!(
event_id = %parsed_join_pdu.event_id,
"Appending new room join event..."
);
self.services self.services
.timeline .timeline
.append_pdu( .append_pdu(
@@ -466,13 +483,17 @@ pub async fn join_remote(
) )
.await?; .await?;
info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment // We set the room state after inserting the pdu, so that we never have a moment
// in time where events in the current room state do not exist // in time where events in the current room state do not exist
self.services self.services
.state .state
.set_room_state(room_id, statehash_after_join, state_lock); .set_room_state(room_id, statehash_after_join, state_lock);
info!(
statehash = %statehash_after_join,
"Set final room state for new room."
);
Ok(()) Ok(())
} }
@@ -881,14 +902,12 @@ pub(super) async fn get_servers_for_room(
// 2. (room id server)? // 2. (room id server)?
// 3. shuffle [via query + resolve servers]? // 3. shuffle [via query + resolve servers]?
// 4. shuffle [invited via, inviters servers]? // 4. shuffle [invited via, inviters servers]?
debug!(?servers);
info!("{servers:?}");
// dedup preserving order // dedup preserving order
let mut set = HashSet::new(); let mut set = HashSet::new();
servers.retain(|x| set.insert(x.clone())); servers.retain(|x| set.insert(x.clone()));
debug!(?servers);
info!("{servers:?}");
// sort deprioritized servers last // sort deprioritized servers last
if !servers.is_empty() { if !servers.is_empty() {
@@ -905,5 +924,6 @@ pub(super) async fn get_servers_for_room(
} }
} }
debug_info!(?servers);
Ok(servers) Ok(servers)
} }

View File

@@ -8,7 +8,7 @@ use ruma::{
get_server_keys, get_server_keys,
}, },
}; };
use tuwunel_core::{Err, Result, debug, implement}; use tuwunel_core::{Err, Result, debug, implement, info};
#[implement(super::Service)] #[implement(super::Service)]
pub(super) async fn batch_notary_request<'a, S, K>( pub(super) async fn batch_notary_request<'a, S, K>(
@@ -36,18 +36,20 @@ where
batch batch
}); });
debug_assert!(!server_keys.is_empty(), "empty batch request to notary"); let total_keys = server_keys.len();
debug_assert!(total_keys > 0, "empty batch request to notary");
let batch_max = self
.services
.server
.config
.trusted_server_batch_size;
let mut results = Vec::new(); let mut results = Vec::new();
while let Some(batch) = server_keys while let Some(batch) = server_keys
.keys() .keys()
.rev() .rev()
.take( .take(batch_max)
self.services
.server
.config
.trusted_server_batch_size,
)
.next_back() .next_back()
.cloned() .cloned()
{ {
@@ -74,6 +76,13 @@ where
.filter_map(Result::ok); .filter_map(Result::ok);
results.extend(response); results.extend(response);
info!(
"obtained {0} of {1} keys with {2} more to request",
results.len(),
total_keys,
server_keys.len(),
);
} }
Ok(results) Ok(results)