Cleanup/improve other async queries in some client handlers.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-04-27 00:58:56 +00:00
parent 57c519bbb8
commit 778fbfdcb5
3 changed files with 64 additions and 65 deletions

View File

@@ -8,7 +8,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use futures::{FutureExt, StreamExt, TryFutureExt, join, pin_mut}; use futures::{FutureExt, StreamExt, TryFutureExt, future::join, join, pin_mut};
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName, CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName,
OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
@@ -47,7 +47,7 @@ use tuwunel_core::{
trace, trace,
utils::{ utils::{
self, FutureBoolExt, self, FutureBoolExt,
future::ReadyEqExt, future::{ReadyEqExt, TryExtExt},
shuffle, shuffle,
stream::{BroadbandExt, IterStream, ReadyExt}, stream::{BroadbandExt, IterStream, ReadyExt},
}, },
@@ -940,12 +940,13 @@ pub(crate) async fn joined_members_route(
.room_members(&body.room_id) .room_members(&body.room_id)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.broad_then(|user_id| async move { .broad_then(|user_id| async move {
let member = RoomMember { let (display_name, avatar_url) = join(
display_name: services.users.displayname(&user_id).await.ok(), services.users.displayname(&user_id).ok(),
avatar_url: services.users.avatar_url(&user_id).await.ok(), services.users.avatar_url(&user_id).ok(),
}; )
.await;
(user_id, member) (user_id, RoomMember { display_name, avatar_url })
}) })
.collect() .collect()
.await, .await,

View File

@@ -1,7 +1,10 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use axum::extract::State; use axum::extract::State;
use futures::{StreamExt, TryStreamExt, future::join3}; use futures::{
StreamExt, TryStreamExt,
future::{join, join3, join4},
};
use ruma::{ use ruma::{
OwnedMxcUri, OwnedRoomId, UserId, OwnedMxcUri, OwnedRoomId, UserId,
api::{ api::{
@@ -16,7 +19,7 @@ use ruma::{
use tuwunel_core::{ use tuwunel_core::{
Err, Result, Err, Result,
matrix::pdu::PduBuilder, matrix::pdu::PduBuilder,
utils::{IterStream, stream::TryIgnore}, utils::{IterStream, future::TryExtExt, stream::TryIgnore},
warn, warn,
}; };
use tuwunel_service::Services; use tuwunel_service::Services;
@@ -218,14 +221,13 @@ pub(crate) async fn get_avatar_url_route(
return Err!(Request(NotFound("Profile was not found."))); return Err!(Request(NotFound("Profile was not found.")));
} }
Ok(get_avatar_url::v3::Response { let (avatar_url, blurhash) = join(
avatar_url: services services.users.avatar_url(&body.user_id).ok(),
.users services.users.blurhash(&body.user_id).ok(),
.avatar_url(&body.user_id) )
.await .await;
.ok(),
blurhash: services.users.blurhash(&body.user_id).await.ok(), Ok(get_avatar_url::v3::Response { avatar_url, blurhash })
})
} }
/// # `GET /_matrix/client/v3/profile/{userId}` /// # `GET /_matrix/client/v3/profile/{userId}`
@@ -308,19 +310,19 @@ pub(crate) async fn get_profile_route(
custom_profile_fields.remove("us.cloke.msc4175.tz"); custom_profile_fields.remove("us.cloke.msc4175.tz");
custom_profile_fields.remove("m.tz"); custom_profile_fields.remove("m.tz");
let (avatar_url, blurhash, displayname, tz) = join4(
services.users.avatar_url(&body.user_id).ok(),
services.users.blurhash(&body.user_id).ok(),
services.users.displayname(&body.user_id).ok(),
services.users.timezone(&body.user_id).ok(),
)
.await;
Ok(get_profile::v3::Response { Ok(get_profile::v3::Response {
avatar_url: services avatar_url,
.users blurhash,
.avatar_url(&body.user_id) displayname,
.await tz,
.ok(),
blurhash: services.users.blurhash(&body.user_id).await.ok(),
displayname: services
.users
.displayname(&body.user_id)
.await
.ok(),
tz: services.users.timezone(&body.user_id).await.ok(),
custom_profile_fields, custom_profile_fields,
}) })
} }
@@ -332,16 +334,12 @@ pub async fn update_displayname(
all_joined_rooms: &[OwnedRoomId], all_joined_rooms: &[OwnedRoomId],
) { ) {
let (current_avatar_url, current_blurhash, current_displayname) = join3( let (current_avatar_url, current_blurhash, current_displayname) = join3(
services.users.avatar_url(user_id), services.users.avatar_url(user_id).ok(),
services.users.blurhash(user_id), services.users.blurhash(user_id).ok(),
services.users.displayname(user_id), services.users.displayname(user_id).ok(),
) )
.await; .await;
let current_avatar_url = current_avatar_url.ok();
let current_blurhash = current_blurhash.ok();
let current_displayname = current_displayname.ok();
if displayname == current_displayname { if displayname == current_displayname {
return; return;
} }
@@ -386,16 +384,12 @@ pub async fn update_avatar_url(
all_joined_rooms: &[OwnedRoomId], all_joined_rooms: &[OwnedRoomId],
) { ) {
let (current_avatar_url, current_blurhash, current_displayname) = join3( let (current_avatar_url, current_blurhash, current_displayname) = join3(
services.users.avatar_url(user_id), services.users.avatar_url(user_id).ok(),
services.users.blurhash(user_id), services.users.blurhash(user_id).ok(),
services.users.displayname(user_id), services.users.displayname(user_id).ok(),
) )
.await; .await;
let current_avatar_url = current_avatar_url.ok();
let current_blurhash = current_blurhash.ok();
let current_displayname = current_displayname.ok();
if current_avatar_url == avatar_url && current_blurhash == blurhash { if current_avatar_url == avatar_url && current_blurhash == blurhash {
return; return;
} }

View File

@@ -1,5 +1,5 @@
use axum::extract::State; use axum::extract::State;
use futures::TryStreamExt; use futures::{FutureExt, TryStreamExt, future::try_join4};
use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}; use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response};
use tuwunel_core::{ use tuwunel_core::{
Err, Event, Result, at, Err, Event, Result, at,
@@ -25,22 +25,35 @@ pub(crate) async fn room_initial_sync_route(
return Err!(Request(Forbidden("No room preview available."))); return Err!(Request(Forbidden("No room preview available.")));
} }
let limit = LIMIT_MAX; let membership = services
let events: Vec<_> = services
.rooms .rooms
.timeline .state_cache
.pdus_rev(None, room_id, None) .user_membership(body.sender_user(), room_id)
.try_take(limit) .map(Ok);
.try_collect()
.await?;
let state: Vec<_> = services let visibility = services
.rooms
.directory
.visibility(room_id)
.map(Ok);
let state = services
.rooms .rooms
.state_accessor .state_accessor
.room_state_full_pdus(room_id) .room_state_full_pdus(room_id)
.map_ok(Event::into_format) .map_ok(Event::into_format)
.try_collect() .try_collect::<Vec<_>>();
.await?;
let limit = LIMIT_MAX;
let events = services
.rooms
.timeline
.pdus_rev(None, room_id, None)
.try_take(limit)
.try_collect::<Vec<_>>();
let (membership, visibility, state, events) =
try_join4(membership, visibility, state, events).await?;
let messages = PaginationChunk { let messages = PaginationChunk {
start: events start: events
@@ -68,16 +81,7 @@ pub(crate) async fn room_initial_sync_route(
account_data: None, account_data: None,
state: state.into(), state: state.into(),
messages: messages.chunk.is_empty().or_some(messages), messages: messages.chunk.is_empty().or_some(messages),
visibility: services visibility: visibility.into(),
.rooms membership,
.directory
.visibility(room_id)
.await
.into(),
membership: services
.rooms
.state_cache
.user_membership(body.sender_user(), room_id)
.await,
}) })
} }