Implement GET /_matrix/client/v3/events live room previews.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
116
src/api/client/events.rs
Normal file
116
src/api/client/events.rs
Normal file
@@ -0,0 +1,116 @@
|
|||||||
|
use std::iter::once;
|
||||||
|
|
||||||
|
use axum::extract::State;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use ruma::api::client::peeking::listen_to_new_events::v3::{Request, Response};
|
||||||
|
use tokio::time::{Duration, Instant, timeout_at};
|
||||||
|
use tuwunel_core::{
|
||||||
|
Err, Event, Result, at,
|
||||||
|
matrix::PduCount,
|
||||||
|
utils::{
|
||||||
|
BoolExt,
|
||||||
|
result::FlatOk,
|
||||||
|
stream::{IterStream, ReadyExt},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::Ruma;
|
||||||
|
|
||||||
|
const EVENT_LIMIT: usize = 50;
|
||||||
|
|
||||||
|
/// GET `/_matrix/client/v3/events`
|
||||||
|
pub(crate) async fn events_route(
|
||||||
|
State(services): State<crate::State>,
|
||||||
|
body: Ruma<Request>,
|
||||||
|
) -> Result<Response> {
|
||||||
|
let (sender_user, sender_device) = body.sender();
|
||||||
|
|
||||||
|
let from = body
|
||||||
|
.body
|
||||||
|
.from
|
||||||
|
.as_deref()
|
||||||
|
.map(str::parse)
|
||||||
|
.flat_ok()
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let timeout = body
|
||||||
|
.body
|
||||||
|
.timeout
|
||||||
|
.as_ref()
|
||||||
|
.map(Duration::as_millis)
|
||||||
|
.map(TryInto::try_into)
|
||||||
|
.flat_ok()
|
||||||
|
.unwrap_or(services.config.client_sync_timeout_default)
|
||||||
|
.max(services.config.client_sync_timeout_min)
|
||||||
|
.min(services.config.client_sync_timeout_max);
|
||||||
|
|
||||||
|
let Some(room_id) = body.room_id.as_deref() else {
|
||||||
|
//TODO: upgrade ruma
|
||||||
|
return Err!(Request(InvalidParam("Missing RoomId parameter.")));
|
||||||
|
};
|
||||||
|
|
||||||
|
if !services
|
||||||
|
.state_accessor
|
||||||
|
.user_can_see_state_events(sender_user, room_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
return Err!(Request(Forbidden("No room preview available.")));
|
||||||
|
}
|
||||||
|
|
||||||
|
let stop_at = Instant::now()
|
||||||
|
.checked_add(Duration::from_millis(timeout))
|
||||||
|
.expect("configuration must limit maximum timeout");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let watchers = services
|
||||||
|
.sync
|
||||||
|
.watch(sender_user, sender_device, once(room_id).stream());
|
||||||
|
|
||||||
|
let next_batch = services.globals.wait_pending().await?;
|
||||||
|
|
||||||
|
let events = services
|
||||||
|
.timeline
|
||||||
|
.pdus(Some(sender_user), room_id, Some(PduCount::Normal(from)))
|
||||||
|
.ready_filter_map(Result::ok)
|
||||||
|
.ready_take_while(|(count, _)| PduCount::Normal(next_batch).ge(count))
|
||||||
|
.take(EVENT_LIMIT)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if !events.is_empty() {
|
||||||
|
return Ok(Response {
|
||||||
|
start: events
|
||||||
|
.first()
|
||||||
|
.map(at!(0))
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string),
|
||||||
|
|
||||||
|
end: events
|
||||||
|
.last()
|
||||||
|
.map(at!(0))
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string),
|
||||||
|
|
||||||
|
chunk: events
|
||||||
|
.into_iter()
|
||||||
|
.map(at!(1))
|
||||||
|
.map(Event::into_format)
|
||||||
|
.collect(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
|
||||||
|
return Ok(Response {
|
||||||
|
chunk: Default::default(),
|
||||||
|
start: body.body.from,
|
||||||
|
end: services
|
||||||
|
.server
|
||||||
|
.is_stopping()
|
||||||
|
.is_false()
|
||||||
|
.then_some(next_batch)
|
||||||
|
.as_ref()
|
||||||
|
.map(ToString::to_string),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ pub(super) mod context;
|
|||||||
pub(super) mod dehydrated_device;
|
pub(super) mod dehydrated_device;
|
||||||
pub(super) mod device;
|
pub(super) mod device;
|
||||||
pub(super) mod directory;
|
pub(super) mod directory;
|
||||||
|
pub(super) mod events;
|
||||||
pub(super) mod filter;
|
pub(super) mod filter;
|
||||||
pub(super) mod keys;
|
pub(super) mod keys;
|
||||||
pub(super) mod media;
|
pub(super) mod media;
|
||||||
@@ -53,6 +54,7 @@ pub(super) use context::*;
|
|||||||
pub(super) use dehydrated_device::*;
|
pub(super) use dehydrated_device::*;
|
||||||
pub(super) use device::*;
|
pub(super) use device::*;
|
||||||
pub(super) use directory::*;
|
pub(super) use directory::*;
|
||||||
|
pub(super) use events::*;
|
||||||
pub(super) use filter::*;
|
pub(super) use filter::*;
|
||||||
pub(super) use keys::*;
|
pub(super) use keys::*;
|
||||||
pub(super) use media::*;
|
pub(super) use media::*;
|
||||||
|
|||||||
@@ -146,6 +146,7 @@ pub fn build(router: Router<State>, server: &Server) -> Router<State> {
|
|||||||
get(client::get_state_events_for_empty_key_route)
|
get(client::get_state_events_for_empty_key_route)
|
||||||
.put(client::send_state_event_for_empty_key_route),
|
.put(client::send_state_event_for_empty_key_route),
|
||||||
)
|
)
|
||||||
|
.ruma_route(&client::events_route)
|
||||||
.ruma_route(&client::sync_events_route)
|
.ruma_route(&client::sync_events_route)
|
||||||
.ruma_route(&client::sync_events_v5_route)
|
.ruma_route(&client::sync_events_v5_route)
|
||||||
.ruma_route(&client::get_context_route)
|
.ruma_route(&client::get_context_route)
|
||||||
|
|||||||
Reference in New Issue
Block a user