Redacted event retention, implement MSC2815
This commit is contained in:
@@ -7,6 +7,7 @@ pub mod lazy_loading;
|
||||
pub mod metadata;
|
||||
pub mod pdu_metadata;
|
||||
pub mod read_receipt;
|
||||
pub mod retention;
|
||||
pub mod search;
|
||||
pub mod short;
|
||||
pub mod spaces;
|
||||
|
||||
101
src/service/rooms/retention/mod.rs
Normal file
101
src/service/rooms/retention/mod.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ruma::{CanonicalJsonObject, EventId};
|
||||
use tuwunel_core::{
|
||||
Result, debug_info, expected, implement, matrix::pdu::PduEvent, utils::TryReadyExt,
|
||||
};
|
||||
use tuwunel_database::{Deserialized, Json, Map};
|
||||
|
||||
pub struct Service {
|
||||
services: Arc<crate::services::OnceServices>,
|
||||
eventid_originalpdu: Arc<Map>,
|
||||
timeredacted_eventid: Arc<Map>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crate::Service for Service {
|
||||
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
|
||||
Ok(Arc::new(Self {
|
||||
services: args.services.clone(),
|
||||
eventid_originalpdu: args.db["eventid_originalpdu"].clone(),
|
||||
timeredacted_eventid: args.db["timeredacted_eventid"].clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
loop {
|
||||
let retention_seconds = self.services.config.redaction_retention_seconds;
|
||||
|
||||
if retention_seconds != 0 {
|
||||
debug_info!("Cleaning up retained events");
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
let count = self
|
||||
.timeredacted_eventid
|
||||
.keys::<(u64, &EventId)>()
|
||||
.ready_try_take_while(|(time_redacted, _)| {
|
||||
let time_redacted = *time_redacted;
|
||||
Ok(expected!(time_redacted + retention_seconds) < now)
|
||||
})
|
||||
.ready_try_fold_default(|count: usize, (time_redacted, event_id)| {
|
||||
self.eventid_originalpdu.remove(event_id);
|
||||
self.timeredacted_eventid
|
||||
.del((time_redacted, event_id));
|
||||
Ok(count.saturating_add(1))
|
||||
})
|
||||
.await?;
|
||||
|
||||
debug_info!(?count, "Finished cleaning up retained events");
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
() = tokio::time::sleep(Duration::from_secs(60 * 60)) => {},
|
||||
() = self.services.server.until_shutdown() => return Ok(())
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn get_original_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
|
||||
self.eventid_originalpdu
|
||||
.get(event_id)
|
||||
.await?
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub async fn get_original_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
|
||||
self.eventid_originalpdu
|
||||
.get(event_id)
|
||||
.await?
|
||||
.deserialized()
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn save_original_pdu(&self, event_id: &EventId, pdu: &CanonicalJsonObject) {
|
||||
if !self.services.config.save_unredacted_events {
|
||||
return;
|
||||
}
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
|
||||
self.eventid_originalpdu
|
||||
.raw_put(event_id, Json(pdu));
|
||||
|
||||
self.timeredacted_eventid
|
||||
.put_raw((now, event_id), []);
|
||||
}
|
||||
@@ -28,6 +28,10 @@ pub async fn redact_pdu<Pdu: Event + Send + Sync>(
|
||||
err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU.")))
|
||||
})?;
|
||||
|
||||
self.services
|
||||
.retention
|
||||
.save_original_pdu(event_id, &pdu);
|
||||
|
||||
let body = pdu["content"]
|
||||
.as_object()
|
||||
.unwrap()
|
||||
|
||||
@@ -12,7 +12,9 @@ use crate::{
|
||||
account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
|
||||
key_backups,
|
||||
manager::Manager,
|
||||
media, membership, oauth, presence, pusher, resolver, rooms, sending, server_keys,
|
||||
media, membership, oauth, presence, pusher, resolver,
|
||||
rooms::{self, retention},
|
||||
sending, server_keys,
|
||||
service::{Args, Service},
|
||||
sync, transaction_ids, uiaa, users,
|
||||
};
|
||||
@@ -59,6 +61,7 @@ pub struct Services {
|
||||
pub membership: Arc<membership::Service>,
|
||||
pub deactivate: Arc<deactivate::Service>,
|
||||
pub oauth: Arc<oauth::Service>,
|
||||
pub retention: Arc<retention::Service>,
|
||||
|
||||
manager: Mutex<Option<Arc<Manager>>>,
|
||||
pub server: Arc<Server>,
|
||||
@@ -117,6 +120,7 @@ pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
|
||||
membership: membership::Service::build(&args)?,
|
||||
deactivate: deactivate::Service::build(&args)?,
|
||||
oauth: oauth::Service::build(&args)?,
|
||||
retention: retention::Service::build(&args)?,
|
||||
|
||||
manager: Mutex::new(None),
|
||||
server,
|
||||
@@ -176,6 +180,7 @@ pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
|
||||
cast!(self.membership),
|
||||
cast!(self.deactivate),
|
||||
cast!(self.oauth),
|
||||
cast!(self.retention),
|
||||
]
|
||||
.into_iter()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user