pull stuff out of globals
This commit is contained in:
@@ -11,34 +11,38 @@ mod state_at_incoming;
|
||||
mod upgrade_outlier_pdu;
|
||||
|
||||
use std::{
|
||||
collections::hash_map,
|
||||
collections::{HashMap, hash_map},
|
||||
fmt::Write,
|
||||
ops::Range,
|
||||
sync::Arc,
|
||||
sync::{Arc, RwLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ruma::{EventId, OwnedRoomId, RoomId};
|
||||
use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
|
||||
use tuwunel_core::{
|
||||
Err, Result, implement,
|
||||
matrix::{Event, PduEvent},
|
||||
utils::{MutexMap, continue_exponential_backoff},
|
||||
utils::{MutexMap, bytes::pretty, continue_exponential_backoff},
|
||||
};
|
||||
|
||||
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
|
||||
|
||||
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
|
||||
|
||||
pub struct Service {
|
||||
pub mutex_federation: RoomMutexMap,
|
||||
services: Arc<crate::services::OnceServices>,
|
||||
bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
|
||||
}
|
||||
|
||||
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
mutex_federation: RoomMutexMap::new(),
|
||||
services: args.services.clone(),
|
||||
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -46,9 +50,28 @@ impl crate::Service for Service {
|
||||
let mutex_federation = self.mutex_federation.len();
|
||||
writeln!(out, "federation_mutex: {mutex_federation}")?;
|
||||
|
||||
let (ber_count, ber_bytes) = self.bad_event_ratelimiter.read()?.iter().fold(
|
||||
(0_usize, 0_usize),
|
||||
|(mut count, mut bytes), (event_id, _)| {
|
||||
bytes = bytes.saturating_add(event_id.capacity());
|
||||
bytes = bytes.saturating_add(size_of::<RateLimitState>());
|
||||
count = count.saturating_add(1);
|
||||
(count, bytes)
|
||||
},
|
||||
);
|
||||
|
||||
writeln!(out, "bad_event_ratelimiter: {ber_count} ({})", pretty(ber_bytes))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn clear_cache(&self) {
|
||||
self.bad_event_ratelimiter
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.clear();
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
@@ -57,8 +80,6 @@ fn back_off(&self, event_id: &EventId) {
|
||||
use hash_map::Entry::{Occupied, Vacant};
|
||||
|
||||
match self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.write()
|
||||
.expect("locked")
|
||||
@@ -76,8 +97,6 @@ fn back_off(&self, event_id: &EventId) {
|
||||
#[implement(Service)]
|
||||
fn is_backed_off(&self, event_id: &EventId, range: Range<Duration>) -> bool {
|
||||
let Some((time, tries)) = self
|
||||
.services
|
||||
.globals
|
||||
.bad_event_ratelimiter
|
||||
.read()
|
||||
.expect("locked")
|
||||
|
||||
Reference in New Issue
Block a user