Return whether event already existed from event_handler.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-18 16:38:17 +00:00
parent 0746f4b1ad
commit dafbe59d00
7 changed files with 13 additions and 9 deletions

View File

@@ -208,6 +208,7 @@ async fn create_join_event(
.handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true) .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true)
.boxed() .boxed()
.await? .await?
.map(at!(0))
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock); drop(mutex_lock);

View File

@@ -11,7 +11,7 @@ use ruma::{
serde::JsonObject, serde::JsonObject,
}; };
use tuwunel_core::{ use tuwunel_core::{
Err, Result, err, Err, Result, at, err,
matrix::{event::gen_event_id_canonical_json, pdu::PduEvent}, matrix::{event::gen_event_id_canonical_json, pdu::PduEvent},
warn, warn,
}; };
@@ -170,6 +170,7 @@ pub(crate) async fn create_knock_event_v1_route(
.handle_incoming_pdu(&origin, &body.room_id, &event_id, value.clone(), true) .handle_incoming_pdu(&origin, &body.room_id, &event_id, value.clone(), true)
.boxed() .boxed()
.await? .await?
.map(at!(0))
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock); drop(mutex_lock);

View File

@@ -11,7 +11,7 @@ use ruma::{
}, },
}; };
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
use tuwunel_core::{Err, Result, err, matrix::event::gen_event_id_canonical_json}; use tuwunel_core::{Err, Result, at, err, matrix::event::gen_event_id_canonical_json};
use tuwunel_service::Services; use tuwunel_service::Services;
use crate::Ruma; use crate::Ruma;
@@ -151,6 +151,7 @@ async fn create_leave_event(
.handle_incoming_pdu(origin, room_id, &event_id, value, true) .handle_incoming_pdu(origin, room_id, &event_id, value, true)
.boxed() .boxed()
.await? .await?
.map(at!(0))
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
drop(mutex_lock); drop(mutex_lock);

View File

@@ -5,7 +5,7 @@ use ruma::{
events::room::member::{MembershipState, RoomMemberEventContent}, events::room::member::{MembershipState, RoomMemberEventContent},
}; };
use tuwunel_core::{ use tuwunel_core::{
Err, Result, err, implement, matrix::event::gen_event_id_canonical_json, pdu::PduBuilder, Err, Result, at, err, implement, matrix::event::gen_event_id_canonical_json, pdu::PduBuilder,
}; };
use super::Service; use super::Service;
@@ -135,6 +135,7 @@ async fn remote_invite(
.event_handler .event_handler
.handle_incoming_pdu(&origin, room_id, &event_id, value, true) .handle_incoming_pdu(&origin, room_id, &event_id, value, true)
.await? .await?
.map(at!(0))
.ok_or_else(|| { .ok_or_else(|| {
err!(Request(InvalidParam("Could not accept incoming PDU as timeline event."))) err!(Request(InvalidParam("Could not accept incoming PDU as timeline event.")))
})?; })?;

View File

@@ -57,11 +57,11 @@ pub async fn handle_incoming_pdu<'a>(
event_id: &'a EventId, event_id: &'a EventId,
pdu: CanonicalJsonObject, pdu: CanonicalJsonObject,
is_timeline_event: bool, is_timeline_event: bool,
) -> Result<Option<RawPduId>> { ) -> Result<Option<(RawPduId, bool)>> {
// 1. Skip the PDU if we already have it as a timeline event // 1. Skip the PDU if we already have it as a timeline event
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await { if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
trace!(?event_id, "exists"); trace!(?event_id, "exists");
return Ok(Some(pdu_id)); return Ok(Some((pdu_id, false)));
} }
// 1.1 Check the server is in the room // 1.1 Check the server is in the room

View File

@@ -28,7 +28,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
val: CanonicalJsonObject, val: CanonicalJsonObject,
room_version: &RoomVersionId, room_version: &RoomVersionId,
create_event_id: &EventId, create_event_id: &EventId,
) -> Result<Option<RawPduId>> { ) -> Result<Option<(RawPduId, bool)>> {
// Skip the PDU if we already have it as a timeline event // Skip the PDU if we already have it as a timeline event
if let Ok(pduid) = self if let Ok(pduid) = self
.services .services
@@ -36,7 +36,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.get_pdu_id(incoming_pdu.event_id()) .get_pdu_id(incoming_pdu.event_id())
.await .await
{ {
return Ok(Some(pduid)); return Ok(Some((pduid, false)));
} }
if self if self
@@ -270,5 +270,5 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
"Accepted", "Accepted",
); );
Ok(pdu_id) Ok(pdu_id.zip(Some(true)))
} }

View File

@@ -33,7 +33,7 @@ use crate::rooms::{short::ShortRoomId, state_compressor::CompressedState};
skip_all, skip_all,
ret(Debug) ret(Debug)
)] )]
pub async fn append_incoming_pdu<'a, Leafs>( pub(crate) async fn append_incoming_pdu<'a, Leafs>(
&'a self, &'a self,
pdu: &'a PduEvent, pdu: &'a PduEvent,
pdu_json: CanonicalJsonObject, pdu_json: CanonicalJsonObject,