diff --git a/src/api/client/events.rs b/src/api/client/events.rs new file mode 100644 index 00000000..c32aa7d5 --- /dev/null +++ b/src/api/client/events.rs @@ -0,0 +1,116 @@ +use std::iter::once; + +use axum::extract::State; +use futures::StreamExt; +use ruma::api::client::peeking::listen_to_new_events::v3::{Request, Response}; +use tokio::time::{Duration, Instant, timeout_at}; +use tuwunel_core::{ + Err, Event, Result, at, + matrix::PduCount, + utils::{ + BoolExt, + result::FlatOk, + stream::{IterStream, ReadyExt}, + }, +}; + +use crate::Ruma; + +const EVENT_LIMIT: usize = 50; + +/// GET `/_matrix/client/v3/events` +pub(crate) async fn events_route( + State(services): State, + body: Ruma, +) -> Result { + let (sender_user, sender_device) = body.sender(); + + let from = body + .body + .from + .as_deref() + .map(str::parse) + .flat_ok() + .unwrap_or_default(); + + let timeout = body + .body + .timeout + .as_ref() + .map(Duration::as_millis) + .map(TryInto::try_into) + .flat_ok() + .unwrap_or(services.config.client_sync_timeout_default) + .max(services.config.client_sync_timeout_min) + .min(services.config.client_sync_timeout_max); + + let Some(room_id) = body.room_id.as_deref() else { + //TODO: upgrade ruma + return Err!(Request(InvalidParam("Missing RoomId parameter."))); + }; + + if !services + .state_accessor + .user_can_see_state_events(sender_user, room_id) + .await + { + return Err!(Request(Forbidden("No room preview available."))); + } + + let stop_at = Instant::now() + .checked_add(Duration::from_millis(timeout)) + .expect("configuration must limit maximum timeout"); + + loop { + let watchers = services + .sync + .watch(sender_user, sender_device, once(room_id).stream()); + + let next_batch = services.globals.wait_pending().await?; + + let events = services + .timeline + .pdus(Some(sender_user), room_id, Some(PduCount::Normal(from))) + .ready_filter_map(Result::ok) + .ready_take_while(|(count, _)| PduCount::Normal(next_batch).ge(count)) + .take(EVENT_LIMIT) + .collect::>() + .await; + + if !events.is_empty() { + return Ok(Response { + start: events + .first() + .map(at!(0)) + .as_ref() + .map(ToString::to_string), + + end: events + .last() + .map(at!(0)) + .as_ref() + .map(ToString::to_string), + + chunk: events + .into_iter() + .map(at!(1)) + .map(Event::into_format) + .collect(), + }); + } + + if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() { + return Ok(Response { + chunk: Default::default(), + start: body.body.from, + end: services + .server + .is_stopping() + .is_false() + .then_some(next_batch) + .as_ref() + .map(ToString::to_string), + }); + } + } +} diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 55b27b05..55233bdd 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -8,6 +8,7 @@ pub(super) mod context; pub(super) mod dehydrated_device; pub(super) mod device; pub(super) mod directory; +pub(super) mod events; pub(super) mod filter; pub(super) mod keys; pub(super) mod media; @@ -53,6 +54,7 @@ pub(super) use context::*; pub(super) use dehydrated_device::*; pub(super) use device::*; pub(super) use directory::*; +pub(super) use events::*; pub(super) use filter::*; pub(super) use keys::*; pub(super) use media::*; diff --git a/src/api/router.rs b/src/api/router.rs index 76b09dc6..05eb0530 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -146,6 +146,7 @@ pub fn build(router: Router, server: &Server) -> Router { get(client::get_state_events_for_empty_key_route) .put(client::send_state_event_for_empty_key_route), ) + .ruma_route(&client::events_route) .ruma_route(&client::sync_events_route) .ruma_route(&client::sync_events_v5_route) .ruma_route(&client::get_context_route)