Add room_version argument and reorg convert_outgoing_federation_event.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -253,8 +253,8 @@ async fn knock_room_helper_local(
|
|||||||
room_id: room_id.to_owned(),
|
room_id: room_id.to_owned(),
|
||||||
event_id: event_id.clone(),
|
event_id: event_id.clone(),
|
||||||
pdu: services
|
pdu: services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(knock_event.clone())
|
.format_pdu_into(knock_event.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -387,8 +387,8 @@ async fn knock_room_helper_remote(
|
|||||||
room_id: room_id.to_owned(),
|
room_id: room_id.to_owned(),
|
||||||
event_id: event_id.clone(),
|
event_id: event_id.clone(),
|
||||||
pdu: services
|
pdu: services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(knock_event.clone())
|
.format_pdu_into(knock_event.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -78,8 +78,8 @@ pub(crate) async fn get_backfill_route(
|
|||||||
})
|
})
|
||||||
.and_then(|pdu| {
|
.and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
.format_pdu_into(pdu, None)
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
})
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
|
|||||||
@@ -41,8 +41,8 @@ pub(crate) async fn get_event_route(
|
|||||||
origin: services.globals.server_name().to_owned(),
|
origin: services.globals.server_name().to_owned(),
|
||||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||||
pdu: services
|
pdu: services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(event)
|
.format_pdu_into(event, None)
|
||||||
.await,
|
.await,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,11 +48,7 @@ pub(crate) async fn get_event_authorization_route(
|
|||||||
.event_ids_iter(room_id, once(body.event_id.borrow()))
|
.event_ids_iter(room_id, once(body.event_id.borrow()))
|
||||||
.ready_filter_map(Result::ok)
|
.ready_filter_map(Result::ok)
|
||||||
.filter_map(async |id| services.timeline.get_pdu_json(&id).await.ok())
|
.filter_map(async |id| services.timeline.get_pdu_json(&id).await.ok())
|
||||||
.then(|pdu| {
|
.then(|pdu| services.federation.format_pdu_into(pdu, None))
|
||||||
services
|
|
||||||
.sending
|
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
|
||||||
})
|
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -77,8 +77,8 @@ pub(crate) async fn get_missing_events_route(
|
|||||||
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
|
let prev_events = pdu.prev_events.iter().map(ToOwned::to_owned);
|
||||||
|
|
||||||
let event = services
|
let event = services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(event)
|
.format_pdu_into(event, None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
queued_events.extend(prev_events);
|
queued_events.extend(prev_events);
|
||||||
|
|||||||
@@ -178,8 +178,8 @@ pub(crate) async fn create_invite_route(
|
|||||||
|
|
||||||
Ok(create_invite::v2::Response {
|
Ok(create_invite::v2::Response {
|
||||||
event: services
|
event: services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(signed_event)
|
.format_pdu_into(signed_event, Some(&body.room_version))
|
||||||
.await,
|
.await,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -218,8 +218,8 @@ async fn create_join_event(
|
|||||||
.broad_and_then(|event_id| services.timeline.get_pdu_json(event_id))
|
.broad_and_then(|event_id| services.timeline.get_pdu_json(event_id))
|
||||||
.broad_and_then(|pdu| {
|
.broad_and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
.format_pdu_into(pdu, Some(&room_version_id))
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
})
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
@@ -233,8 +233,8 @@ async fn create_join_event(
|
|||||||
.broad_and_then(async |event_id| services.timeline.get_pdu_json(&event_id).await)
|
.broad_and_then(async |event_id| services.timeline.get_pdu_json(&event_id).await)
|
||||||
.broad_and_then(|pdu| {
|
.broad_and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
.format_pdu_into(pdu, Some(&room_version_id))
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
})
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
|
|||||||
@@ -43,8 +43,8 @@ pub(crate) async fn get_room_state_route(
|
|||||||
.and_then(|id| services.timeline.get_pdu_json(id))
|
.and_then(|id| services.timeline.get_pdu_json(id))
|
||||||
.and_then(|pdu| {
|
.and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
.format_pdu_into(pdu, None)
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
})
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
@@ -56,8 +56,8 @@ pub(crate) async fn get_room_state_route(
|
|||||||
.and_then(async |id| services.timeline.get_pdu_json(&id).await)
|
.and_then(async |id| services.timeline.get_pdu_json(&id).await)
|
||||||
.and_then(|pdu| {
|
.and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu)
|
.format_pdu_into(pdu, None)
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
})
|
})
|
||||||
.try_collect()
|
.try_collect()
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
mod builder;
|
mod builder;
|
||||||
mod count;
|
mod count;
|
||||||
|
pub mod format;
|
||||||
mod hashes;
|
mod hashes;
|
||||||
mod id;
|
mod id;
|
||||||
mod raw_id;
|
mod raw_id;
|
||||||
|
|||||||
39
src/core/matrix/pdu/format.rs
Normal file
39
src/core/matrix/pdu/format.rs
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
use ruma::{CanonicalJsonObject, CanonicalJsonValue, RoomVersionId};
|
||||||
|
|
||||||
|
use crate::{is_equal_to, matrix::room_version};
|
||||||
|
|
||||||
|
pub fn into_outgoing_federation(
|
||||||
|
mut pdu_json: CanonicalJsonObject,
|
||||||
|
room_version: &RoomVersionId,
|
||||||
|
) -> CanonicalJsonObject {
|
||||||
|
if let Some(unsigned) = pdu_json
|
||||||
|
.get_mut("unsigned")
|
||||||
|
.and_then(|val| val.as_object_mut())
|
||||||
|
{
|
||||||
|
unsigned.remove("transaction_id");
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(room_rules) = room_version::rules(room_version) else {
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
return pdu_json;
|
||||||
|
};
|
||||||
|
|
||||||
|
if !room_rules.event_format.require_event_id {
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
}
|
||||||
|
|
||||||
|
if !room_rules
|
||||||
|
.event_format
|
||||||
|
.require_room_create_room_id
|
||||||
|
{
|
||||||
|
if pdu_json
|
||||||
|
.get("type")
|
||||||
|
.and_then(CanonicalJsonValue::as_str)
|
||||||
|
.is_some_and(is_equal_to!("m.room.create"))
|
||||||
|
{
|
||||||
|
pdu_json.remove("room_id");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pdu_json
|
||||||
|
}
|
||||||
44
src/service/federation/format.rs
Normal file
44
src/service/federation/format.rs
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
use futures::future::OptionFuture;
|
||||||
|
use ruma::{CanonicalJsonObject, CanonicalJsonValue, RoomId, RoomVersionId};
|
||||||
|
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
||||||
|
use tuwunel_core::{
|
||||||
|
implement,
|
||||||
|
matrix::pdu,
|
||||||
|
utils::{future::TryExtExt, result::FlatOk},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
|
||||||
|
#[implement(super::Service)]
|
||||||
|
pub async fn format_pdu_into(
|
||||||
|
&self,
|
||||||
|
mut pdu_json: CanonicalJsonObject,
|
||||||
|
room_version: Option<&RoomVersionId>,
|
||||||
|
) -> Box<RawJsonValue> {
|
||||||
|
let room_id = pdu_json
|
||||||
|
.get("room_id")
|
||||||
|
.and_then(CanonicalJsonValue::as_str)
|
||||||
|
.map(RoomId::parse)
|
||||||
|
.flat_ok();
|
||||||
|
|
||||||
|
let query_room_version: OptionFuture<_> = room_id
|
||||||
|
.and_then(|room_id| {
|
||||||
|
room_version
|
||||||
|
.is_none()
|
||||||
|
.then(|| self.services.state.get_room_version(room_id))
|
||||||
|
.map(TryExtExt::ok)
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
|
||||||
|
if let Some(room_version) = query_room_version
|
||||||
|
.await
|
||||||
|
.flatten()
|
||||||
|
.as_ref()
|
||||||
|
.or(room_version)
|
||||||
|
{
|
||||||
|
pdu_json = pdu::format::into_outgoing_federation(pdu_json, room_version);
|
||||||
|
} else {
|
||||||
|
pdu_json.remove("event_id");
|
||||||
|
}
|
||||||
|
|
||||||
|
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
mod execute;
|
mod execute;
|
||||||
|
mod format;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|||||||
@@ -90,8 +90,8 @@ async fn remote_invite(
|
|||||||
room_version: room_version_id.clone(),
|
room_version: room_version_id.clone(),
|
||||||
event: self
|
event: self
|
||||||
.services
|
.services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(pdu_json.clone())
|
.format_pdu_into(pdu_json.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
invite_room_state: invite_room_state
|
invite_room_state: invite_room_state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
|||||||
@@ -253,8 +253,8 @@ pub async fn join_remote(
|
|||||||
omit_members: false,
|
omit_members: false,
|
||||||
pdu: self
|
pdu: self
|
||||||
.services
|
.services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(join_event.clone())
|
.format_pdu_into(join_event.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -742,8 +742,8 @@ pub async fn join_local(
|
|||||||
omit_members: false,
|
omit_members: false,
|
||||||
pdu: self
|
pdu: self
|
||||||
.services
|
.services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(join_event.clone())
|
.format_pdu_into(join_event.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -329,8 +329,8 @@ pub async fn remote_leave(&self, user_id: &UserId, room_id: &RoomId) -> Result {
|
|||||||
event_id,
|
event_id,
|
||||||
pdu: self
|
pdu: self
|
||||||
.services
|
.services
|
||||||
.sending
|
.federation
|
||||||
.convert_to_outgoing_federation_event(leave_event.clone())
|
.format_pdu_into(leave_event.clone(), Some(&room_version_id))
|
||||||
.await,
|
.await,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ use futures::{
|
|||||||
stream::FuturesUnordered,
|
stream::FuturesUnordered,
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonObject, CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedRoomId,
|
MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, ServerName,
|
||||||
OwnedServerName, OwnedUserId, RoomId, ServerName, UInt,
|
UInt,
|
||||||
api::{
|
api::{
|
||||||
appservice::event::push_events::v1::EphemeralData,
|
appservice::event::push_events::v1::EphemeralData,
|
||||||
federation::transactions::{
|
federation::transactions::{
|
||||||
@@ -37,10 +37,8 @@ use ruma::{
|
|||||||
serde::Raw,
|
serde::Raw,
|
||||||
uint,
|
uint,
|
||||||
};
|
};
|
||||||
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
|
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Error, Event, Result, debug, err, error, implement, is_equal_to,
|
Error, Event, Result, debug, err, error,
|
||||||
matrix::room_version,
|
|
||||||
result::LogErr,
|
result::LogErr,
|
||||||
trace,
|
trace,
|
||||||
utils::{
|
utils::{
|
||||||
@@ -883,7 +881,11 @@ impl Service {
|
|||||||
.get_pdu_json_from_id(pdu_id)
|
.get_pdu_json_from_id(pdu_id)
|
||||||
.ok()
|
.ok()
|
||||||
})
|
})
|
||||||
.wide_then(|pdu| self.convert_to_outgoing_federation_event(pdu))
|
.wide_then(|pdu| {
|
||||||
|
self.services
|
||||||
|
.federation
|
||||||
|
.format_pdu_into(pdu, None)
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -937,64 +939,3 @@ impl Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This does not return a full `Pdu` it is only to satisfy ruma's types.
|
|
||||||
#[implement(Service)]
|
|
||||||
pub async fn convert_to_outgoing_federation_event(
|
|
||||||
&self,
|
|
||||||
mut pdu_json: CanonicalJsonObject,
|
|
||||||
) -> Box<RawJsonValue> {
|
|
||||||
self.strip_outgoing_federation_event(&mut pdu_json)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
to_raw_value(&pdu_json).expect("CanonicalJson is valid serde_json::Value")
|
|
||||||
}
|
|
||||||
|
|
||||||
#[implement(Service)]
|
|
||||||
async fn strip_outgoing_federation_event(&self, pdu_json: &mut CanonicalJsonObject) {
|
|
||||||
if let Some(unsigned) = pdu_json
|
|
||||||
.get_mut("unsigned")
|
|
||||||
.and_then(|val| val.as_object_mut())
|
|
||||||
{
|
|
||||||
unsigned.remove("transaction_id");
|
|
||||||
}
|
|
||||||
|
|
||||||
let Some(room_id) = pdu_json
|
|
||||||
.get("room_id")
|
|
||||||
.and_then(CanonicalJsonValue::as_str)
|
|
||||||
.map(RoomId::parse)
|
|
||||||
.transpose()
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Ok(room_rules) = self
|
|
||||||
.services
|
|
||||||
.state
|
|
||||||
.get_room_version(room_id)
|
|
||||||
.await
|
|
||||||
.and_then(|ref ver| room_version::rules(ver))
|
|
||||||
else {
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if !room_rules.event_format.require_event_id {
|
|
||||||
pdu_json.remove("event_id");
|
|
||||||
}
|
|
||||||
|
|
||||||
if !room_rules
|
|
||||||
.event_format
|
|
||||||
.require_room_create_room_id
|
|
||||||
{
|
|
||||||
if pdu_json
|
|
||||||
.get("type")
|
|
||||||
.and_then(CanonicalJsonValue::as_str)
|
|
||||||
.is_some_and(is_equal_to!("m.room.create"))
|
|
||||||
{
|
|
||||||
pdu_json.remove("room_id");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user