From 6014c0fd6c7bad945f421a6bc9b482f0a840d322 Mon Sep 17 00:00:00 2001 From: dasha_uwu Date: Fri, 6 Feb 2026 12:09:15 +0500 Subject: [PATCH] Refactor `admin debug` --- src/admin/debug/commands.rs | 321 ++++++++++++++---------------------- 1 file changed, 128 insertions(+), 193 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 8c9bb869..cc9897d2 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -10,7 +10,9 @@ use futures::{FutureExt, StreamExt, TryStreamExt}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, - api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw, + api::federation::{discovery::get_server_version, event::get_room_state}, + events::AnyStateEvent, + serde::Raw, }; use serde::Serialize; use tracing_subscriber::EnvFilter; @@ -91,18 +93,20 @@ pub(super) async fn parse_pdu(&self) -> Result { let rules = RoomVersionId::V6 .rules() .expect("rules for V6 rooms"); - match serde_json::from_str(&string) { - | Err(e) => return Err!("Invalid json in command body: {e}"), - | Ok(value) => match ruma::signatures::reference_hash(&value, &rules) { - | Err(e) => return Err!("Could not parse PDU JSON: {e:?}"), - | Ok(hash) => { - let event_id = OwnedEventId::parse(format!("${hash}")); - match serde_json::from_value::(serde_json::to_value(value)?) { - | Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"), - | Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"), - } - }, - }, + + let value = + serde_json::from_str(&string).map_err(|e| err!("Invalid json in command body: {e}"))?; + + let hash = ruma::signatures::reference_hash(&value, &rules) + .map_err(|e| err!("Could not parse PDU JSON: {e:?}"))?; + + let event_id = OwnedEventId::parse(format!("${hash}")); + + let value = serde_json::to_value(value)?; + + match serde_json::from_value::(value) { + | Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"), + | Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"), } .await } @@ -125,19 +129,17 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result { .await; } - match pdu_json { - | Err(_) => return Err!("PDU not found locally."), - | Ok(json) => { - let text = serde_json::to_string_pretty(&json)?; - let msg = if outlier { - "Outlier (Rejected / Soft Failed) PDU found in our database" - } else { - "PDU found in our database" - }; - write!(self, "{msg}\n```json\n{text}\n```",) - }, - } - .await + let json = pdu_json.map_err(|_| err!("PDU not found locally."))?; + + let text = serde_json::to_string_pretty(&json)?; + let msg = if outlier { + "Outlier (Rejected / Soft Failed) PDU found in our database" + } else { + "PDU found in our database" + }; + + self.write_str(&format!("{msg}\n```json\n{text}\n```")) + .await } #[admin_command] @@ -150,14 +152,12 @@ pub(super) async fn get_short_pdu(&self, shortroomid: ShortRoomId, count: i64) - .get_pdu_json_from_id(&pdu_id) .await; - match pdu_json { - | Err(_) => return Err!("PDU not found locally."), - | Ok(json) => { - let json_text = serde_json::to_string_pretty(&json)?; - write!(self, "```json\n{json_text}\n```") - }, - } - .await + let json = pdu_json.map_err(|_| err!("PDU not found locally."))?; + + let json_text = serde_json::to_string_pretty(&json)?; + + self.write_str(&format!("```json\n{json_text}\n```")) + .await } #[admin_command] @@ -192,29 +192,19 @@ pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: b let mut success_count: usize = 0; for event_id in list { - if force { - match self - .get_remote_pdu(event_id.to_owned(), server.clone()) - .await - { - | Err(e) => { - failed_count = failed_count.saturating_add(1); - self.services - .admin - .send_text(&format!("Failed to get remote PDU, ignoring error: {e}")) - .await; + let result = self + .get_remote_pdu(event_id.to_owned(), server.clone()) + .await; - warn!("Failed to get remote PDU, ignoring error: {e}"); - }, - | _ => { - success_count = success_count.saturating_add(1); - }, - } - } else { - self.get_remote_pdu(event_id.to_owned(), server.clone()) - .await?; - success_count = success_count.saturating_add(1); + if !force { + result?; + } else if let Err(e) = result { + warn!("Failed to get remote PDU, ignoring error: {e}"); + failed_count = failed_count.saturating_add(1); + continue; } + + success_count = success_count.saturating_add(1); } let out = @@ -240,62 +230,48 @@ pub(super) async fn get_remote_pdu( ); } - match self + let response = self .services .federation .execute(&server, ruma::api::federation::event::get_event::v1::Request { event_id: event_id.clone(), }) .await - { - | Err(e) => { - return Err!( - "Remote server did not have PDU or failed sending request to remote server: {e}" - ); - }, - | Ok(response) => { - let json: CanonicalJsonObject = - serde_json::from_str(response.pdu.get()).map_err(|e| { - warn!( - "Requested event ID {event_id} from server but failed to convert from \ - RawValue to CanonicalJsonObject (malformed event/response?): {e}" - ); - err!(Request(Unknown( - "Received response from server but failed to parse PDU" - ))) - })?; + .map_err(|e| { + err!("Remote server did not have PDU or failed sending request to remote server: {e}") + })?; - trace!("Attempting to parse PDU: {:?}", &response.pdu); - let (room_id, ..) = { - let parsed_result = self - .services - .event_handler - .parse_incoming_pdu(&response.pdu) - .boxed() - .await; + let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| { + warn!( + "Requested event ID {event_id} from server but failed to convert from RawValue to \ + CanonicalJsonObject (malformed event/response?): {e}" + ); + err!(Request(Unknown("Received response from server but failed to parse PDU"))) + })?; - match parsed_result { - | Ok(t) => t, - | Err(e) => { - warn!("Failed to parse PDU: {e}"); - info!("Full PDU: {:?}", &response.pdu); - return Err!("Failed to parse PDU remote server {server} sent us: {e}"); - }, - } - }; + trace!("Attempting to parse PDU: {:?}", &response.pdu); + let (room_id, ..) = self + .services + .event_handler + .parse_incoming_pdu(&response.pdu) + .boxed() + .await + .map_err(|e| { + warn!("Failed to parse PDU: {e}"); + info!("Full PDU: {:?}", &response.pdu); + err!("Failed to parse PDU remote server {server} sent us: {e}") + })?; - info!("Attempting to handle event ID {event_id} as backfilled PDU"); - self.services - .timeline - .backfill_pdu(&room_id, &server, response.pdu) - .await?; + info!("Attempting to handle event ID {event_id} as backfilled PDU"); + self.services + .timeline + .backfill_pdu(&room_id, &server, response.pdu) + .await?; - let text = serde_json::to_string_pretty(&json)?; - let msg = "Got PDU from specified server and handled as backfilled"; - write!(self, "{msg}. Event body:\n```json\n{text}\n```") - }, - } - .await + let text = serde_json::to_string_pretty(&json)?; + let msg = "Got PDU from specified server and handled as backfilled"; + self.write_str(&format!("{msg}. Event body:\n```json\n{text}\n```")) + .await } #[admin_command] @@ -320,8 +296,8 @@ pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result { )) })?; - let out = format!("```json\n{json}\n```"); - self.write_str(&out).await + self.write_str(&format!("```json\n{json}\n```")) + .await } #[admin_command] @@ -332,29 +308,22 @@ pub(super) async fn ping(&self, server: OwnedServerName) -> Result { let timer = tokio::time::Instant::now(); - match self + let response = self .services .federation - .execute(&server, ruma::api::federation::discovery::get_server_version::v1::Request {}) + .execute(&server, get_server_version::v1::Request {}) .await - { - | Err(e) => { - return Err!("Failed sending federation request to specified server:\n\n{e}"); - }, - | Ok(response) => { - let ping_time = timer.elapsed(); - let json_text_res = serde_json::to_string_pretty(&response.server); + .map_err(|e| err!("Failed sending federation request to specified server:\n\n{e}"))?; - let out = if let Ok(json) = json_text_res { - format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```") - } else { - format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}") - }; + let ping_time = timer.elapsed(); - write!(self, "{out}") - }, - } - .await + let out = if let Ok(json) = serde_json::to_string_pretty(&response.server) { + format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```") + } else { + format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}") + }; + + self.write_str(&out).await } #[admin_command] @@ -377,55 +346,25 @@ pub(super) async fn force_device_list_updates(&self) -> Result { pub(super) async fn change_log_level(&self, filter: Option, reset: bool) -> Result { let handles = &["console"]; - if reset { - let old_filter_layer = match EnvFilter::try_new(&self.services.server.config.log) { - | Ok(s) => s, - | Err(e) => return Err!("Log level from config appears to be invalid now: {e}"), - }; + let filter = reset + .then_some(&self.services.config.log) + .or(filter.as_ref()) + .ok_or_else(|| err!("No log level was specified."))?; - match self - .services - .server - .log - .reload - .reload(&old_filter_layer, Some(handles)) - { - | Err(e) => { - return Err!("Failed to modify and reload the global tracing log level: {e}"); - }, - | Ok(()) => { - let value = &self.services.server.config.log; - let out = format!("Successfully changed log level back to config value {value}"); - return self.write_str(&out).await; - }, - } - } + let filter_layer = EnvFilter::try_new(filter).map_err(|e| { + let source = if !reset { "specified" } else { "found in config" }; + err!("Invalid log level filter {source}: {e}") + })?; - if let Some(filter) = filter { - let new_filter_layer = match EnvFilter::try_new(filter) { - | Ok(s) => s, - | Err(e) => return Err!("Invalid log level filter specified: {e}"), - }; + self.services + .server + .log + .reload + .reload(&filter_layer, Some(handles)) + .map_err(|e| err!("Failed to modify and reload the global tracing log level: {e}"))?; - match self - .services - .server - .log - .reload - .reload(&new_filter_layer, Some(handles)) - { - | Ok(()) => { - return self - .write_str("Successfully changed log level") - .await; - }, - | Err(e) => { - return Err!("Failed to modify and reload the global tracing log level: {e}"); - }, - } - } - - Err!("No log level was specified.") + self.write_str(&format!("Successfully changed log level to {filter}")) + .await } #[admin_command] @@ -438,15 +377,12 @@ pub(super) async fn sign_json(&self) -> Result { } let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); - match serde_json::from_str(&string) { - | Err(e) => return Err!("Invalid json: {e}"), - | Ok(mut value) => { - self.services.server_keys.sign_json(&mut value)?; - let json_text = serde_json::to_string_pretty(&value)?; - write!(self, "{json_text}") - }, - } - .await + let mut value = serde_json::from_str(&string).map_err(|e| err!("Invalid json: {e}"))?; + + self.services.server_keys.sign_json(&mut value)?; + + let json_text = serde_json::to_string_pretty(&value)?; + self.write_str(&json_text).await } #[admin_command] @@ -459,19 +395,17 @@ pub(super) async fn verify_json(&self) -> Result { } let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); - match serde_json::from_str::(&string) { - | Err(e) => return Err!("Invalid json: {e}"), - | Ok(value) => match self - .services - .server_keys - .verify_json(&value, None) - .await - { - | Err(e) => return Err!("Signature verification failed: {e}"), - | Ok(()) => write!(self, "Signature correct"), - }, - } - .await + + let value = serde_json::from_str::(&string) + .map_err(|e| err!("Invalid json: {e}"))?; + + self.services + .server_keys + .verify_json(&value, None) + .await + .map_err(|e| err!("Signature verification failed: {e}"))?; + + self.write_str("Signature correct").await } #[admin_command] @@ -489,11 +423,10 @@ pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result { .services .server_keys .verify_event(&event, None) - .await + .await? { - | Err(e) => return Err(e), - | Ok(Verified::Signatures) => "signatures OK, but content hash failed (redaction).", - | Ok(Verified::All) => "signatures and hashes OK.", + | Verified::Signatures => "signatures OK, but content hash failed (redaction).", + | Verified::All => "signatures and hashes OK.", }; self.write_str(msg).await @@ -552,6 +485,8 @@ pub(super) async fn force_set_room_state_from_server( room_id: OwnedRoomId, server_name: OwnedServerName, ) -> Result { + // TODO: diverged from join remote + if !self .services .state_cache