State-reset and security mitigations.
Upgrade Ruma to present. The following are intentionally benign for activation in a later commit: - Hydra backports not default. - Room version 12 not default. - Room version 12 not listed as stable. Do not enable them manually or you can brick your database. Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -8,9 +8,11 @@ use std::{
|
||||
};
|
||||
|
||||
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, pin_mut};
|
||||
use ruma::{EventId, OwnedEventId, RoomId};
|
||||
use ruma::{EventId, OwnedEventId, RoomId, room_version_rules::RoomVersionRules};
|
||||
use tuwunel_core::{
|
||||
Err, Result, at, debug, debug_error, implement, trace,
|
||||
Err, Result, at, debug, debug_error, implement,
|
||||
matrix::Event,
|
||||
trace,
|
||||
utils::{
|
||||
IterStream,
|
||||
stream::{ReadyExt, TryBroadbandExt},
|
||||
@@ -28,6 +30,7 @@ pub struct Service {
|
||||
|
||||
struct Services {
|
||||
short: Dep<rooms::short::Service>,
|
||||
state: Dep<rooms::state::Service>,
|
||||
timeline: Dep<rooms::timeline::Service>,
|
||||
}
|
||||
|
||||
@@ -38,6 +41,7 @@ impl crate::Service for Service {
|
||||
Ok(Arc::new(Self {
|
||||
services: Services {
|
||||
short: args.depend::<rooms::short::Service>("rooms::short"),
|
||||
state: args.depend::<rooms::state::Service>("rooms::state"),
|
||||
timeline: args.depend::<rooms::timeline::Service>("rooms::timeline"),
|
||||
},
|
||||
db: Data::new(&args),
|
||||
@@ -80,6 +84,12 @@ where
|
||||
const BUCKET: Bucket<'_> = BTreeSet::new();
|
||||
|
||||
let started = Instant::now();
|
||||
let room_rules = self
|
||||
.services
|
||||
.state
|
||||
.get_room_version_rules(room_id)
|
||||
.await?;
|
||||
|
||||
let starting_ids = self
|
||||
.services
|
||||
.short
|
||||
@@ -103,7 +113,7 @@ where
|
||||
let full_auth_chain: Vec<ShortEventId> = buckets
|
||||
.into_iter()
|
||||
.try_stream()
|
||||
.broad_and_then(|chunk| self.get_auth_chain_outer(room_id, started, chunk))
|
||||
.broad_and_then(|chunk| self.get_auth_chain_outer(room_id, started, chunk, &room_rules))
|
||||
.try_collect()
|
||||
.map_ok(|auth_chain: Vec<_>| auth_chain.into_iter().flatten().collect())
|
||||
.map_ok(|mut full_auth_chain: Vec<_>| {
|
||||
@@ -129,6 +139,7 @@ async fn get_auth_chain_outer(
|
||||
room_id: &RoomId,
|
||||
started: Instant,
|
||||
chunk: Bucket<'_>,
|
||||
room_rules: &RoomVersionRules,
|
||||
) -> Result<Vec<ShortEventId>> {
|
||||
let chunk_key: Vec<ShortEventId> = chunk.iter().map(at!(0)).collect();
|
||||
|
||||
@@ -155,7 +166,7 @@ async fn get_auth_chain_outer(
|
||||
}
|
||||
|
||||
let auth_chain = self
|
||||
.get_auth_chain_inner(room_id, event_id)
|
||||
.get_auth_chain_inner(room_id, event_id, room_rules)
|
||||
.await?;
|
||||
|
||||
self.cache_auth_chain_vec(vec![shortid], auth_chain.as_slice());
|
||||
@@ -187,14 +198,33 @@ async fn get_auth_chain_outer(
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
#[tracing::instrument(name = "inner", level = "trace", skip(self, room_id))]
|
||||
#[tracing::instrument(
|
||||
name = "inner",
|
||||
level = "trace",
|
||||
skip(self, room_id, room_rules)
|
||||
)]
|
||||
async fn get_auth_chain_inner(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
event_id: &EventId,
|
||||
room_rules: &RoomVersionRules,
|
||||
) -> Result<Vec<ShortEventId>> {
|
||||
let mut todo: VecDeque<_> = [event_id.to_owned()].into();
|
||||
let mut found = HashSet::new();
|
||||
let mut todo: VecDeque<_> = [event_id.to_owned()].into();
|
||||
|
||||
if room_rules
|
||||
.authorization
|
||||
.room_create_event_id_as_room_id
|
||||
{
|
||||
let create_id = room_id.as_event_id()?;
|
||||
let sauthevent = self
|
||||
.services
|
||||
.short
|
||||
.get_or_create_shorteventid(&create_id)
|
||||
.await;
|
||||
|
||||
found.insert(sauthevent);
|
||||
}
|
||||
|
||||
while let Some(event_id) = todo.pop_front() {
|
||||
trace!(?event_id, "processing auth event");
|
||||
@@ -213,7 +243,7 @@ async fn get_auth_chain_inner(
|
||||
))));
|
||||
}
|
||||
|
||||
for auth_event in &pdu.auth_events {
|
||||
for auth_event in pdu.auth_events() {
|
||||
let sauthevent = self
|
||||
.services
|
||||
.short
|
||||
@@ -223,7 +253,7 @@ async fn get_auth_chain_inner(
|
||||
if found.insert(sauthevent) {
|
||||
trace!(?event_id, ?auth_event, "adding auth event to processing queue");
|
||||
|
||||
todo.push_back(auth_event.clone());
|
||||
todo.push_back(auth_event.to_owned());
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user