diff --git a/Cargo.toml b/Cargo.toml index c6eaf140..0011cf69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -342,6 +342,7 @@ features = [ "ring-compat", "unstable-msc2448", "unstable-msc2666", + "unstable-msc2815", "unstable-msc2867", "unstable-msc2870", "unstable-msc3026", diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 4976761b..dda63b9b 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -1023,3 +1023,19 @@ pub(super) async fn resync_database(&self) -> Result { .update() .map_err(|e| err!("Failed to update from primary: {e:?}")) } + +#[admin_command] +pub(super) async fn get_retained_pdu(&self, event_id: OwnedEventId) -> Result { + let pdu = self + .services + .retention + .get_original_pdu_json(&event_id) + .await?; + + let text = serde_json::to_string_pretty(&pdu)?; + + self.write_str(&format!("Original PDU:\n```json\n{text}```")) + .await?; + + Ok(()) +} diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 5f5413f8..66c8a0fd 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -250,6 +250,11 @@ pub(super) enum DebugCommand { /// - Synchronize database with primary (secondary only) ResyncDatabase, + /// - Retrieves the saved original PDU before it has been redacted + GetRetainedPdu { + event_id: OwnedEventId, + }, + /// - Developer test stubs #[command(subcommand)] #[clap(hide(true))] diff --git a/src/api/client/room/event.rs b/src/api/client/room/event.rs index 37afcbbd..b15f1f54 100644 --- a/src/api/client/room/event.rs +++ b/src/api/client/room/event.rs @@ -1,7 +1,11 @@ use axum::extract::State; -use futures::{FutureExt, TryFutureExt, future::try_join}; +use futures::{TryFutureExt, future::join3, pin_mut}; use ruma::api::client::room::get_room_event; -use tuwunel_core::{Err, Event, Result, err}; +use tuwunel_core::{ + Err, Event, Pdu, Result, err, + result::IsErrOr, + utils::{BoolExt, FutureBoolExt, TryFutureExtExt, future::OptionFutureExt}, +}; use crate::{Ruma, client::is_ignored_pdu}; @@ -9,9 +13,10 @@ use crate::{Ruma, client::is_ignored_pdu}; /// /// Gets a single event. pub(crate) async fn get_room_event_route( - State(ref services): State, - ref body: Ruma, + State(services): State, + body: Ruma, ) -> Result { + let sender_user = body.sender_user(); let event_id = &body.event_id; let room_id = &body.room_id; @@ -20,14 +25,51 @@ pub(crate) async fn get_room_event_route( .get_pdu(event_id) .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))); + let retained_event = body + .include_unredacted_content + .then_async(async || { + let is_admin = services + .config + .allow_room_admins_to_request_unredacted_events + .then_async(|| services.admin.user_is_admin(sender_user)) + .unwrap_or(false); + + let can_redact = services + .state_accessor + .get_power_levels(room_id) + .map_ok_or(false, |power_levels| { + power_levels.for_user(sender_user) >= power_levels.redact + }); + + pin_mut!(is_admin, can_redact); + + if is_admin.or(can_redact).await { + services + .retention + .get_original_pdu(event_id) + .await + .map_err(|_| err!(Request(NotFound("Event {} not found.", event_id)))) + } else { + Err!(Request(Forbidden("You are not allowed to see the original event"))) + } + }); + let visible = services .state_accessor - .user_can_see_event(body.sender_user(), room_id, event_id) - .map(Ok); + .user_can_see_event(sender_user, room_id, event_id); - let (mut event, visible) = try_join(event, visible).await?; + let (mut event, retained_event, visible): (Result, Option>, _) = + join3(event, retained_event, visible).await; - if !visible || is_ignored_pdu(services, &event, body.sender_user()).await { + if event.as_ref().is_err_or(Event::is_redacted) + && let Some(retained_event) = retained_event + { + event = retained_event; + } + + let mut event = event?; + + if !visible || is_ignored_pdu(&services, &event, body.sender_user()).await { return Err!(Request(Forbidden("You don't have permission to view this event."))); } diff --git a/src/api/client/unversioned.rs b/src/api/client/unversioned.rs index 7acd565b..d77488f0 100644 --- a/src/api/client/unversioned.rs +++ b/src/api/client/unversioned.rs @@ -57,6 +57,7 @@ pub(crate) async fn get_supported_versions_route( ("us.cloke.msc4175".to_owned(), true), /* Profile field for user time zone (https://github.com/matrix-org/matrix-spec-proposals/pull/4175) */ ("org.matrix.msc4180".to_owned(), true), /* stable flag for 3916 (https://github.com/matrix-org/matrix-spec-proposals/pull/4180) */ ("org.matrix.simplified_msc3575".to_owned(), true), /* Simplified Sliding sync (https://github.com/matrix-org/matrix-spec-proposals/pull/4186) */ + ("fi.mau.msc2815".to_owned(), true), /* Allow room moderators to view redacted event content (https://github.com/matrix-org/matrix-spec-proposals/pull/2815) */ ]), }; diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index c21efe22..8b78e4fd 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -1990,6 +1990,32 @@ pub struct Config { #[serde(default = "true_fn")] pub admin_room_notices: bool, + /// Save original events before applying redaction to them. + /// + /// They can be retrieved with `admin debug get-retained-pdu` or MSC2815. + /// + /// default: true + #[serde(default)] + pub save_unredacted_events: bool, + + /// Redaction retention period in seconds. + /// + /// By default the unredacted events are stored forever. + /// + /// default: disabled + #[serde(default)] + pub redaction_retention_seconds: u64, + + /// Allows users with `redact` power level to request unredacted events with + /// MSC2815. + /// + /// Server admins can request unredacted events regardless of the value of + /// this option. + /// + /// default: true + #[serde(default = "true_fn")] + pub allow_room_admins_to_request_unredacted_events: bool, + /// Enable database pool affinity support. On supporting systems, block /// device queue topologies are detected and the request pool is optimized /// for the hardware; db_pool_workers is determined automatically. diff --git a/src/database/maps.rs b/src/database/maps.rs index b5d9cf5e..7fee7225 100644 --- a/src/database/maps.rs +++ b/src/database/maps.rs @@ -63,6 +63,14 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "disabledroomids", ..descriptor::RANDOM_SMALL }, + Descriptor { + name: "eventid_originalpdu", + key_size_hint: Some(48), + val_size_hint: Some(1520), + block_size: 2048, + index_size: 512, + ..descriptor::RANDOM + }, Descriptor { name: "eventid_outlierpdu", cache_disp: CacheDisp::SharedWith("pduid_pdu"), @@ -343,6 +351,11 @@ pub(super) static MAPS: &[Descriptor] = &[ name: "threadid_userids", ..descriptor::SEQUENTIAL_SMALL }, + Descriptor { + name: "timeredacted_eventid", + key_size_hint: Some(57), + ..descriptor::SEQUENTIAL_SMALL + }, Descriptor { name: "todeviceid_events", ..descriptor::RANDOM diff --git a/src/service/rooms/mod.rs b/src/service/rooms/mod.rs index 0fc2e1d9..0f60e2f6 100644 --- a/src/service/rooms/mod.rs +++ b/src/service/rooms/mod.rs @@ -7,6 +7,7 @@ pub mod lazy_loading; pub mod metadata; pub mod pdu_metadata; pub mod read_receipt; +pub mod retention; pub mod search; pub mod short; pub mod spaces; diff --git a/src/service/rooms/retention/mod.rs b/src/service/rooms/retention/mod.rs new file mode 100644 index 00000000..e9c53640 --- /dev/null +++ b/src/service/rooms/retention/mod.rs @@ -0,0 +1,101 @@ +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use async_trait::async_trait; +use ruma::{CanonicalJsonObject, EventId}; +use tuwunel_core::{ + Result, debug_info, expected, implement, matrix::pdu::PduEvent, utils::TryReadyExt, +}; +use tuwunel_database::{Deserialized, Json, Map}; + +pub struct Service { + services: Arc, + eventid_originalpdu: Arc, + timeredacted_eventid: Arc, +} + +#[async_trait] +impl crate::Service for Service { + fn build(args: &crate::Args<'_>) -> Result> { + Ok(Arc::new(Self { + services: args.services.clone(), + eventid_originalpdu: args.db["eventid_originalpdu"].clone(), + timeredacted_eventid: args.db["timeredacted_eventid"].clone(), + })) + } + + async fn worker(self: Arc) -> Result { + loop { + let retention_seconds = self.services.config.redaction_retention_seconds; + + if retention_seconds != 0 { + debug_info!("Cleaning up retained events"); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let count = self + .timeredacted_eventid + .keys::<(u64, &EventId)>() + .ready_try_take_while(|(time_redacted, _)| { + let time_redacted = *time_redacted; + Ok(expected!(time_redacted + retention_seconds) < now) + }) + .ready_try_fold_default(|count: usize, (time_redacted, event_id)| { + self.eventid_originalpdu.remove(event_id); + self.timeredacted_eventid + .del((time_redacted, event_id)); + Ok(count.saturating_add(1)) + }) + .await?; + + debug_info!(?count, "Finished cleaning up retained events"); + } + + tokio::select! { + () = tokio::time::sleep(Duration::from_secs(60 * 60)) => {}, + () = self.services.server.until_shutdown() => return Ok(()) + }; + } + } + + fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } +} + +#[implement(Service)] +pub async fn get_original_pdu(&self, event_id: &EventId) -> Result { + self.eventid_originalpdu + .get(event_id) + .await? + .deserialized() +} + +#[implement(Service)] +pub async fn get_original_pdu_json(&self, event_id: &EventId) -> Result { + self.eventid_originalpdu + .get(event_id) + .await? + .deserialized() +} + +#[implement(Service)] +pub fn save_original_pdu(&self, event_id: &EventId, pdu: &CanonicalJsonObject) { + if !self.services.config.save_unredacted_events { + return; + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + self.eventid_originalpdu + .raw_put(event_id, Json(pdu)); + + self.timeredacted_eventid + .put_raw((now, event_id), []); +} diff --git a/src/service/rooms/timeline/redact.rs b/src/service/rooms/timeline/redact.rs index 6dd22831..3632c1e9 100644 --- a/src/service/rooms/timeline/redact.rs +++ b/src/service/rooms/timeline/redact.rs @@ -28,6 +28,10 @@ pub async fn redact_pdu( err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) })?; + self.services + .retention + .save_original_pdu(event_id, &pdu); + let body = pdu["content"] .as_object() .unwrap() diff --git a/src/service/services.rs b/src/service/services.rs index 98c55e7b..a7229fdc 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -12,7 +12,9 @@ use crate::{ account_data, admin, appservice, client, config, deactivate, emergency, federation, globals, key_backups, manager::Manager, - media, membership, oauth, presence, pusher, resolver, rooms, sending, server_keys, + media, membership, oauth, presence, pusher, resolver, + rooms::{self, retention}, + sending, server_keys, service::{Args, Service}, sync, transaction_ids, uiaa, users, }; @@ -59,6 +61,7 @@ pub struct Services { pub membership: Arc, pub deactivate: Arc, pub oauth: Arc, + pub retention: Arc, manager: Mutex>>, pub server: Arc, @@ -117,6 +120,7 @@ pub async fn build(server: Arc) -> Result> { membership: membership::Service::build(&args)?, deactivate: deactivate::Service::build(&args)?, oauth: oauth::Service::build(&args)?, + retention: retention::Service::build(&args)?, manager: Mutex::new(None), server, @@ -176,6 +180,7 @@ pub(crate) fn services(&self) -> impl Iterator> + Send { cast!(self.membership), cast!(self.deactivate), cast!(self.oauth), + cast!(self.retention), ] .into_iter() } diff --git a/tuwunel-example.toml b/tuwunel-example.toml index e8778dd3..3b1c3f7b 100644 --- a/tuwunel-example.toml +++ b/tuwunel-example.toml @@ -1713,6 +1713,26 @@ # #admin_room_notices = true +# Save original events before applying redaction to them. +# +# They can be retrieved with `admin debug get-retained-pdu` or MSC2815. +# +#save_unredacted_events = true + +# Redaction retention period in seconds. +# +# By default the unredacted events are stored forever. +# +#redaction_retention_seconds = disabled + +# Allows users with `redact` power level to request unredacted events with +# MSC2815. +# +# Server admins can request unredacted events regardless of the value of +# this option. +# +#allow_room_admins_to_request_unredacted_events = true + # Enable database pool affinity support. On supporting systems, block # device queue topologies are detected and the request pool is optimized # for the hardware; db_pool_workers is determined automatically.