Toward abstracting Pdu into trait Event.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-04-26 08:24:47 +00:00
parent 28354db9df
commit 57c519bbb8
41 changed files with 831 additions and 874 deletions

View File

@@ -18,7 +18,7 @@ use ruma::{
};
use tokio::sync::RwLock;
use tuwunel_core::{
Error, PduEvent, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
};
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
@@ -335,7 +335,10 @@ impl Service {
Ok(())
}
pub async fn is_admin_command(&self, pdu: &PduEvent, body: &str) -> bool {
pub async fn is_admin_command<E>(&self, event: &E, body: &str) -> bool
where
E: Event + Send + Sync,
{
// Server-side command-escape with public echo
let is_escape = body.starts_with('\\');
let is_public_escape = is_escape
@@ -353,8 +356,13 @@ impl Service {
return false;
}
let user_is_local = self
.services
.globals
.user_is_local(event.sender());
// only allow public escaped commands by local admins
if is_public_escape && !self.services.globals.user_is_local(&pdu.sender) {
if is_public_escape && !user_is_local {
return false;
}
@@ -364,12 +372,12 @@ impl Service {
}
// Prevent unescaped !admin from being used outside of the admin room
if is_public_prefix && !self.is_admin_room(&pdu.room_id).await {
if is_public_prefix && !self.is_admin_room(event.room_id()).await {
return false;
}
// Only senders who are admin can proceed
if !self.user_is_admin(&pdu.sender).await {
if !self.user_is_admin(event.sender()).await {
return false;
}
@@ -381,8 +389,8 @@ impl Service {
.config
.emergency_password
.is_some();
let from_server = pdu.sender == *server_user && !emergency_password_set;
if from_server && self.is_admin_room(&pdu.room_id).await {
let from_server = event.sender() == server_user && !emergency_password_set;
if from_server && self.is_admin_room(event.room_id()).await {
return false;
}

View File

@@ -24,7 +24,7 @@ use ruma::{
uint,
};
use tuwunel_core::{
Err, PduEvent, Result, debug_warn, err, trace,
Err, Event, Result, debug_warn, err, trace,
utils::{stream::TryIgnore, string_from_bytes},
warn,
};
@@ -284,22 +284,26 @@ impl Service {
}
}
#[tracing::instrument(skip(self, user, unread, pusher, ruleset, pdu))]
pub async fn send_push_notice(
#[tracing::instrument(skip(self, user, unread, pusher, ruleset, event))]
pub async fn send_push_notice<E>(
&self,
user: &UserId,
unread: UInt,
pusher: &Pusher,
ruleset: Ruleset,
pdu: &PduEvent,
) -> Result<()> {
event: &E,
) -> Result
where
E: Event + Send + Sync,
for<'a> &'a E: Event + Send,
{
let mut notify = None;
let mut tweaks = Vec::new();
let power_levels: RoomPowerLevelsEventContent = self
.services
.state_accessor
.room_state_get(&pdu.room_id, &StateEventType::RoomPowerLevels, "")
.room_state_get(event.room_id(), &StateEventType::RoomPowerLevels, "")
.await
.and_then(|ev| {
serde_json::from_str(ev.content.get()).map_err(|e| {
@@ -308,8 +312,9 @@ impl Service {
})
.unwrap_or_default();
let serialized = event.to_format();
for action in self
.get_actions(user, &ruleset, &power_levels, &pdu.to_sync_room_event(), &pdu.room_id)
.get_actions(user, &ruleset, &power_levels, &serialized, event.room_id())
.await
{
let n = match action {
@@ -331,7 +336,7 @@ impl Service {
}
if notify == Some(true) {
self.send_notice(unread, pusher, tweaks, pdu)
self.send_notice(unread, pusher, tweaks, event)
.await?;
}
// Else the event triggered no actions
@@ -382,13 +387,16 @@ impl Service {
}
#[tracing::instrument(skip(self, unread, pusher, tweaks, event))]
async fn send_notice(
async fn send_notice<E>(
&self,
unread: UInt,
pusher: &Pusher,
tweaks: Vec<Tweak>,
event: &PduEvent,
) -> Result {
event: &E,
) -> Result
where
E: Event + Send + Sync,
{
// TODO: email
match &pusher.kind {
| PusherKind::Http(http) => {
@@ -434,8 +442,8 @@ impl Service {
let d = vec![device];
let mut notifi = Notification::new(d);
notifi.event_id = Some((*event.event_id).to_owned());
notifi.room_id = Some((*event.room_id).to_owned());
notifi.event_id = Some(event.event_id().to_owned());
notifi.room_id = Some(event.room_id().to_owned());
if http
.data
.get("org.matrix.msc4076.disable_badge_count")
@@ -455,7 +463,7 @@ impl Service {
)
.await?;
} else {
if event.kind == TimelineEventType::RoomEncrypted
if *event.kind() == TimelineEventType::RoomEncrypted
|| tweaks
.iter()
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
@@ -464,33 +472,33 @@ impl Service {
} else {
notifi.prio = NotificationPriority::Low;
}
notifi.sender = Some(event.sender.clone());
notifi.event_type = Some(event.kind.clone());
notifi.content = serde_json::value::to_raw_value(&event.content).ok();
notifi.sender = Some(event.sender().to_owned());
notifi.event_type = Some(event.kind().to_owned());
notifi.content = serde_json::value::to_raw_value(event.content()).ok();
if event.kind == TimelineEventType::RoomMember {
if *event.kind() == TimelineEventType::RoomMember {
notifi.user_is_target =
event.state_key.as_deref() == Some(event.sender.as_str());
event.state_key() == Some(event.sender().as_str());
}
notifi.sender_display_name = self
.services
.users
.displayname(&event.sender)
.displayname(event.sender())
.await
.ok();
notifi.room_name = self
.services
.state_accessor
.get_name(&event.room_id)
.get_name(event.room_id())
.await
.ok();
notifi.room_alias = self
.services
.state_accessor
.get_canonical_alias(&event.room_id)
.get_canonical_alias(event.room_id())
.await
.ok();

View File

@@ -127,7 +127,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
let state_fetch = |ty: &StateEventType, sk: &str| {
let key = (ty.to_owned(), sk.into());
ready(auth_events.get(&key))
ready(auth_events.get(&key).map(ToOwned::to_owned))
};
let auth_check = state_res::event_auth::auth_check(

View File

@@ -4,7 +4,7 @@ use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
use tuwunel_core::{
Err, Result, debug, debug_info, err, implement,
matrix::{EventTypeExt, PduEvent, StateKey, state_res},
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
trace,
utils::stream::{BroadbandExt, ReadyExt},
warn,
@@ -118,7 +118,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
let state_fetch = |k: &StateEventType, s: &str| {
let key = k.with_state_key(s);
ready(auth_events.get(&key).cloned())
ready(auth_events.get(&key).map(ToOwned::to_owned))
};
let auth_check = state_res::event_auth::auth_check(

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use futures::{Stream, StreamExt};
use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria};
use tuwunel_core::{
PduCount, PduEvent, Result,
Event, PduCount, PduEvent, Result,
arrayvec::ArrayVec,
implement,
utils::{

View File

@@ -25,7 +25,7 @@ use ruma::{
};
use tokio::sync::{Mutex, MutexGuard};
use tuwunel_core::{
Err, Error, PduEvent, Result, implement,
Err, Error, Event, PduEvent, Result, implement,
utils::{
IterStream,
future::{BoolExt, TryExtExt},
@@ -155,7 +155,7 @@ pub async fn get_summary_and_children_local(
let children_pdus: Vec<_> = self
.get_space_child_events(current_room)
.map(PduEvent::into_stripped_spacechild_state_event)
.map(Event::into_format)
.collect()
.await;
@@ -567,7 +567,7 @@ async fn cache_insert(
room_id: room_id.clone(),
children_state: self
.get_space_child_events(&room_id)
.map(PduEvent::into_stripped_spacechild_state_event)
.map(Event::into_format)
.collect()
.await,
encryption,

View File

@@ -13,7 +13,7 @@ use ruma::{
serde::Raw,
};
use tuwunel_core::{
PduEvent, Result, err,
Event, PduEvent, Result, err,
result::FlatOk,
state_res::{self, StateMap},
utils::{
@@ -332,30 +332,34 @@ impl Service {
}
#[tracing::instrument(skip_all, level = "debug")]
pub async fn summary_stripped(&self, event: &PduEvent) -> Vec<Raw<AnyStrippedStateEvent>> {
pub async fn summary_stripped<'a, E>(&self, event: &'a E) -> Vec<Raw<AnyStrippedStateEvent>>
where
E: Event + Send + Sync,
&'a E: Event + Send,
{
let cells = [
(&StateEventType::RoomCreate, ""),
(&StateEventType::RoomJoinRules, ""),
(&StateEventType::RoomCanonicalAlias, ""),
(&StateEventType::RoomName, ""),
(&StateEventType::RoomAvatar, ""),
(&StateEventType::RoomMember, event.sender.as_str()), // Add recommended events
(&StateEventType::RoomMember, event.sender().as_str()), // Add recommended events
(&StateEventType::RoomEncryption, ""),
(&StateEventType::RoomTopic, ""),
];
let fetches = cells.iter().map(|(event_type, state_key)| {
let fetches = cells.into_iter().map(|(event_type, state_key)| {
self.services
.state_accessor
.room_state_get(&event.room_id, event_type, state_key)
.room_state_get(event.room_id(), event_type, state_key)
});
join_all(fetches)
.await
.into_iter()
.filter_map(Result::ok)
.map(PduEvent::into_stripped_state_event)
.chain(once(event.to_stripped_state_event()))
.map(Event::into_format)
.chain(once(event.to_format()))
.collect()
}

View File

@@ -7,7 +7,7 @@ use ruma::{
};
use serde_json::json;
use tuwunel_core::{
Result, err,
Event, Result, err,
matrix::pdu::{PduCount, PduEvent, PduId, RawPduId},
utils::{
ReadyExt,
@@ -49,7 +49,11 @@ impl crate::Service for Service {
}
impl Service {
pub async fn add_to_thread(&self, root_event_id: &EventId, pdu: &PduEvent) -> Result<()> {
pub async fn add_to_thread<'a, E>(&self, root_event_id: &EventId, event: &'a E) -> Result
where
E: Event + Send + Sync,
&'a E: Event + Send,
{
let root_id = self
.services
.timeline
@@ -86,7 +90,7 @@ impl Service {
}) {
// Thread already existed
relations.count = relations.count.saturating_add(uint!(1));
relations.latest_event = pdu.to_message_like_event();
relations.latest_event = event.to_format();
let content = serde_json::to_value(relations).expect("to_value always works");
@@ -99,7 +103,7 @@ impl Service {
} else {
// New thread
let relations = BundledThread {
latest_event: pdu.to_message_like_event(),
latest_event: event.to_format(),
count: uint!(1),
current_user_participated: true,
};
@@ -129,7 +133,7 @@ impl Service {
users.push(root_pdu.sender);
},
}
users.push(pdu.sender.clone());
users.push(event.sender().to_owned());
self.update_participants(&root_id, &users)
}

View File

@@ -379,8 +379,6 @@ impl Service {
.await
.unwrap_or_default();
let sync_pdu = pdu.to_sync_room_event();
let mut push_target: HashSet<_> = self
.services
.state_cache
@@ -410,6 +408,7 @@ impl Service {
}
}
let serialized = pdu.to_format();
for user in &push_target {
let rules_for_user = self
.services
@@ -427,7 +426,7 @@ impl Service {
for action in self
.services
.pusher
.get_actions(user, &rules_for_user, &power_levels, &sync_pdu, &pdu.room_id)
.get_actions(user, &rules_for_user, &power_levels, &serialized, &pdu.room_id)
.await
{
match action {
@@ -783,7 +782,7 @@ impl Service {
let auth_fetch = |k: &StateEventType, s: &str| {
let key = (k.clone(), s.into());
ready(auth_events.get(&key))
ready(auth_events.get(&key).map(ToOwned::to_owned))
};
let auth_check = state_res::auth_check(

View File

@@ -39,7 +39,7 @@ use ruma::{
};
use serde_json::value::{RawValue as RawJsonValue, to_raw_value};
use tuwunel_core::{
Error, Result, debug, err, error,
Error, Event, Result, debug, err, error,
result::LogErr,
trace,
utils::{
@@ -731,7 +731,7 @@ impl Service {
.get_pdu_from_id(pdu_id)
.await
{
pdu_jsons.push(pdu.into_room_event());
pdu_jsons.push(pdu.to_format());
}
},
| SendingEvent::Edu(edu) =>