Renames for several event_handler service files, fn's and args.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-02 06:30:48 +00:00
parent 4ac61fd87b
commit 4237f21903
8 changed files with 117 additions and 133 deletions

View File

@@ -5,20 +5,15 @@ use std::{
};
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId,
ServerName, api::federation::event::get_event,
};
use tuwunel_core::{
debug, debug_error, debug_warn, implement,
matrix::{
PduEvent,
event::{Event, gen_event_id_canonical_json},
},
matrix::{PduEvent, event::gen_event_id_canonical_json},
trace, warn,
};
use super::get_room_version_id;
/// Find the event and auth it. Once the event is validated (steps 1 - 8)
/// it is appended to the outliers Tree.
///
@@ -29,22 +24,21 @@ use super::get_room_version_id;
/// c. Ask origin server over federation
/// d. TODO: Ask other servers over federation?
#[implement(super::Service)]
pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
pub(super) async fn fetch_auth<'a, Events>(
&self,
origin: &'a ServerName,
origin: &ServerName,
room_id: &RoomId,
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
room_version: &RoomVersionId,
) -> Vec<(PduEvent, Option<CanonicalJsonObject>)>
where
Pdu: Event,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
for event_id in events {
let outlier = self
.fetch_auth(room_id, event_id, origin, create_event)
.fetch_auth_chain(origin, room_id, event_id, room_version)
.await;
events_with_auth_events.push(outlier);
@@ -71,10 +65,10 @@ where
match Box::pin(self.handle_outlier_pdu(
origin,
create_event,
&next_id,
room_id,
&next_id,
value.clone(),
room_version,
true,
))
.await
@@ -95,16 +89,13 @@ where
}
#[implement(super::Service)]
async fn fetch_auth<'a, Pdu>(
async fn fetch_auth_chain(
&self,
_room_id: &'a RoomId,
event_id: &'a EventId,
origin: &'a ServerName,
create_event: &'a Pdu,
) -> (OwnedEventId, Option<PduEvent>, Vec<(OwnedEventId, CanonicalJsonObject)>)
where
Pdu: Event,
{
origin: &ServerName,
_room_id: &RoomId,
event_id: &EventId,
room_version: &RoomVersionId,
) -> (OwnedEventId, Option<PduEvent>, Vec<(OwnedEventId, CanonicalJsonObject)>) {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
@@ -149,13 +140,8 @@ where
{
| Ok(res) => {
debug!("Got {next_id} over federation");
let Ok(room_version_id) = get_room_version_id(create_event) else {
self.back_off(&next_id);
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_id)
gen_event_id_canonical_json(&res.pdu, room_version)
else {
self.back_off(&next_id);
continue;

View File

@@ -5,8 +5,8 @@ use std::{
use futures::FutureExt;
use ruma::{
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
RoomVersionId, ServerName, int, uint,
};
use tuwunel_core::{
Result, debug_warn, err, implement,
@@ -23,16 +23,15 @@ use super::check_room_id;
fields(%origin),
)]
#[allow(clippy::type_complexity)]
pub(super) async fn fetch_prev<'a, Pdu, Events>(
pub(super) async fn fetch_prev<'a, Events>(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
room_version: &RoomVersionId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
) -> Result<(Vec<OwnedEventId>, HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>)>
where
Pdu: Event,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let num_ids = initial_set.clone().count();
@@ -47,12 +46,7 @@ where
self.services.server.check_running()?;
match self
.fetch_and_handle_outliers(
origin,
once(prev_event_id.as_ref()),
create_event,
room_id,
)
.fetch_auth(origin, room_id, once(prev_event_id.as_ref()), room_version)
.boxed()
.await
.pop()
@@ -62,7 +56,7 @@ where
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!("Max prev event limit reached! Limit: {limit}");
debug_warn!(?limit, "Max prev event limit reached!");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}

View File

@@ -2,8 +2,8 @@ use std::collections::{HashMap, hash_map};
use futures::FutureExt;
use ruma::{
EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids,
events::StateEventType,
EventId, OwnedEventId, RoomId, RoomVersionId, ServerName,
api::federation::event::get_room_state_ids, events::StateEventType,
};
use tuwunel_core::{Err, Result, debug, debug_warn, err, implement, matrix::Event};
@@ -18,16 +18,14 @@ use crate::rooms::short::ShortStateKey;
skip_all,
fields(%origin),
)]
pub(super) async fn fetch_state<Pdu>(
pub(super) async fn fetch_state(
&self,
origin: &ServerName,
create_event: &Pdu,
room_id: &RoomId,
event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>>
where
Pdu: Event,
{
room_version: &RoomVersionId,
create_event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>> {
let res = self
.services
.sending
@@ -41,7 +39,7 @@ where
debug!("Fetching state events");
let state_ids = res.pdu_ids.iter().map(AsRef::as_ref);
let state_vec = self
.fetch_and_handle_outliers(origin, state_ids, create_event, room_id)
.fetch_auth(origin, room_id, state_ids, room_version)
.boxed()
.await;
@@ -79,7 +77,7 @@ where
if state
.get(&create_shortstatekey)
.map(AsRef::as_ref)
!= Some(create_event.event_id())
!= Some(create_event_id)
{
return Err!(Database("Incoming event refers to wrong create event."));
}

View File

@@ -8,6 +8,7 @@ use tuwunel_core::{
utils::stream::IterStream, warn,
};
use super::get_room_version_id;
use crate::rooms::timeline::RawPduId;
/// When receiving an event one needs to:
@@ -50,7 +51,7 @@ pub async fn handle_incoming_pdu<'a>(
origin: &'a ServerName,
room_id: &'a RoomId,
event_id: &'a EventId,
value: CanonicalJsonObject,
pdu: CanonicalJsonObject,
is_timeline_event: bool,
) -> Result<Option<RawPduId>> {
// 1. Skip the PDU if we already have it as a timeline event
@@ -72,7 +73,7 @@ pub async fn handle_incoming_pdu<'a>(
let origin_acl_check = self.acl_check(origin, room_id);
// 1.3.2 Check room ACL on sender's server name
let sender: &UserId = value
let sender: &UserId = pdu
.get("sender")
.try_into()
.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
@@ -106,8 +107,10 @@ pub async fn handle_incoming_pdu<'a>(
return Err!(Request(Forbidden("Federation of this room is disabled by this server.")));
}
let room_version = get_room_version_id(create_event)?;
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, create_event, event_id, room_id, value, false)
.handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, false)
.boxed()
.await?;
@@ -131,7 +134,7 @@ pub async fn handle_incoming_pdu<'a>(
// 9. Fetch any missing prev events doing all checks listed here starting at 1.
// These are timeline events
let (sorted_prev_events, mut eventid_info) = self
.fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events())
.fetch_prev(origin, room_id, incoming_pdu.prev_events(), &room_version, first_ts_in_room)
.await?;
debug!(
@@ -146,12 +149,13 @@ pub async fn handle_incoming_pdu<'a>(
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
event_id,
room_id,
event_id,
eventid_info.remove(prev_id),
create_event,
&room_version,
first_ts_in_room,
prev_id,
create_event.event_id(),
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
@@ -163,7 +167,14 @@ pub async fn handle_incoming_pdu<'a>(
.await?;
// Done with prev events, now handling the incoming event
self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id)
.boxed()
.await
self.upgrade_outlier_to_timeline_pdu(
origin,
room_id,
incoming_pdu,
val,
&room_version,
create_event.event_id(),
)
.boxed()
.await
}

