diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 8ba5a1d5..cf6849ce 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -142,6 +142,21 @@ async fn handle( started: Instant, pdus: impl Stream + Send, edus: impl Stream + Send, +) -> Result { + let results = handle_pdus(services, client, origin, started, pdus).await?; + + handle_edus(services, client, origin, edus).await?; + + Ok(results) +} + +#[tracing::instrument(name = "pdus", level = "debug", skip_all)] +async fn handle_pdus( + services: &Services, + client: &IpAddr, + origin: &ServerName, + started: Instant, + pdus: impl Stream + Send, ) -> Result { // group pdus by room let pdus = pdus @@ -160,7 +175,7 @@ async fn handle( .into_iter() .try_stream() .broad_and_then(async |(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, &started, room_id, pdus.into_iter()) + handle_pdus_room(services, client, origin, &started, room_id, pdus.into_iter()) .map_ok(ResolvedMap::into_iter) .map_ok(IterStream::try_stream) .await @@ -169,17 +184,16 @@ async fn handle( .try_collect() .await?; - // evaluate edus after pdus, at least for now. - edus.enumerate() - .for_each_concurrent(automatic_width(), |(i, edu)| { - handle_edu(services, client, origin, i, edu) - }) - .await; - Ok(results) } -async fn handle_room( +#[tracing::instrument( + name = "room", + level = "debug", + skip_all, + fields(%room_id) +)] +async fn handle_pdus_room( services: &Services, _client: &IpAddr, origin: &ServerName, @@ -205,10 +219,7 @@ async fn handle_room( .await; debug!( - %event_id, - %room_id, - ri, - ti, + %event_id, ri, ti, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), "Finished PDU", @@ -221,6 +232,28 @@ async fn handle_room( .await } +#[tracing::instrument(name = "edus", level = "debug", skip_all)] +async fn handle_edus( + services: &Services, + client: &IpAddr, + origin: &ServerName, + edus: impl Stream + Send, +) -> Result { + edus.enumerate() + .for_each_concurrent(automatic_width(), |(i, edu)| { + handle_edu(services, client, origin, i, edu) + }) + .await; + + Ok(()) +} + +#[tracing::instrument( + name = "edu", + level = "debug", + skip_all, + fields(%i), +)] async fn handle_edu( services: &Services, client: &IpAddr, diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 1548fb88..af24e49e 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -127,7 +127,7 @@ where } #[implement(Service)] -#[tracing::instrument(skip(self), level = "debug")] +#[tracing::instrument(skip(self), level = "debug", ret)] pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool { let key = (room_id, event_id); self.db.referencedevents.qry(&key).await.is_ok() @@ -140,7 +140,7 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) { } #[implement(Service)] -#[tracing::instrument(skip(self), level = "debug")] +#[tracing::instrument(skip(self), level = "debug", ret)] pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { self.db .softfailedeventids