Remove bad_event_ratelimiter entries after expiration.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -4,7 +4,7 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId,
|
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId,
|
||||||
ServerName, api::federation::event::get_event,
|
ServerName, api::federation::event::get_event,
|
||||||
@@ -89,6 +89,7 @@ where
|
|||||||
if next_id == id {
|
if next_id == id {
|
||||||
pdus.push((pdu, Some(json)));
|
pdus.push((pdu, Some(json)));
|
||||||
}
|
}
|
||||||
|
self.cancel_back_off(&next_id);
|
||||||
} else {
|
} else {
|
||||||
self.back_off(&next_id);
|
self.back_off(&next_id);
|
||||||
}
|
}
|
||||||
@@ -152,8 +153,11 @@ async fn fetch_auth_chain(
|
|||||||
.services
|
.services
|
||||||
.federation
|
.federation
|
||||||
.execute(origin, get_event::v1::Request { event_id: next_id.clone() })
|
.execute(origin, get_event::v1::Request { event_id: next_id.clone() })
|
||||||
|
.inspect_err(|e| debug_error!(?next_id, "Failed to fetch event: {e}"))
|
||||||
|
.inspect_ok(|_| {
|
||||||
|
self.cancel_back_off(&next_id);
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}"))
|
|
||||||
else {
|
else {
|
||||||
debug_warn!("Backing off from {next_id}");
|
debug_warn!("Backing off from {next_id}");
|
||||||
self.back_off(&next_id);
|
self.back_off(&next_id);
|
||||||
@@ -168,6 +172,7 @@ async fn fetch_auth_chain(
|
|||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
self.cancel_back_off(&next_id);
|
||||||
if calculated_event_id != next_id {
|
if calculated_event_id != next_id {
|
||||||
warn!(
|
warn!(
|
||||||
"Server didn't return event id we requested: requested: {next_id}, we got \
|
"Server didn't return event id we requested: requested: {next_id}, we got \
|
||||||
|
|||||||
@@ -170,6 +170,9 @@ pub async fn handle_incoming_pdu<'a>(
|
|||||||
warn!("Prev {prev_id} failed: {e}");
|
warn!("Prev {prev_id} failed: {e}");
|
||||||
self.back_off(prev_id);
|
self.back_off(prev_id);
|
||||||
})
|
})
|
||||||
|
.inspect_ok(|()| {
|
||||||
|
self.cancel_back_off(prev_id);
|
||||||
|
})
|
||||||
.map(|_| self.services.server.check_running())
|
.map(|_| self.services.server.check_running())
|
||||||
})
|
})
|
||||||
.boxed()
|
.boxed()
|
||||||
|
|||||||
@@ -76,7 +76,16 @@ impl crate::Service for Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
fn back_off(&self, event_id: &EventId) {
|
fn cancel_back_off(&self, event_id: &EventId) -> bool {
|
||||||
|
self.bad_event_ratelimiter
|
||||||
|
.write()
|
||||||
|
.expect("locked")
|
||||||
|
.remove(event_id)
|
||||||
|
.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Service)]
|
||||||
|
fn back_off(&self, event_id: &EventId) -> bool {
|
||||||
use hash_map::Entry::{Occupied, Vacant};
|
use hash_map::Entry::{Occupied, Vacant};
|
||||||
|
|
||||||
match self
|
match self
|
||||||
@@ -87,9 +96,11 @@ fn back_off(&self, event_id: &EventId) {
|
|||||||
{
|
{
|
||||||
| Vacant(e) => {
|
| Vacant(e) => {
|
||||||
e.insert((Instant::now(), 1));
|
e.insert((Instant::now(), 1));
|
||||||
|
true
|
||||||
},
|
},
|
||||||
| Occupied(mut e) => {
|
| Occupied(mut e) => {
|
||||||
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||||
|
false
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -106,7 +117,11 @@ fn is_backed_off(&self, event_id: &EventId, range: Range<Duration>) -> bool {
|
|||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
continue_exponential_backoff(range.start, range.end, time.elapsed(), tries)
|
if !continue_exponential_backoff(range.start, range.end, time.elapsed(), tries) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
|
|||||||
Reference in New Issue
Block a user