View File

@@ -2,7 +2,8 @@ use std::collections::{HashMap, hash_map};
use futures::future::ready;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName, events::StateEventType,
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, RoomVersionId, ServerName,
events::StateEventType,
};
use tuwunel_core::{
Err, Result, debug, debug_info, err, implement,
@@ -10,41 +11,36 @@ use tuwunel_core::{
state_res, trace, warn,
};
use super::{check_room_id, get_room_version_id, to_room_version};
use super::{check_room_id, to_room_version};
#[implement(super::Service)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_outlier_pdu<'a, Pdu>(
pub(super) async fn handle_outlier_pdu(
&self,
origin: &'a ServerName,
create_event: &'a Pdu,
event_id: &'a EventId,
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
origin: &ServerName,
room_id: &RoomId,
event_id: &EventId,
mut pdu_json: CanonicalJsonObject,
room_version: &RoomVersionId,
auth_events_known: bool,
) -> Result<(PduEvent, CanonicalJsonObject)>
where
Pdu: Event,
{
) -> Result<(PduEvent, CanonicalJsonObject)> {
// 1. Remove unsigned field
value.remove("unsigned");
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
pdu_json.remove("unsigned");
// TODO: For RoomVersion6 we must check that Raw<..> is canonical do we
// anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json
// 2. Check signatures, otherwise drop
// 3. check content hash, redact if doesn't match
let room_version_id = get_room_version_id(create_event)?;
let mut incoming_pdu = match self
let mut pdu_json = match self
.services
.server_keys
.verify_event(&value, Some(&room_version_id))
.verify_event(&pdu_json, Some(room_version))
.await
{
| Ok(ruma::signatures::Verified::All) => value,
| Ok(ruma::signatures::Verified::All) => pdu_json,
| Ok(ruma::signatures::Verified::Signatures) => {
// Redact
debug_info!("Calculated hash does not match (redaction): {event_id}");
let Ok(obj) = ruma::canonical_json::redact(value, &room_version_id, None) else {
let Ok(obj) = ruma::canonical_json::redact(pdu_json, room_version, None) else {
return Err!(Request(InvalidParam("Redaction failed")));
};
@@ -66,13 +62,13 @@ where
// Now that we have checked the signature and hashes we can add the eventID and
// convert to our PduEvent type
incoming_pdu
pdu_json
.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned()));
let pdu_event = serde_json::from_value::<PduEvent>(serde_json::to_value(&incoming_pdu)?)
let event = serde_json::from_value::<PduEvent>(serde_json::to_value(&pdu_json)?)
.map_err(|e| err!(Request(BadJson(debug_warn!("Event is not a valid PDU: {e}")))))?;
check_room_id(room_id, &pdu_event)?;
check_room_id(room_id, &event)?;
if !auth_events_known {
// 4. fetch any missing auth events doing all checks listed here starting at 1.
@@ -81,21 +77,15 @@ where
// the auth events are also rejected "due to auth events"
// NOTE: Step 5 is not applied anymore because it failed too often
debug!("Fetching auth events");
Box::pin(self.fetch_and_handle_outliers(
origin,
pdu_event.auth_events(),
create_event,
room_id,
))
.await;
Box::pin(self.fetch_auth(origin, room_id, event.auth_events(), room_version)).await;
}
// 6. Reject "due to auth events" if the event doesn't pass auth based on the
// auth events
debug!("Checking based on auth events");
// Build map of auth events
let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count());
for id in pdu_event.auth_events() {
let mut auth_events = HashMap::with_capacity(event.auth_events().count());
for id in event.auth_events() {
let Ok(auth_event) = self.services.timeline.get_pdu(id).await else {
warn!("Could not find auth event {id}");
continue;
@@ -135,8 +125,8 @@ where
};
let auth_check = state_res::event_auth::auth_check(
&to_room_version(&room_version_id),
&pdu_event,
&to_room_version(room_version),
&event,
None, // TODO: third party invite
state_fetch,
)
@@ -152,9 +142,9 @@ where
// 7. Persist the event as an outlier.
self.services
.timeline
.add_pdu_outlier(pdu_event.event_id(), &incoming_pdu);
.add_pdu_outlier(event.event_id(), &pdu_json);
trace!("Added pdu as outlier.");
Ok((pdu_event, incoming_pdu))
Ok((event, pdu_json))
}

View File

@@ -1,6 +1,8 @@
use std::{ops::Range, time::Duration};
use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use ruma::{
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
};
use tuwunel_core::{
Err, Result, debug,
debug::INFO_SPAN_LEVEL,
@@ -16,19 +18,17 @@ use tuwunel_core::{
skip_all,
fields(%prev_id),
)]
pub(super) async fn handle_prev_pdu<'a, Pdu>(
pub(super) async fn handle_prev_pdu(
&self,
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
origin: &ServerName,
room_id: &RoomId,
event_id: &EventId,
eventid_info: Option<(PduEvent, CanonicalJsonObject)>,
create_event: &'a Pdu,
room_version: &RoomVersionId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
) -> Result
where
Pdu: Event,
{
prev_id: &EventId,
create_event_id: &EventId,
) -> Result {
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
@@ -54,8 +54,15 @@ where
return Ok(());
}
self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id)
.await?;
self.upgrade_outlier_to_timeline_pdu(
origin,
room_id,
pdu,
json,
room_version,
create_event_id,
)
.await?;
Ok(())
}

View File

@@ -1,5 +1,5 @@
mod acl_check;
mod fetch_and_handle_outliers;
mod fetch_auth;
mod fetch_prev;
mod fetch_state;
mod handle_incoming_pdu;

View File

@@ -1,7 +1,9 @@
use std::{borrow::Borrow, iter::once, sync::Arc, time::Instant};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonObject, RoomId, ServerName, events::StateEventType};
use ruma::{
CanonicalJsonObject, EventId, RoomId, RoomVersionId, ServerName, events::StateEventType,
};
use tuwunel_core::{
Err, Result, debug, debug_info, err, implement, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
@@ -10,7 +12,7 @@ use tuwunel_core::{
warn,
};
use super::{get_room_version_id, to_room_version};
use super::to_room_version;
use crate::rooms::{
state_compressor::{CompressedState, HashSetCompressStateEvent},
timeline::RawPduId,
@@ -18,17 +20,15 @@ use crate::rooms::{
#[implement(super::Service)]
#[tracing::instrument(name = "upgrade", level = "debug", skip_all, ret(Debug))]
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
pub(super) async fn upgrade_outlier_to_timeline_pdu(
&self,
incoming_pdu: PduEvent,
val: CanonicalJsonObject,
create_event: &Pdu,
origin: &ServerName,
room_id: &RoomId,
) -> Result<Option<RawPduId>>
where
Pdu: Event,
{
incoming_pdu: PduEvent,
val: CanonicalJsonObject,
room_version: &RoomVersionId,
create_event_id: &EventId,
) -> Result<Option<RawPduId>> {
// Skip the PDU if we already have it as a timeline event
if let Ok(pduid) = self
.services
@@ -50,7 +50,6 @@ where
debug!("Upgrading to timeline pdu");
let timer = Instant::now();
let room_version_id = get_room_version_id(create_event)?;
// 10. Fetch missing state and auth chain events by calling /state_ids at
// backwards extremities doing all the checks in this list starting at 1.
@@ -61,22 +60,21 @@ where
self.state_at_incoming_degree_one(&incoming_pdu)
.await?
} else {
self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id)
self.state_at_incoming_resolved(&incoming_pdu, room_id, room_version)
.boxed()
.await?
};
if state_at_incoming_event.is_none() {
state_at_incoming_event = self
.fetch_state(origin, create_event, room_id, incoming_pdu.event_id())
.fetch_state(origin, room_id, incoming_pdu.event_id(), room_version, create_event_id)
.boxed()
.await?;
}
let state_at_incoming_event =
state_at_incoming_event.expect("we always set this to some above");
let room_version = to_room_version(&room_version_id);
debug!("Performing auth check");
// 11. Check the auth of the event passes based on the state of the event
let state_fetch_state = &state_at_incoming_event;
@@ -97,7 +95,7 @@ where
};
let auth_check = state_res::event_auth::auth_check(
&room_version,
&to_room_version(room_version),
&incoming_pdu,
None, // TODO: third party invite
|ty, sk| state_fetch(ty.clone(), sk.into()),
@@ -128,7 +126,7 @@ where
};
let auth_check = state_res::event_auth::auth_check(
&room_version,
&to_room_version(room_version),
&incoming_pdu,
None, // third-party invite
state_fetch,
@@ -138,7 +136,7 @@ where
// Soft fail check before doing state res
debug!("Performing soft-fail check");
let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) {
let soft_fail = match (auth_check, incoming_pdu.redacts_id(room_version)) {
| (false, _) => true,
| (true, None) => false,
| (true, Some(redact_id)) =>
@@ -216,7 +214,7 @@ where
}
let new_room_state = self
.resolve_state(room_id, &room_version_id, state_after)
.resolve_state(room_id, room_version, state_after)
.boxed()
.await?;