pub mod blurhash; mod data; pub(super) mod migrations; mod preview; mod remote; mod tests; mod thumbnail; use std::{ path::PathBuf, sync::Arc, time::{Duration, Instant, SystemTime}, }; use async_trait::async_trait; use base64::{Engine as _, engine::general_purpose}; use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition}; use tokio::{ fs, io::{AsyncReadExt, AsyncWriteExt, BufReader}, sync::Notify, }; use tuwunel_core::{ Err, Result, debug, debug_error, debug_info, debug_warn, err, error, trace, utils::{self, MutexMap}, warn, }; use self::data::{Data, Metadata}; pub use self::thumbnail::Dim; #[derive(Debug)] pub struct FileMeta { pub content: Option>, pub content_type: Option, pub content_disposition: Option, } /// For MSC2246 pub(super) struct MXCState { /// Save the notifier for each pending media upload pub(super) notifiers: std::sync::Mutex>>, /// Save the ratelimiter for each user pub(super) ratelimiter: std::sync::Mutex>, } pub struct Service { url_preview_mutex: MutexMap, mxc_state: MXCState, pub(super) db: Data, services: Arc, } /// generated MXC ID (`media-id`) length pub const MXC_LENGTH: usize = 32; /// Cache control for immutable objects. pub const CACHE_CONTROL_IMMUTABLE: &str = "private,max-age=31536000,immutable"; /// Default cross-origin resource policy. pub const CORP_CROSS_ORIGIN: &str = "cross-origin"; #[async_trait] impl crate::Service for Service { fn build(args: &crate::Args<'_>) -> Result> { Ok(Arc::new(Self { url_preview_mutex: MutexMap::new(), mxc_state: MXCState { notifiers: std::sync::Mutex::new(std::collections::HashMap::new()), ratelimiter: std::sync::Mutex::new(std::collections::HashMap::new()), }, db: Data::new(args.db), services: args.services.clone(), })) } async fn worker(self: Arc) -> Result { self.create_media_dir().await?; Ok(()) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } impl Service { /// Create a pending media upload ID. pub async fn create_pending( &self, mxc: &Mxc<'_>, user: &UserId, unused_expires_at: u64, ) -> Result<()> { let config = &self.services.server.config; // Rate limiting (rc_media_create) let rate = f64::from(config.rc_media_create_per_second); let burst = f64::from(config.rc_media_create_burst_count); // Check rate limiting if rate > 0.0 && burst > 0.0 { let now = Instant::now(); let mut ratelimiter = self.mxc_state.ratelimiter.lock().unwrap(); let (last_time, tokens) = ratelimiter .entry(user.to_owned()) .or_insert_with(|| (now, burst)); let elapsed = now.duration_since(*last_time).as_secs_f64(); let new_tokens = elapsed.mul_add(rate, *tokens).min(burst); if new_tokens >= 1.0 { *last_time = now; *tokens = new_tokens - 1.0; } else { return Err(tuwunel_core::Error::Request( ruma::api::client::error::ErrorKind::LimitExceeded { retry_after: None }, "Too many pending media creation requests.".into(), http::StatusCode::TOO_MANY_REQUESTS, )); } } let max_uploads = config.max_pending_media_uploads; let (current_uploads, earliest_expiration) = self.db.count_pending_mxc_for_user(user).await; // Check if the user has reached the maximum number of pending media uploads if current_uploads >= max_uploads { let retry_after = earliest_expiration.saturating_sub(u64::try_from( SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis(), )?); return Err(tuwunel_core::Error::Request( ruma::api::client::error::ErrorKind::LimitExceeded { retry_after: Some(ruma::api::client::error::RetryAfter::Delay( Duration::from_millis(retry_after), )), }, "Maximum number of pending media uploads reached.".into(), http::StatusCode::TOO_MANY_REQUESTS, )); } self.db .insert_pending_mxc(mxc, user, unused_expires_at); Ok(()) } /// Uploads content to a pending media ID. pub async fn upload_pending( &self, mxc: &Mxc<'_>, user: &UserId, content_disposition: Option<&ContentDisposition>, content_type: Option<&str>, file: &[u8], ) -> Result<()> { let pending = self.db.search_pending_mxc(mxc).await; let Some((owner_id, expires_at)) = pending else { if self.get_metadata(mxc).await.is_some() { return Err!(Request(CannotOverwriteMedia("Media ID already has content"))); } return Err!(Request(NotFound("Media not found"))); }; if owner_id != user { return Err!(Request(Forbidden("You did not create this media ID"))); } let current_time = u64::try_from( SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time went backwards") .as_millis(), )?; if expires_at < current_time { return Err!(Request(NotFound("Pending media ID expired"))); } self.create(mxc, Some(user), content_disposition, content_type, file) .await?; self.db.remove_pending_mxc(mxc); if let Some(notifier) = self .mxc_state .notifiers .lock() .unwrap() .get(&mxc.to_string()) .cloned() { notifier.notify_waiters(); let mut map = self.mxc_state.notifiers.lock().unwrap(); map.remove(&mxc.to_string()); } Ok(()) } /// Uploads a file. pub async fn create( &self, mxc: &Mxc<'_>, user: Option<&UserId>, content_disposition: Option<&ContentDisposition>, content_type: Option<&str>, file: &[u8], ) -> Result { // Width, Height = 0 if it's not a thumbnail let key = self.db.create_file_metadata( mxc, user, &Dim::default(), content_disposition, content_type, )?; //TODO: Dangling metadata in database if creation fails let mut f = self.create_media_file(&key).await?; f.write_all(file).await?; Ok(()) } /// Deletes a file in the database and from the media directory via an MXC pub async fn delete(&self, mxc: &Mxc<'_>) -> Result { match self.db.search_mxc_metadata_prefix(mxc).await { | Ok(keys) => { for key in keys { trace!(?mxc, "MXC Key: {key:?}"); debug_info!(?mxc, "Deleting from filesystem"); if let Err(e) = self.remove_media_file(&key).await { debug_error!(?mxc, "Failed to remove media file: {e}"); } debug_info!(?mxc, "Deleting from database"); self.db.delete_file_mxc(mxc).await; } Ok(()) }, | _ => { Err!(Database(error!( "Failed to find any media keys for MXC {mxc} in our database." ))) }, } } /// Deletes all media by the specified user /// /// currently, this is only practical for local users pub async fn delete_from_user(&self, user: &UserId) -> Result { let mxcs = self.db.get_all_user_mxcs(user).await; let mut deletion_count: usize = 0; for mxc in mxcs { let Ok(mxc) = mxc.as_str().try_into().inspect_err(|e| { debug_error!(?mxc, "Failed to parse MXC URI from database: {e}"); }) else { continue; }; debug_info!(%deletion_count, "Deleting MXC {mxc} by user {user} from database and filesystem"); match self.delete(&mxc).await { | Ok(()) => { deletion_count = deletion_count.saturating_add(1); }, | Err(e) => { debug_error!(%deletion_count, "Failed to delete {mxc} from user {user}, ignoring error: {e}"); }, } } Ok(deletion_count) } /// Downloads a file. pub async fn get(&self, mxc: &Mxc<'_>) -> Result> { match self .db .search_file_metadata(mxc, &Dim::default()) .await { | Ok(Metadata { content_disposition, content_type, key }) => { let mut content = Vec::with_capacity(8192); let path = self.get_media_file(&key); BufReader::new(fs::File::open(path).await?) .read_to_end(&mut content) .await?; Ok(Some(FileMeta { content: Some(content), content_type, content_disposition, })) }, | _ => Ok(None), } } /// Download a file and wait up to a timeout_ms if it is pending. pub async fn get_with_timeout( &self, mxc: &Mxc<'_>, timeout_duration: Duration, ) -> Result> { if let Some(meta) = self.get(mxc).await? { return Ok(Some(meta)); } let pending = self.db.search_pending_mxc(mxc).await; if pending.is_none() { return Ok(None); } let notifier = { let mut map = self.mxc_state.notifiers.lock().unwrap(); map.entry(mxc.to_string()) .or_insert_with(|| Arc::new(Notify::new())) .clone() }; if tokio::time::timeout(timeout_duration, notifier.notified()) .await .is_err() { return Err!(Request(NotYetUploaded("Media has not been uploaded yet"))); } self.get(mxc).await } /// Download a thumbnail and wait up to a timeout_ms if it is pending. pub async fn get_thumbnail_with_timeout( &self, mxc: &Mxc<'_>, dim: &Dim, timeout_duration: Duration, ) -> Result> { if let Some(meta) = self.get_thumbnail(mxc, dim).await? { return Ok(Some(meta)); } let pending = self.db.search_pending_mxc(mxc).await; if pending.is_none() { return Ok(None); } let notifier = { let mut map = self.mxc_state.notifiers.lock().unwrap(); map.entry(mxc.to_string()) .or_insert_with(|| Arc::new(Notify::new())) .clone() }; if tokio::time::timeout(timeout_duration, notifier.notified()) .await .is_err() { return Err!(Request(NotYetUploaded("Media has not been uploaded yet"))); } self.get_thumbnail(mxc, dim).await } /// Gets all the MXC URIs in our media database pub async fn get_all_mxcs(&self) -> Result> { let all_keys = self.db.get_all_media_keys().await; let mut mxcs = Vec::with_capacity(all_keys.len()); for key in all_keys { trace!("Full MXC key from database: {key:?}"); let mut parts = key.split(|&b| b == 0xFF); let mxc = parts .next() .map(|bytes| { utils::string_from_bytes(bytes).map_err(|e| { err!(Database(error!( "Failed to parse MXC unicode bytes from our database: {e}" ))) }) }) .transpose()?; let Some(mxc_s) = mxc else { debug_warn!( ?mxc, "Parsed MXC URL unicode bytes from database but is still invalid" ); continue; }; trace!("Parsed MXC key to URL: {mxc_s}"); let mxc = OwnedMxcUri::from(mxc_s); if mxc.is_valid() { mxcs.push(mxc); } else { debug_warn!("{mxc:?} from database was found to not be valid"); } } Ok(mxcs) } /// Deletes all remote only media files in the given at or after /// time/duration. Returns a usize with the amount of media files deleted. pub async fn delete_all_remote_media_at_after_time( &self, time: SystemTime, before: bool, after: bool, yes_i_want_to_delete_local_media: bool, ) -> Result { let all_keys = self.db.get_all_media_keys().await; let mut remote_mxcs = Vec::with_capacity(all_keys.len()); for key in all_keys { trace!("Full MXC key from database: {key:?}"); let mut parts = key.split(|&b| b == 0xFF); let mxc = parts .next() .map(|bytes| { utils::string_from_bytes(bytes).map_err(|e| { err!(Database(error!( "Failed to parse MXC unicode bytes from our database: {e}" ))) }) }) .transpose()?; let Some(mxc_s) = mxc else { debug_warn!( ?mxc, "Parsed MXC URL unicode bytes from database but is still invalid" ); continue; }; trace!("Parsed MXC key to URL: {mxc_s}"); let mxc = OwnedMxcUri::from(mxc_s); if (mxc.server_name() == Ok(self.services.globals.server_name()) && !yes_i_want_to_delete_local_media) || !mxc.is_valid() { debug!("Ignoring local or broken media MXC: {mxc}"); continue; } let path = self.get_media_file(&key); let file_metadata = match fs::metadata(path.clone()).await { | Ok(file_metadata) => file_metadata, | Err(e) => { error!( "Failed to obtain file metadata for MXC {mxc} at file path \ \"{path:?}\", skipping: {e}" ); continue; }, }; trace!(%mxc, ?path, "File metadata: {file_metadata:?}"); let file_created_at = match file_metadata.modified() { | Ok(value) => value, | Err(err) => { error!("Could not delete MXC {mxc} at path {path:?}: {err:?}. Skipping..."); continue; }, }; debug!("File created at: {file_created_at:?}"); if file_created_at >= time && before { debug!( "File is within (before) user duration, pushing to list of file paths and \ keys to delete." ); remote_mxcs.push(mxc.to_string()); } else if file_created_at <= time && after { debug!( "File is not within (after) user duration, pushing to list of file paths \ and keys to delete." ); remote_mxcs.push(mxc.to_string()); } } if remote_mxcs.is_empty() { return Err!(Database("Did not found any eligible MXCs to delete.")); } debug_info!("Deleting media now in the past {time:?}"); let mut deletion_count: usize = 0; for mxc in remote_mxcs { let Ok(mxc) = mxc.as_str().try_into() else { debug_warn!("Invalid MXC in database, skipping"); continue; }; debug_info!("Deleting MXC {mxc} from database and filesystem"); match self.delete(&mxc).await { | Ok(()) => { deletion_count = deletion_count.saturating_add(1); }, | Err(e) => { warn!("Failed to delete {mxc}, ignoring error and skipping: {e}"); continue; }, } } Ok(deletion_count) } pub async fn create_media_dir(&self) -> Result { let dir = self.get_media_dir(); Ok(fs::create_dir_all(dir).await?) } async fn remove_media_file(&self, key: &[u8]) -> Result { let path = self.get_media_file(key); let legacy = self.get_media_file_b64(key); debug!(?key, ?path, ?legacy, "Removing media file"); let file_rm = fs::remove_file(&path); let legacy_rm = fs::remove_file(&legacy); let (file_rm, legacy_rm) = tokio::join!(file_rm, legacy_rm); if let Err(e) = legacy_rm && self.services.server.config.media_compat_file_link { debug_error!(?key, ?legacy, "Failed to remove legacy media symlink: {e}"); } Ok(file_rm?) } async fn create_media_file(&self, key: &[u8]) -> Result { let path = self.get_media_file(key); debug!(?key, ?path, "Creating media file"); let file = fs::File::create(&path).await?; if self.services.server.config.media_compat_file_link { let legacy = self.get_media_file_b64(key); if let Err(e) = fs::symlink(&path, &legacy).await { debug_error!( key = ?encode_key(key), ?path, ?legacy, "Failed to create legacy media symlink: {e}" ); } } Ok(file) } #[inline] pub async fn get_metadata(&self, mxc: &Mxc<'_>) -> Option { self.db .search_file_metadata(mxc, &Dim::default()) .await .map(|metadata| FileMeta { content_disposition: metadata.content_disposition, content_type: metadata.content_type, content: None, }) .ok() } #[inline] #[must_use] pub fn get_media_file(&self, key: &[u8]) -> PathBuf { self.get_media_file_sha256(key) } /// new SHA256 file name media function. requires database migrated. uses /// SHA256 hash of the base64 key as the file name #[must_use] pub fn get_media_file_sha256(&self, key: &[u8]) -> PathBuf { let mut r = self.get_media_dir(); // Using the hash of the base64 key as the filename // This is to prevent the total length of the path from exceeding the maximum // length in most filesystems let digest = ::digest(key); let encoded = encode_key(&digest); r.push(encoded); r } /// old base64 file name media function /// This is the old version of `get_media_file` that uses the full base64 /// key as the filename. #[must_use] pub fn get_media_file_b64(&self, key: &[u8]) -> PathBuf { let mut r = self.get_media_dir(); let encoded = encode_key(key); r.push(encoded); r } #[must_use] pub fn get_media_dir(&self) -> PathBuf { let mut r = PathBuf::new(); r.push(self.services.server.config.database_path.clone()); r.push("media"); r } } #[inline] #[must_use] pub fn encode_key(key: &[u8]) -> String { general_purpose::URL_SAFE_NO_PAD.encode(key) }