Remove federation_handletime for now.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,5 +1,3 @@
|
|||||||
use std::fmt::Write;
|
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use ruma::{OwnedRoomId, OwnedServerName, OwnedUserId};
|
use ruma::{OwnedRoomId, OwnedServerName, OwnedUserId};
|
||||||
use tuwunel_core::{Err, Result};
|
use tuwunel_core::{Err, Result};
|
||||||
@@ -26,25 +24,7 @@ pub(super) async fn enable_room(&self, room_id: OwnedRoomId) -> Result {
|
|||||||
|
|
||||||
#[admin_command]
|
#[admin_command]
|
||||||
pub(super) async fn incoming_federation(&self) -> Result {
|
pub(super) async fn incoming_federation(&self) -> Result {
|
||||||
let msg = {
|
Err!("This command is temporarily disabled")
|
||||||
let map = self
|
|
||||||
.services
|
|
||||||
.rooms
|
|
||||||
.event_handler
|
|
||||||
.federation_handletime
|
|
||||||
.read()
|
|
||||||
.expect("locked");
|
|
||||||
|
|
||||||
let mut msg = format!("Handling {} incoming pdus:\n", map.len());
|
|
||||||
for (r, (e, i)) in map.iter() {
|
|
||||||
let elapsed = i.elapsed();
|
|
||||||
writeln!(msg, "{} {}: {}m{}s", r, e, elapsed.as_secs() / 60, elapsed.as_secs() % 60)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
msg
|
|
||||||
};
|
|
||||||
|
|
||||||
self.write_str(&msg).await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[admin_command]
|
#[admin_command]
|
||||||
|
|||||||
@@ -1,12 +1,10 @@
|
|||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, TryFutureExt, TryStreamExt,
|
FutureExt, TryFutureExt, TryStreamExt,
|
||||||
future::{OptionFuture, try_join5},
|
future::{OptionFuture, try_join5},
|
||||||
};
|
};
|
||||||
use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType};
|
use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, matrix::Event,
|
Err, Result, debug, debug::INFO_SPAN_LEVEL, err, implement, matrix::Event,
|
||||||
utils::stream::IterStream, warn,
|
utils::stream::IterStream, warn,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -165,19 +163,6 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Done with prev events, now handling the incoming event
|
// Done with prev events, now handling the incoming event
|
||||||
let start_time = Instant::now();
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.expect("locked")
|
|
||||||
.insert(room_id.into(), (event_id.to_owned(), start_time));
|
|
||||||
|
|
||||||
defer! {{
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.expect("locked")
|
|
||||||
.remove(room_id);
|
|
||||||
}};
|
|
||||||
|
|
||||||
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
|
||||||
.boxed()
|
.boxed()
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -1,13 +1,10 @@
|
|||||||
use std::{
|
use std::{ops::Range, time::Duration};
|
||||||
ops::Range,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
|
||||||
|
|
||||||
use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Err, Result, debug,
|
Err, Result, debug,
|
||||||
debug::INFO_SPAN_LEVEL,
|
debug::INFO_SPAN_LEVEL,
|
||||||
defer, implement,
|
implement,
|
||||||
matrix::{Event, PduEvent},
|
matrix::{Event, PduEvent},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -57,26 +54,8 @@ where
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let start_time = Instant::now();
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.expect("locked")
|
|
||||||
.insert(room_id.into(), (prev_id.to_owned(), start_time));
|
|
||||||
|
|
||||||
defer! {{
|
|
||||||
self.federation_handletime
|
|
||||||
.write()
|
|
||||||
.expect("locked")
|
|
||||||
.remove(room_id);
|
|
||||||
}};
|
|
||||||
|
|
||||||
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
debug!(
|
|
||||||
elapsed = ?start_time.elapsed(),
|
|
||||||
"Handled prev_event",
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,10 +11,10 @@ mod state_at_incoming;
|
|||||||
mod upgrade_outlier_pdu;
|
mod upgrade_outlier_pdu;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, hash_map},
|
collections::hash_map,
|
||||||
fmt::Write,
|
fmt::Write,
|
||||||
ops::Range,
|
ops::Range,
|
||||||
sync::{Arc, RwLock as StdRwLock},
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -33,7 +33,6 @@ use crate::{Dep, globals, rooms, sending, server_keys};
|
|||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub mutex_federation: RoomMutexMap,
|
pub mutex_federation: RoomMutexMap,
|
||||||
pub federation_handletime: StdRwLock<HandleTimeMap>,
|
|
||||||
services: Services,
|
services: Services,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,14 +52,12 @@ struct Services {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
|
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
|
||||||
type HandleTimeMap = HashMap<OwnedRoomId, (OwnedEventId, Instant)>;
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
mutex_federation: RoomMutexMap::new(),
|
mutex_federation: RoomMutexMap::new(),
|
||||||
federation_handletime: HandleTimeMap::new().into(),
|
|
||||||
services: Services {
|
services: Services {
|
||||||
globals: args.depend::<globals::Service>("globals"),
|
globals: args.depend::<globals::Service>("globals"),
|
||||||
sending: args.depend::<sending::Service>("sending"),
|
sending: args.depend::<sending::Service>("sending"),
|
||||||
@@ -84,14 +81,6 @@ impl crate::Service for Service {
|
|||||||
let mutex_federation = self.mutex_federation.len();
|
let mutex_federation = self.mutex_federation.len();
|
||||||
writeln!(out, "federation_mutex: {mutex_federation}")?;
|
writeln!(out, "federation_mutex: {mutex_federation}")?;
|
||||||
|
|
||||||
let federation_handletime = self
|
|
||||||
.federation_handletime
|
|
||||||
.read()
|
|
||||||
.expect("locked for reading")
|
|
||||||
.len();
|
|
||||||
|
|
||||||
writeln!(out, "federation_handletime: {federation_handletime}")?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user