Refactor admin debug

This commit is contained in:
dasha_uwu
2026-02-06 12:09:15 +05:00
parent 01194bfc7b
commit 6014c0fd6c

View File

@@ -10,7 +10,9 @@ use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId, OwnedRoomOrAliasId, OwnedServerName, RoomId, RoomVersionId,
api::federation::event::get_room_state, events::AnyStateEvent, serde::Raw, api::federation::{discovery::get_server_version, event::get_room_state},
events::AnyStateEvent,
serde::Raw,
}; };
use serde::Serialize; use serde::Serialize;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@@ -91,18 +93,20 @@ pub(super) async fn parse_pdu(&self) -> Result {
let rules = RoomVersionId::V6 let rules = RoomVersionId::V6
.rules() .rules()
.expect("rules for V6 rooms"); .expect("rules for V6 rooms");
match serde_json::from_str(&string) {
| Err(e) => return Err!("Invalid json in command body: {e}"), let value =
| Ok(value) => match ruma::signatures::reference_hash(&value, &rules) { serde_json::from_str(&string).map_err(|e| err!("Invalid json in command body: {e}"))?;
| Err(e) => return Err!("Could not parse PDU JSON: {e:?}"),
| Ok(hash) => { let hash = ruma::signatures::reference_hash(&value, &rules)
let event_id = OwnedEventId::parse(format!("${hash}")); .map_err(|e| err!("Could not parse PDU JSON: {e:?}"))?;
match serde_json::from_value::<PduEvent>(serde_json::to_value(value)?) {
| Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"), let event_id = OwnedEventId::parse(format!("${hash}"));
| Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
} let value = serde_json::to_value(value)?;
},
}, match serde_json::from_value::<PduEvent>(value) {
| Err(e) => return Err!("EventId: {event_id:?}\nCould not parse event: {e}"),
| Ok(pdu) => write!(self, "EventId: {event_id:?}\n{pdu:#?}"),
} }
.await .await
} }
@@ -125,19 +129,17 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result {
.await; .await;
} }
match pdu_json { let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
| Err(_) => return Err!("PDU not found locally."),
| Ok(json) => { let text = serde_json::to_string_pretty(&json)?;
let text = serde_json::to_string_pretty(&json)?; let msg = if outlier {
let msg = if outlier { "Outlier (Rejected / Soft Failed) PDU found in our database"
"Outlier (Rejected / Soft Failed) PDU found in our database" } else {
} else { "PDU found in our database"
"PDU found in our database" };
};
write!(self, "{msg}\n```json\n{text}\n```",) self.write_str(&format!("{msg}\n```json\n{text}\n```"))
}, .await
}
.await
} }
#[admin_command] #[admin_command]
@@ -150,14 +152,12 @@ pub(super) async fn get_short_pdu(&self, shortroomid: ShortRoomId, count: i64) -
.get_pdu_json_from_id(&pdu_id) .get_pdu_json_from_id(&pdu_id)
.await; .await;
match pdu_json { let json = pdu_json.map_err(|_| err!("PDU not found locally."))?;
| Err(_) => return Err!("PDU not found locally."),
| Ok(json) => { let json_text = serde_json::to_string_pretty(&json)?;
let json_text = serde_json::to_string_pretty(&json)?;
write!(self, "```json\n{json_text}\n```") self.write_str(&format!("```json\n{json_text}\n```"))
}, .await
}
.await
} }
#[admin_command] #[admin_command]
@@ -192,29 +192,19 @@ pub(super) async fn get_remote_pdu_list(&self, server: OwnedServerName, force: b
let mut success_count: usize = 0; let mut success_count: usize = 0;
for event_id in list { for event_id in list {
if force { let result = self
match self .get_remote_pdu(event_id.to_owned(), server.clone())
.get_remote_pdu(event_id.to_owned(), server.clone()) .await;
.await
{
| Err(e) => {
failed_count = failed_count.saturating_add(1);
self.services
.admin
.send_text(&format!("Failed to get remote PDU, ignoring error: {e}"))
.await;
warn!("Failed to get remote PDU, ignoring error: {e}"); if !force {
}, result?;
| _ => { } else if let Err(e) = result {
success_count = success_count.saturating_add(1); warn!("Failed to get remote PDU, ignoring error: {e}");
}, failed_count = failed_count.saturating_add(1);
} continue;
} else {
self.get_remote_pdu(event_id.to_owned(), server.clone())
.await?;
success_count = success_count.saturating_add(1);
} }
success_count = success_count.saturating_add(1);
} }
let out = let out =
@@ -240,62 +230,48 @@ pub(super) async fn get_remote_pdu(
); );
} }
match self let response = self
.services .services
.federation .federation
.execute(&server, ruma::api::federation::event::get_event::v1::Request { .execute(&server, ruma::api::federation::event::get_event::v1::Request {
event_id: event_id.clone(), event_id: event_id.clone(),
}) })
.await .await
{ .map_err(|e| {
| Err(e) => { err!("Remote server did not have PDU or failed sending request to remote server: {e}")
return Err!( })?;
"Remote server did not have PDU or failed sending request to remote server: {e}"
);
},
| Ok(response) => {
let json: CanonicalJsonObject =
serde_json::from_str(response.pdu.get()).map_err(|e| {
warn!(
"Requested event ID {event_id} from server but failed to convert from \
RawValue to CanonicalJsonObject (malformed event/response?): {e}"
);
err!(Request(Unknown(
"Received response from server but failed to parse PDU"
)))
})?;
trace!("Attempting to parse PDU: {:?}", &response.pdu); let json: CanonicalJsonObject = serde_json::from_str(response.pdu.get()).map_err(|e| {
let (room_id, ..) = { warn!(
let parsed_result = self "Requested event ID {event_id} from server but failed to convert from RawValue to \
.services CanonicalJsonObject (malformed event/response?): {e}"
.event_handler );
.parse_incoming_pdu(&response.pdu) err!(Request(Unknown("Received response from server but failed to parse PDU")))
.boxed() })?;
.await;
match parsed_result { trace!("Attempting to parse PDU: {:?}", &response.pdu);
| Ok(t) => t, let (room_id, ..) = self
| Err(e) => { .services
warn!("Failed to parse PDU: {e}"); .event_handler
info!("Full PDU: {:?}", &response.pdu); .parse_incoming_pdu(&response.pdu)
return Err!("Failed to parse PDU remote server {server} sent us: {e}"); .boxed()
}, .await
} .map_err(|e| {
}; warn!("Failed to parse PDU: {e}");
info!("Full PDU: {:?}", &response.pdu);
err!("Failed to parse PDU remote server {server} sent us: {e}")
})?;
info!("Attempting to handle event ID {event_id} as backfilled PDU"); info!("Attempting to handle event ID {event_id} as backfilled PDU");
self.services self.services
.timeline .timeline
.backfill_pdu(&room_id, &server, response.pdu) .backfill_pdu(&room_id, &server, response.pdu)
.await?; .await?;
let text = serde_json::to_string_pretty(&json)?; let text = serde_json::to_string_pretty(&json)?;
let msg = "Got PDU from specified server and handled as backfilled"; let msg = "Got PDU from specified server and handled as backfilled";
write!(self, "{msg}. Event body:\n```json\n{text}\n```") self.write_str(&format!("{msg}. Event body:\n```json\n{text}\n```"))
}, .await
}
.await
} }
#[admin_command] #[admin_command]
@@ -320,8 +296,8 @@ pub(super) async fn get_room_state(&self, room: OwnedRoomOrAliasId) -> Result {
)) ))
})?; })?;
let out = format!("```json\n{json}\n```"); self.write_str(&format!("```json\n{json}\n```"))
self.write_str(&out).await .await
} }
#[admin_command] #[admin_command]
@@ -332,29 +308,22 @@ pub(super) async fn ping(&self, server: OwnedServerName) -> Result {
let timer = tokio::time::Instant::now(); let timer = tokio::time::Instant::now();
match self let response = self
.services .services
.federation .federation
.execute(&server, ruma::api::federation::discovery::get_server_version::v1::Request {}) .execute(&server, get_server_version::v1::Request {})
.await .await
{ .map_err(|e| err!("Failed sending federation request to specified server:\n\n{e}"))?;
| Err(e) => {
return Err!("Failed sending federation request to specified server:\n\n{e}");
},
| Ok(response) => {
let ping_time = timer.elapsed();
let json_text_res = serde_json::to_string_pretty(&response.server);
let out = if let Ok(json) = json_text_res { let ping_time = timer.elapsed();
format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
} else {
format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
};
write!(self, "{out}") let out = if let Ok(json) = serde_json::to_string_pretty(&response.server) {
}, format!("Got response which took {ping_time:?} time:\n```json\n{json}\n```")
} } else {
.await format!("Got non-JSON response which took {ping_time:?} time:\n{response:?}")
};
self.write_str(&out).await
} }
#[admin_command] #[admin_command]
@@ -377,55 +346,25 @@ pub(super) async fn force_device_list_updates(&self) -> Result {
pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result { pub(super) async fn change_log_level(&self, filter: Option<String>, reset: bool) -> Result {
let handles = &["console"]; let handles = &["console"];
if reset { let filter = reset
let old_filter_layer = match EnvFilter::try_new(&self.services.server.config.log) { .then_some(&self.services.config.log)
| Ok(s) => s, .or(filter.as_ref())
| Err(e) => return Err!("Log level from config appears to be invalid now: {e}"), .ok_or_else(|| err!("No log level was specified."))?;
};
match self let filter_layer = EnvFilter::try_new(filter).map_err(|e| {
.services let source = if !reset { "specified" } else { "found in config" };
.server err!("Invalid log level filter {source}: {e}")
.log })?;
.reload
.reload(&old_filter_layer, Some(handles))
{
| Err(e) => {
return Err!("Failed to modify and reload the global tracing log level: {e}");
},
| Ok(()) => {
let value = &self.services.server.config.log;
let out = format!("Successfully changed log level back to config value {value}");
return self.write_str(&out).await;
},
}
}
if let Some(filter) = filter { self.services
let new_filter_layer = match EnvFilter::try_new(filter) { .server
| Ok(s) => s, .log
| Err(e) => return Err!("Invalid log level filter specified: {e}"), .reload
}; .reload(&filter_layer, Some(handles))
.map_err(|e| err!("Failed to modify and reload the global tracing log level: {e}"))?;
match self self.write_str(&format!("Successfully changed log level to {filter}"))
.services .await
.server
.log
.reload
.reload(&new_filter_layer, Some(handles))
{
| Ok(()) => {
return self
.write_str("Successfully changed log level")
.await;
},
| Err(e) => {
return Err!("Failed to modify and reload the global tracing log level: {e}");
},
}
}
Err!("No log level was specified.")
} }
#[admin_command] #[admin_command]
@@ -438,15 +377,12 @@ pub(super) async fn sign_json(&self) -> Result {
} }
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
match serde_json::from_str(&string) { let mut value = serde_json::from_str(&string).map_err(|e| err!("Invalid json: {e}"))?;
| Err(e) => return Err!("Invalid json: {e}"),
| Ok(mut value) => { self.services.server_keys.sign_json(&mut value)?;
self.services.server_keys.sign_json(&mut value)?;
let json_text = serde_json::to_string_pretty(&value)?; let json_text = serde_json::to_string_pretty(&value)?;
write!(self, "{json_text}") self.write_str(&json_text).await
},
}
.await
} }
#[admin_command] #[admin_command]
@@ -459,19 +395,17 @@ pub(super) async fn verify_json(&self) -> Result {
} }
let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n"); let string = self.body[1..self.body.len().checked_sub(1).unwrap()].join("\n");
match serde_json::from_str::<CanonicalJsonObject>(&string) {
| Err(e) => return Err!("Invalid json: {e}"), let value = serde_json::from_str::<CanonicalJsonObject>(&string)
| Ok(value) => match self .map_err(|e| err!("Invalid json: {e}"))?;
.services
.server_keys self.services
.verify_json(&value, None) .server_keys
.await .verify_json(&value, None)
{ .await
| Err(e) => return Err!("Signature verification failed: {e}"), .map_err(|e| err!("Signature verification failed: {e}"))?;
| Ok(()) => write!(self, "Signature correct"),
}, self.write_str("Signature correct").await
}
.await
} }
#[admin_command] #[admin_command]
@@ -489,11 +423,10 @@ pub(super) async fn verify_pdu(&self, event_id: OwnedEventId) -> Result {
.services .services
.server_keys .server_keys
.verify_event(&event, None) .verify_event(&event, None)
.await .await?
{ {
| Err(e) => return Err(e), | Verified::Signatures => "signatures OK, but content hash failed (redaction).",
| Ok(Verified::Signatures) => "signatures OK, but content hash failed (redaction).", | Verified::All => "signatures and hashes OK.",
| Ok(Verified::All) => "signatures and hashes OK.",
}; };
self.write_str(msg).await self.write_str(msg).await
@@ -552,6 +485,8 @@ pub(super) async fn force_set_room_state_from_server(
room_id: OwnedRoomId, room_id: OwnedRoomId,
server_name: OwnedServerName, server_name: OwnedServerName,
) -> Result { ) -> Result {
// TODO: diverged from join remote
if !self if !self
.services .services
.state_cache .state_cache