Implement GET /_matrix/client/v3/events live room previews.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-02 08:03:13 +00:00
parent 6b8d6956a3
commit 4afd6f347b
3 changed files with 119 additions and 0 deletions

116
src/api/client/events.rs Normal file
View File

@@ -0,0 +1,116 @@
use std::iter::once;
use axum::extract::State;
use futures::StreamExt;
use ruma::api::client::peeking::listen_to_new_events::v3::{Request, Response};
use tokio::time::{Duration, Instant, timeout_at};
use tuwunel_core::{
Err, Event, Result, at,
matrix::PduCount,
utils::{
BoolExt,
result::FlatOk,
stream::{IterStream, ReadyExt},
},
};
use crate::Ruma;
const EVENT_LIMIT: usize = 50;
/// GET `/_matrix/client/v3/events`
pub(crate) async fn events_route(
State(services): State<crate::State>,
body: Ruma<Request>,
) -> Result<Response> {
let (sender_user, sender_device) = body.sender();
let from = body
.body
.from
.as_deref()
.map(str::parse)
.flat_ok()
.unwrap_or_default();
let timeout = body
.body
.timeout
.as_ref()
.map(Duration::as_millis)
.map(TryInto::try_into)
.flat_ok()
.unwrap_or(services.config.client_sync_timeout_default)
.max(services.config.client_sync_timeout_min)
.min(services.config.client_sync_timeout_max);
let Some(room_id) = body.room_id.as_deref() else {
//TODO: upgrade ruma
return Err!(Request(InvalidParam("Missing RoomId parameter.")));
};
if !services
.state_accessor
.user_can_see_state_events(sender_user, room_id)
.await
{
return Err!(Request(Forbidden("No room preview available.")));
}
let stop_at = Instant::now()
.checked_add(Duration::from_millis(timeout))
.expect("configuration must limit maximum timeout");
loop {
let watchers = services
.sync
.watch(sender_user, sender_device, once(room_id).stream());
let next_batch = services.globals.wait_pending().await?;
let events = services
.timeline
.pdus(Some(sender_user), room_id, Some(PduCount::Normal(from)))
.ready_filter_map(Result::ok)
.ready_take_while(|(count, _)| PduCount::Normal(next_batch).ge(count))
.take(EVENT_LIMIT)
.collect::<Vec<_>>()
.await;
if !events.is_empty() {
return Ok(Response {
start: events
.first()
.map(at!(0))
.as_ref()
.map(ToString::to_string),
end: events
.last()
.map(at!(0))
.as_ref()
.map(ToString::to_string),
chunk: events
.into_iter()
.map(at!(1))
.map(Event::into_format)
.collect(),
});
}
if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
return Ok(Response {
chunk: Default::default(),
start: body.body.from,
end: services
.server
.is_stopping()
.is_false()
.then_some(next_batch)
.as_ref()
.map(ToString::to_string),
});
}
}
}

View File

@@ -8,6 +8,7 @@ pub(super) mod context;
pub(super) mod dehydrated_device;
pub(super) mod device;
pub(super) mod directory;
pub(super) mod events;
pub(super) mod filter;
pub(super) mod keys;
pub(super) mod media;
@@ -53,6 +54,7 @@ pub(super) use context::*;
pub(super) use dehydrated_device::*;
pub(super) use device::*;
pub(super) use directory::*;
pub(super) use events::*;
pub(super) use filter::*;
pub(super) use keys::*;
pub(super) use media::*;

View File

@@ -146,6 +146,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
get(client::get_state_events_for_empty_key_route)
.put(client::send_state_event_for_empty_key_route),
)
.ruma_route(&client::events_route)
.ruma_route(&client::sync_events_route)
.ruma_route(&client::sync_events_v5_route)
.ruma_route(&client::get_context_route)