From 778fbfdcb5e3a734c742acb2fd9770d97004e135 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 27 Apr 2025 00:58:56 +0000 Subject: [PATCH] Cleanup/improve other async queries in some client handlers. Signed-off-by: Jason Volk --- src/api/client/membership.rs | 15 ++++--- src/api/client/profile.rs | 66 +++++++++++++---------------- src/api/client/room/initial_sync.rs | 48 +++++++++++---------- 3 files changed, 64 insertions(+), 65 deletions(-) diff --git a/src/api/client/membership.rs b/src/api/client/membership.rs index c588980a..51fd7274 100644 --- a/src/api/client/membership.rs +++ b/src/api/client/membership.rs @@ -8,7 +8,7 @@ use std::{ use axum::extract::State; use axum_client_ip::InsecureClientIp; -use futures::{FutureExt, StreamExt, TryFutureExt, join, pin_mut}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::join, join, pin_mut}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId, @@ -47,7 +47,7 @@ use tuwunel_core::{ trace, utils::{ self, FutureBoolExt, - future::ReadyEqExt, + future::{ReadyEqExt, TryExtExt}, shuffle, stream::{BroadbandExt, IterStream, ReadyExt}, }, @@ -940,12 +940,13 @@ pub(crate) async fn joined_members_route( .room_members(&body.room_id) .map(ToOwned::to_owned) .broad_then(|user_id| async move { - let member = RoomMember { - display_name: services.users.displayname(&user_id).await.ok(), - avatar_url: services.users.avatar_url(&user_id).await.ok(), - }; + let (display_name, avatar_url) = join( + services.users.displayname(&user_id).ok(), + services.users.avatar_url(&user_id).ok(), + ) + .await; - (user_id, member) + (user_id, RoomMember { display_name, avatar_url }) }) .collect() .await, diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index 55b716b0..5cdffdde 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -1,7 +1,10 @@ use std::collections::BTreeMap; use axum::extract::State; -use futures::{StreamExt, TryStreamExt, future::join3}; +use futures::{ + StreamExt, TryStreamExt, + future::{join, join3, join4}, +}; use ruma::{ OwnedMxcUri, OwnedRoomId, UserId, api::{ @@ -16,7 +19,7 @@ use ruma::{ use tuwunel_core::{ Err, Result, matrix::pdu::PduBuilder, - utils::{IterStream, stream::TryIgnore}, + utils::{IterStream, future::TryExtExt, stream::TryIgnore}, warn, }; use tuwunel_service::Services; @@ -218,14 +221,13 @@ pub(crate) async fn get_avatar_url_route( return Err!(Request(NotFound("Profile was not found."))); } - Ok(get_avatar_url::v3::Response { - avatar_url: services - .users - .avatar_url(&body.user_id) - .await - .ok(), - blurhash: services.users.blurhash(&body.user_id).await.ok(), - }) + let (avatar_url, blurhash) = join( + services.users.avatar_url(&body.user_id).ok(), + services.users.blurhash(&body.user_id).ok(), + ) + .await; + + Ok(get_avatar_url::v3::Response { avatar_url, blurhash }) } /// # `GET /_matrix/client/v3/profile/{userId}` @@ -308,19 +310,19 @@ pub(crate) async fn get_profile_route( custom_profile_fields.remove("us.cloke.msc4175.tz"); custom_profile_fields.remove("m.tz"); + let (avatar_url, blurhash, displayname, tz) = join4( + services.users.avatar_url(&body.user_id).ok(), + services.users.blurhash(&body.user_id).ok(), + services.users.displayname(&body.user_id).ok(), + services.users.timezone(&body.user_id).ok(), + ) + .await; + Ok(get_profile::v3::Response { - avatar_url: services - .users - .avatar_url(&body.user_id) - .await - .ok(), - blurhash: services.users.blurhash(&body.user_id).await.ok(), - displayname: services - .users - .displayname(&body.user_id) - .await - .ok(), - tz: services.users.timezone(&body.user_id).await.ok(), + avatar_url, + blurhash, + displayname, + tz, custom_profile_fields, }) } @@ -332,16 +334,12 @@ pub async fn update_displayname( all_joined_rooms: &[OwnedRoomId], ) { let (current_avatar_url, current_blurhash, current_displayname) = join3( - services.users.avatar_url(user_id), - services.users.blurhash(user_id), - services.users.displayname(user_id), + services.users.avatar_url(user_id).ok(), + services.users.blurhash(user_id).ok(), + services.users.displayname(user_id).ok(), ) .await; - let current_avatar_url = current_avatar_url.ok(); - let current_blurhash = current_blurhash.ok(); - let current_displayname = current_displayname.ok(); - if displayname == current_displayname { return; } @@ -386,16 +384,12 @@ pub async fn update_avatar_url( all_joined_rooms: &[OwnedRoomId], ) { let (current_avatar_url, current_blurhash, current_displayname) = join3( - services.users.avatar_url(user_id), - services.users.blurhash(user_id), - services.users.displayname(user_id), + services.users.avatar_url(user_id).ok(), + services.users.blurhash(user_id).ok(), + services.users.displayname(user_id).ok(), ) .await; - let current_avatar_url = current_avatar_url.ok(); - let current_blurhash = current_blurhash.ok(); - let current_displayname = current_displayname.ok(); - if current_avatar_url == avatar_url && current_blurhash == blurhash { return; } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index c9197888..a2c1c520 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt, future::try_join4}; use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}; use tuwunel_core::{ Err, Event, Result, at, @@ -25,22 +25,35 @@ pub(crate) async fn room_initial_sync_route( return Err!(Request(Forbidden("No room preview available."))); } - let limit = LIMIT_MAX; - let events: Vec<_> = services + let membership = services .rooms - .timeline - .pdus_rev(None, room_id, None) - .try_take(limit) - .try_collect() - .await?; + .state_cache + .user_membership(body.sender_user(), room_id) + .map(Ok); - let state: Vec<_> = services + let visibility = services + .rooms + .directory + .visibility(room_id) + .map(Ok); + + let state = services .rooms .state_accessor .room_state_full_pdus(room_id) .map_ok(Event::into_format) - .try_collect() - .await?; + .try_collect::>(); + + let limit = LIMIT_MAX; + let events = services + .rooms + .timeline + .pdus_rev(None, room_id, None) + .try_take(limit) + .try_collect::>(); + + let (membership, visibility, state, events) = + try_join4(membership, visibility, state, events).await?; let messages = PaginationChunk { start: events @@ -68,16 +81,7 @@ pub(crate) async fn room_initial_sync_route( account_data: None, state: state.into(), messages: messages.chunk.is_empty().or_some(messages), - visibility: services - .rooms - .directory - .visibility(room_id) - .await - .into(), - membership: services - .rooms - .state_cache - .user_membership(body.sender_user(), room_id) - .await, + visibility: visibility.into(), + membership, }) }