Skip already-accepted events from inclusion in recursive evals.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -8,6 +8,7 @@ use ruma::{
|
|||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Result, debug_warn, err, implement,
|
Result, debug_warn, err, implement,
|
||||||
matrix::{Event, PduEvent},
|
matrix::{Event, PduEvent},
|
||||||
|
utils::stream::IterStream,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::check_room_id;
|
use super::check_room_id;
|
||||||
@@ -32,14 +33,27 @@ where
|
|||||||
Events: Iterator<Item = &'a EventId> + Send,
|
Events: Iterator<Item = &'a EventId> + Send,
|
||||||
{
|
{
|
||||||
let mut todo_outlier_stack: FuturesOrdered<_> = initial_set
|
let mut todo_outlier_stack: FuturesOrdered<_> = initial_set
|
||||||
|
.stream()
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
|
.filter_map(async |event_id| {
|
||||||
|
self.services
|
||||||
|
.timeline
|
||||||
|
.non_outlier_pdu_exists(&event_id)
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
.then_some(event_id)
|
||||||
|
})
|
||||||
.map(async |event_id| {
|
.map(async |event_id| {
|
||||||
let fetch = self.fetch_auth(origin, room_id, once(event_id.as_ref()), room_version);
|
let events = once(event_id.as_ref());
|
||||||
|
let auth = self
|
||||||
|
.fetch_auth(origin, room_id, events, room_version)
|
||||||
|
.await;
|
||||||
|
|
||||||
(event_id.clone(), fetch.await)
|
(event_id, auth)
|
||||||
})
|
})
|
||||||
.map(FutureExt::boxed)
|
.map(FutureExt::boxed)
|
||||||
.collect();
|
.collect()
|
||||||
|
.await;
|
||||||
|
|
||||||
let mut amount = 0;
|
let mut amount = 0;
|
||||||
let mut eventid_info = HashMap::new();
|
let mut eventid_info = HashMap::new();
|
||||||
@@ -78,21 +92,20 @@ where
|
|||||||
if pdu.origin_server_ts() > first_ts_in_room {
|
if pdu.origin_server_ts() > first_ts_in_room {
|
||||||
amount = amount.saturating_add(1);
|
amount = amount.saturating_add(1);
|
||||||
for prev_prev in pdu.prev_events() {
|
for prev_prev in pdu.prev_events() {
|
||||||
if !graph.contains_key(prev_prev) {
|
if graph.contains_key(prev_prev) {
|
||||||
let prev_prev = prev_prev.to_owned();
|
continue;
|
||||||
let fetch = async move {
|
|
||||||
let fetch = self.fetch_auth(
|
|
||||||
origin,
|
|
||||||
room_id,
|
|
||||||
once(prev_prev.as_ref()),
|
|
||||||
room_version,
|
|
||||||
);
|
|
||||||
|
|
||||||
(prev_prev.clone(), fetch.await)
|
|
||||||
};
|
|
||||||
|
|
||||||
todo_outlier_stack.push_back(fetch.boxed());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let prev_prev = prev_prev.to_owned();
|
||||||
|
let fetch = async move {
|
||||||
|
let fetch = self
|
||||||
|
.fetch_auth(origin, room_id, once(prev_prev.as_ref()), room_version)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
(prev_prev, fetch)
|
||||||
|
};
|
||||||
|
|
||||||
|
todo_outlier_stack.push_back(fetch.boxed());
|
||||||
}
|
}
|
||||||
|
|
||||||
graph.insert(
|
graph.insert(
|
||||||
|
|||||||
Reference in New Issue
Block a user