Refactor to async closures.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -19,7 +19,7 @@ pub(super) async fn list_rooms(
|
|||||||
.rooms
|
.rooms
|
||||||
.metadata
|
.metadata
|
||||||
.iter_ids()
|
.iter_ids()
|
||||||
.filter_map(|room_id| async move {
|
.filter_map(async |room_id| {
|
||||||
(!exclude_disabled
|
(!exclude_disabled
|
||||||
|| !self
|
|| !self
|
||||||
.services
|
.services
|
||||||
@@ -29,7 +29,7 @@ pub(super) async fn list_rooms(
|
|||||||
.await)
|
.await)
|
||||||
.then_some(room_id)
|
.then_some(room_id)
|
||||||
})
|
})
|
||||||
.filter_map(|room_id| async move {
|
.filter_map(async |room_id| {
|
||||||
(!exclude_banned
|
(!exclude_banned
|
||||||
|| !self
|
|| !self
|
||||||
.services
|
.services
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ async fn list_joined_members(&self, room_id: OwnedRoomId, local_only: bool) -> R
|
|||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
})
|
})
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.filter_map(|user_id| async move {
|
.filter_map(async |user_id| {
|
||||||
Some((
|
Some((
|
||||||
self.services
|
self.services
|
||||||
.users
|
.users
|
||||||
|
|||||||
@@ -179,7 +179,7 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result {
|
|||||||
.alias
|
.alias
|
||||||
.local_aliases_for_room(&room_id)
|
.local_aliases_for_room(&room_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.for_each(|local_alias| async move {
|
.for_each(async |local_alias| {
|
||||||
self.services
|
self.services
|
||||||
.rooms
|
.rooms
|
||||||
.alias
|
.alias
|
||||||
@@ -363,7 +363,7 @@ async fn ban_list_of_rooms(&self) -> Result {
|
|||||||
.alias
|
.alias
|
||||||
.local_aliases_for_room(&room_id)
|
.local_aliases_for_room(&room_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.for_each(|local_alias| async move {
|
.for_each(async |local_alias| {
|
||||||
self.services
|
self.services
|
||||||
.rooms
|
.rooms
|
||||||
.alias
|
.alias
|
||||||
|
|||||||
@@ -526,7 +526,7 @@ where
|
|||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let mut futures: FuturesUnordered<_> = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.map(async |(server, vec)| {
|
||||||
let mut device_keys_input_fed = BTreeMap::new();
|
let mut device_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
|
||||||
@@ -656,7 +656,7 @@ pub(crate) async fn claim_keys_helper(
|
|||||||
|
|
||||||
let mut futures: FuturesUnordered<_> = get_over_federation
|
let mut futures: FuturesUnordered<_> = get_over_federation
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(server, vec)| async move {
|
.map(async |(server, vec)| {
|
||||||
let mut one_time_keys_input_fed = BTreeMap::new();
|
let mut one_time_keys_input_fed = BTreeMap::new();
|
||||||
for (user_id, keys) in vec {
|
for (user_id, keys) in vec {
|
||||||
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
one_time_keys_input_fed.insert(user_id.clone(), keys.clone());
|
||||||
|
|||||||
@@ -550,7 +550,7 @@ async fn join_room_by_id_helper_remote(
|
|||||||
.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
|
.validate_and_add_event_id_no_fetch(pdu, &room_version_id)
|
||||||
})
|
})
|
||||||
.ready_filter_map(Result::ok)
|
.ready_filter_map(Result::ok)
|
||||||
.fold(HashMap::new(), |mut state, (event_id, value)| async move {
|
.fold(HashMap::new(), async |mut state, (event_id, value)| {
|
||||||
let pdu = match PduEvent::from_id_val(&event_id, value.clone()) {
|
let pdu = match PduEvent::from_id_val(&event_id, value.clone()) {
|
||||||
| Ok(pdu) => pdu,
|
| Ok(pdu) => pdu,
|
||||||
| Err(e) => {
|
| Err(e) => {
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ pub(crate) async fn joined_members_route(
|
|||||||
.state_cache
|
.state_cache
|
||||||
.room_members(&body.room_id)
|
.room_members(&body.room_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.broad_then(|user_id| async move {
|
.broad_then(async |user_id| {
|
||||||
let (display_name, avatar_url) = join(
|
let (display_name, avatar_url) = join(
|
||||||
services.users.displayname(&user_id).ok(),
|
services.users.displayname(&user_id).ok(),
|
||||||
services.users.avatar_url(&user_id).ok(),
|
services.users.avatar_url(&user_id).ok(),
|
||||||
|
|||||||
@@ -151,9 +151,7 @@ pub(crate) async fn get_message_events_route(
|
|||||||
.map(IterStream::stream)
|
.map(IterStream::stream)
|
||||||
.into_stream()
|
.into_stream()
|
||||||
.flatten()
|
.flatten()
|
||||||
.broad_filter_map(|user_id| async move {
|
.broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await)
|
||||||
get_member_event(&services, room_id, &user_id).await
|
|
||||||
})
|
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -350,7 +350,7 @@ pub async fn update_displayname(
|
|||||||
let all_joined_rooms: Vec<_> = all_joined_rooms
|
let all_joined_rooms: Vec<_> = all_joined_rooms
|
||||||
.iter()
|
.iter()
|
||||||
.try_stream()
|
.try_stream()
|
||||||
.and_then(|room_id: &OwnedRoomId| async move {
|
.and_then(async |room_id: &OwnedRoomId| {
|
||||||
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
|
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
|
||||||
displayname: displayname.clone(),
|
displayname: displayname.clone(),
|
||||||
membership: MembershipState::Join,
|
membership: MembershipState::Join,
|
||||||
@@ -403,7 +403,7 @@ pub async fn update_avatar_url(
|
|||||||
let all_joined_rooms: Vec<_> = all_joined_rooms
|
let all_joined_rooms: Vec<_> = all_joined_rooms
|
||||||
.iter()
|
.iter()
|
||||||
.try_stream()
|
.try_stream()
|
||||||
.and_then(|room_id: &OwnedRoomId| async move {
|
.and_then(async |room_id: &OwnedRoomId| {
|
||||||
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
|
let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent {
|
||||||
avatar_url: avatar_url.clone(),
|
avatar_url: avatar_url.clone(),
|
||||||
blurhash: blurhash.clone(),
|
blurhash: blurhash.clone(),
|
||||||
|
|||||||
@@ -95,13 +95,13 @@ async fn category_room_events(
|
|||||||
});
|
});
|
||||||
|
|
||||||
let results: Vec<_> = rooms
|
let results: Vec<_> = rooms
|
||||||
.filter_map(|room_id| async move {
|
.filter_map(async |room_id| {
|
||||||
check_room_visible(services, sender_user, &room_id, criteria)
|
check_room_visible(services, sender_user, &room_id, criteria)
|
||||||
.await
|
.await
|
||||||
.is_ok()
|
.is_ok()
|
||||||
.then_some(room_id)
|
.then_some(room_id)
|
||||||
})
|
})
|
||||||
.filter_map(|room_id| async move {
|
.filter_map(async |room_id| {
|
||||||
let query = RoomQuery {
|
let query = RoomQuery {
|
||||||
room_id: &room_id,
|
room_id: &room_id,
|
||||||
user_id: Some(sender_user),
|
user_id: Some(sender_user),
|
||||||
@@ -135,7 +135,7 @@ async fn category_room_events(
|
|||||||
.iter()
|
.iter()
|
||||||
.stream()
|
.stream()
|
||||||
.ready_filter(|_| criteria.include_state.is_some_and(is_true!()))
|
.ready_filter(|_| criteria.include_state.is_some_and(is_true!()))
|
||||||
.filter_map(|(room_id, ..)| async move {
|
.filter_map(async |(room_id, ..)| {
|
||||||
procure_room_state(services, room_id)
|
procure_room_state(services, room_id)
|
||||||
.map_ok(|state| (room_id.clone(), state))
|
.map_ok(|state| (room_id.clone(), state))
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ where
|
|||||||
|
|
||||||
let next_batch: OptionFuture<_> = queue
|
let next_batch: OptionFuture<_> = queue
|
||||||
.pop_front()
|
.pop_front()
|
||||||
.map(|(room, _)| async move {
|
.map(async |(room, _)| {
|
||||||
parents.insert(room);
|
parents.insert(room);
|
||||||
|
|
||||||
let next_short_room_ids: Vec<_> = parents
|
let next_short_room_ids: Vec<_> = parents
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ async fn share_encrypted_room(
|
|||||||
.get_shared_rooms(sender_user, user_id)
|
.get_shared_rooms(sender_user, user_id)
|
||||||
.ready_filter(|&room_id| Some(room_id) != ignore_room)
|
.ready_filter(|&room_id| Some(room_id) != ignore_room)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.broad_any(|other_room_id| async move {
|
.broad_any(async |other_room_id| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
|
|||||||
@@ -237,7 +237,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_invited(sender_user)
|
.rooms_invited(sender_user)
|
||||||
.fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move {
|
.fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| {
|
||||||
let invite_count = services
|
let invite_count = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
@@ -262,7 +262,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.rooms_knocked(sender_user)
|
.rooms_knocked(sender_user)
|
||||||
.fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move {
|
.fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| {
|
||||||
let knock_count = services
|
let knock_count = services
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
@@ -334,7 +334,7 @@ pub(crate) async fn build_sync_events(
|
|||||||
let device_list_left: HashSet<_> = left_encrypted_users
|
let device_list_left: HashSet<_> = left_encrypted_users
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.stream()
|
.stream()
|
||||||
.broad_filter_map(|user_id| async move {
|
.broad_filter_map(async |user_id: OwnedUserId| {
|
||||||
share_encrypted_room(services, sender_user, &user_id, None)
|
share_encrypted_room(services, sender_user, &user_id, None)
|
||||||
.await
|
.await
|
||||||
.eq(&false)
|
.eq(&false)
|
||||||
@@ -620,7 +620,7 @@ async fn load_joined_room(
|
|||||||
.rooms
|
.rooms
|
||||||
.read_receipt
|
.read_receipt
|
||||||
.readreceipts_since(room_id, since)
|
.readreceipts_since(room_id, since)
|
||||||
.filter_map(|(read_user, _, edu)| async move {
|
.filter_map(async |(read_user, _, edu)| {
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.user_is_ignored(read_user, sender_user)
|
.user_is_ignored(read_user, sender_user)
|
||||||
@@ -806,7 +806,7 @@ async fn load_joined_room(
|
|||||||
.rooms
|
.rooms
|
||||||
.typing
|
.typing
|
||||||
.last_typing_update(room_id)
|
.last_typing_update(room_id)
|
||||||
.and_then(|count| async move {
|
.and_then(async |count| {
|
||||||
if count <= since {
|
if count <= since {
|
||||||
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
|
return Ok(Vec::<Raw<AnySyncEphemeralRoomEvent>>::new());
|
||||||
}
|
}
|
||||||
@@ -1124,8 +1124,9 @@ async fn calculate_state_incremental<'a>(
|
|||||||
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
|
.fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move {
|
||||||
use MembershipState::*;
|
use MembershipState::*;
|
||||||
|
|
||||||
let shares_encrypted_room =
|
let shares_encrypted_room = async |user_id| {
|
||||||
|user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id));
|
share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
|
||||||
|
};
|
||||||
|
|
||||||
match content.membership {
|
match content.membership {
|
||||||
| Leave => leu.insert(user_id),
|
| Leave => leu.insert(user_id),
|
||||||
|
|||||||
@@ -470,7 +470,7 @@ where
|
|||||||
.rooms
|
.rooms
|
||||||
.read_receipt
|
.read_receipt
|
||||||
.readreceipts_since(room_id, *roomsince)
|
.readreceipts_since(room_id, *roomsince)
|
||||||
.filter_map(|(read_user, _ts, v)| async move {
|
.filter_map(async |(read_user, _ts, v)| {
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.user_is_ignored(read_user, sender_user)
|
.user_is_ignored(read_user, sender_user)
|
||||||
@@ -548,7 +548,7 @@ where
|
|||||||
let required_state = required_state_request
|
let required_state = required_state_request
|
||||||
.iter()
|
.iter()
|
||||||
.stream()
|
.stream()
|
||||||
.filter_map(|state| async move {
|
.filter_map(async |state| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use futures::StreamExt;
|
use futures::{StreamExt, TryStreamExt};
|
||||||
use ruma::{api::client::threads::get_threads, uint};
|
use ruma::{api::client::threads::get_threads, uint};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Result, at,
|
Result, at,
|
||||||
@@ -35,18 +35,17 @@ pub(crate) async fn get_threads_route(
|
|||||||
.rooms
|
.rooms
|
||||||
.threads
|
.threads
|
||||||
.threads_until(body.sender_user(), &body.room_id, from, &body.include)
|
.threads_until(body.sender_user(), &body.room_id, from, &body.include)
|
||||||
.await?
|
|
||||||
.take(limit)
|
.take(limit)
|
||||||
.filter_map(|(count, pdu)| async move {
|
.try_filter_map(async |(count, pdu)| {
|
||||||
services
|
Ok(services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
|
.user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id)
|
||||||
.await
|
.await
|
||||||
.then_some((count, pdu))
|
.then_some((count, pdu)))
|
||||||
})
|
})
|
||||||
.collect()
|
.try_collect()
|
||||||
.await;
|
.await?;
|
||||||
|
|
||||||
Ok(get_threads::v1::Response {
|
Ok(get_threads::v1::Response {
|
||||||
next_batch: threads
|
next_batch: threads
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ pub(crate) async fn get_backfill_route(
|
|||||||
.timeline
|
.timeline
|
||||||
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
.pdus_rev(None, &body.room_id, Some(from.saturating_add(1)))
|
||||||
.try_take(limit)
|
.try_take(limit)
|
||||||
.try_filter_map(|(_, pdu)| async move {
|
.try_filter_map(async |(_, pdu)| {
|
||||||
Ok(services
|
Ok(services
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
@@ -72,7 +72,7 @@ pub(crate) async fn get_backfill_route(
|
|||||||
.await
|
.await
|
||||||
.then_some(pdu))
|
.then_some(pdu))
|
||||||
})
|
})
|
||||||
.try_filter_map(|pdu| async move {
|
.try_filter_map(async |pdu| {
|
||||||
Ok(services
|
Ok(services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ pub(crate) async fn get_event_authorization_route(
|
|||||||
.auth_chain
|
.auth_chain
|
||||||
.event_ids_iter(room_id, once(body.event_id.borrow()))
|
.event_ids_iter(room_id, once(body.event_id.borrow()))
|
||||||
.ready_filter_map(Result::ok)
|
.ready_filter_map(Result::ok)
|
||||||
.filter_map(|id| async move {
|
.filter_map(async |id| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ pub(crate) async fn get_hierarchy_route(
|
|||||||
let (children, inaccessible_children) =
|
let (children, inaccessible_children) =
|
||||||
get_parent_children_via(&room, suggested_only)
|
get_parent_children_via(&room, suggested_only)
|
||||||
.stream()
|
.stream()
|
||||||
.broad_filter_map(|(child, _via)| async move {
|
.broad_filter_map(async |(child, _via)| {
|
||||||
match services
|
match services
|
||||||
.rooms
|
.rooms
|
||||||
.spaces
|
.spaces
|
||||||
|
|||||||
@@ -158,10 +158,11 @@ async fn handle(
|
|||||||
let results: ResolvedMap = pdus
|
let results: ResolvedMap = pdus
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.try_stream()
|
.try_stream()
|
||||||
.broad_and_then(|(room_id, pdus): (_, Vec<_>)| {
|
.broad_and_then(async |(room_id, pdus): (_, Vec<_>)| {
|
||||||
handle_room(services, client, origin, started, room_id, pdus.into_iter())
|
handle_room(services, client, origin, &started, room_id, pdus.into_iter())
|
||||||
.map_ok(Vec::into_iter)
|
.map_ok(ResolvedMap::into_iter)
|
||||||
.map_ok(IterStream::try_stream)
|
.map_ok(IterStream::try_stream)
|
||||||
|
.await
|
||||||
})
|
})
|
||||||
.try_flatten()
|
.try_flatten()
|
||||||
.try_collect()
|
.try_collect()
|
||||||
@@ -180,28 +181,27 @@ async fn handle_room(
|
|||||||
services: &Services,
|
services: &Services,
|
||||||
_client: &IpAddr,
|
_client: &IpAddr,
|
||||||
origin: &ServerName,
|
origin: &ServerName,
|
||||||
txn_start_time: Instant,
|
txn_start_time: &Instant,
|
||||||
room_id: OwnedRoomId,
|
ref room_id: OwnedRoomId,
|
||||||
pdus: impl Iterator<Item = Pdu> + Send,
|
pdus: impl Iterator<Item = Pdu> + Send,
|
||||||
) -> Result<Vec<(OwnedEventId, Result)>> {
|
) -> Result<ResolvedMap> {
|
||||||
let _room_lock = services
|
let _room_lock = services
|
||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.mutex_federation
|
.mutex_federation
|
||||||
.lock(&room_id)
|
.lock(room_id)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let room_id = &room_id;
|
|
||||||
pdus.try_stream()
|
pdus.try_stream()
|
||||||
.and_then(|(_, event_id, value)| async move {
|
.and_then(async |(room_id, event_id, value)| {
|
||||||
services.server.check_running()?;
|
services.server.check_running()?;
|
||||||
let pdu_start_time = Instant::now();
|
let pdu_start_time = Instant::now();
|
||||||
let result = services
|
let result = services
|
||||||
.rooms
|
.rooms
|
||||||
.event_handler
|
.event_handler
|
||||||
.handle_incoming_pdu(origin, room_id, &event_id, value, true)
|
.handle_incoming_pdu(origin, &room_id, &event_id, value, true)
|
||||||
.await
|
.map_ok(|_| ())
|
||||||
.map(|_| ());
|
.await;
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
pdu_elapsed = ?pdu_start_time.elapsed(),
|
pdu_elapsed = ?pdu_start_time.elapsed(),
|
||||||
@@ -329,7 +329,7 @@ async fn handle_edu_receipt_room(
|
|||||||
.read
|
.read
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.stream()
|
.stream()
|
||||||
.for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move {
|
.for_each_concurrent(automatic_width(), async |(user_id, user_updates)| {
|
||||||
handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
|
handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await;
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
@@ -368,7 +368,7 @@ async fn handle_edu_receipt_room_user(
|
|||||||
.event_ids
|
.event_ids
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.stream()
|
.stream()
|
||||||
.for_each_concurrent(automatic_width(), |event_id| async move {
|
.for_each_concurrent(automatic_width(), async |event_id| {
|
||||||
let user_data = [(user_id.to_owned(), data.clone())];
|
let user_data = [(user_id.to_owned(), data.clone())];
|
||||||
let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
|
let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))];
|
||||||
let content = [(event_id.clone(), BTreeMap::from(receipts))];
|
let content = [(event_id.clone(), BTreeMap::from(receipts))];
|
||||||
|
|||||||
@@ -242,7 +242,7 @@ async fn create_join_event(
|
|||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.event_ids_iter(room_id, starting_events)
|
.event_ids_iter(room_id, starting_events)
|
||||||
.broad_and_then(|event_id| async move {
|
.broad_and_then(async |event_id| {
|
||||||
services
|
services
|
||||||
.rooms
|
.rooms
|
||||||
.timeline
|
.timeline
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ pub(crate) async fn get_room_state_route(
|
|||||||
.rooms
|
.rooms
|
||||||
.auth_chain
|
.auth_chain
|
||||||
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
|
.event_ids_iter(&body.room_id, once(body.event_id.borrow()))
|
||||||
.and_then(|id| async move { services.rooms.timeline.get_pdu_json(&id).await })
|
.and_then(async |id| services.rooms.timeline.get_pdu_json(&id).await)
|
||||||
.and_then(|pdu| {
|
.and_then(|pdu| {
|
||||||
services
|
services
|
||||||
.sending
|
.sending
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ pub(crate) async fn get_devices_route(
|
|||||||
devices: services
|
devices: services
|
||||||
.users
|
.users
|
||||||
.all_devices_metadata(user_id)
|
.all_devices_metadata(user_id)
|
||||||
.filter_map(|metadata| async move {
|
.filter_map(async |metadata| {
|
||||||
let device_id = metadata.device_id.clone();
|
let device_id = metadata.device_id.clone();
|
||||||
let device_id_clone = device_id.clone();
|
let device_id_clone = device_id.clone();
|
||||||
let device_id_string = device_id.as_str().to_owned();
|
let device_id_string = device_id.as_str().to_owned();
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ where
|
|||||||
|
|
||||||
let b = Arc::new(Mutex::new(b.peekable()));
|
let b = Arc::new(Mutex::new(b.peekable()));
|
||||||
a.map(move |ai| (ai, b.clone()))
|
a.map(move |ai| (ai, b.clone()))
|
||||||
.filter_map(|(ai, b)| async move {
|
.filter_map(async move |(ai, b)| {
|
||||||
let mut lock = b.lock().await;
|
let mut lock = b.lock().await;
|
||||||
while let Some(bi) = Pin::new(&mut *lock)
|
while let Some(bi) = Pin::new(&mut *lock)
|
||||||
.next_if(|bi| *bi <= ai)
|
.next_if(|bi| *bi <= ai)
|
||||||
|
|||||||
@@ -213,7 +213,7 @@ impl Service {
|
|||||||
self.db
|
self.db
|
||||||
.id_appserviceregistrations
|
.id_appserviceregistrations
|
||||||
.keys()
|
.keys()
|
||||||
.and_then(move |id: &str| async move {
|
.and_then(async move |id: &str| {
|
||||||
Ok((id.to_owned(), self.get_db_registration(id).await?))
|
Ok((id.to_owned(), self.get_db_registration(id).await?))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,7 +146,7 @@ async fn get_auth_chain_outer(
|
|||||||
let chunk_cache: Vec<_> = chunk
|
let chunk_cache: Vec<_> = chunk
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.try_stream()
|
.try_stream()
|
||||||
.broad_and_then(|(shortid, event_id)| async move {
|
.broad_and_then(async |(shortid, event_id)| {
|
||||||
if let Ok(cached) = self
|
if let Ok(cached) = self
|
||||||
.get_cached_eventid_authchain(&[shortid])
|
.get_cached_eventid_authchain(&[shortid])
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ use std::{
|
|||||||
iter::once,
|
iter::once,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{FutureExt, future};
|
use futures::FutureExt;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
|
||||||
int, uint,
|
int, uint,
|
||||||
@@ -110,7 +110,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let event_fetch = |event_id| {
|
let event_fetch = async |event_id: OwnedEventId| {
|
||||||
let origin_server_ts = eventid_info
|
let origin_server_ts = eventid_info
|
||||||
.get(&event_id)
|
.get(&event_id)
|
||||||
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get());
|
||||||
@@ -118,7 +118,7 @@ where
|
|||||||
// This return value is the key used for sorting events,
|
// This return value is the key used for sorting events,
|
||||||
// events are then sorted by power level, time,
|
// events are then sorted by power level, time,
|
||||||
// and lexically by event_id.
|
// and lexically by event_id.
|
||||||
future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
Ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts)))
|
||||||
};
|
};
|
||||||
|
|
||||||
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch)
|
||||||
|
|||||||
@@ -128,7 +128,7 @@ where
|
|||||||
new_state
|
new_state
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.stream()
|
.stream()
|
||||||
.broad_then(|((event_type, state_key), event_id)| async move {
|
.broad_then(async |((event_type, state_key), event_id)| {
|
||||||
self.services
|
self.services
|
||||||
.short
|
.short
|
||||||
.get_or_create_shortstatekey(&event_type, &state_key)
|
.get_or_create_shortstatekey(&event_type, &state_key)
|
||||||
|
|||||||
@@ -167,7 +167,7 @@ where
|
|||||||
.prev_events()
|
.prev_events()
|
||||||
.any(is_equal_to!(event_id))
|
.any(is_equal_to!(event_id))
|
||||||
})
|
})
|
||||||
.broad_filter_map(|event_id| async move {
|
.broad_filter_map(async |event_id| {
|
||||||
// Only keep those extremities were not referenced yet
|
// Only keep those extremities were not referenced yet
|
||||||
self.services
|
self.services
|
||||||
.pdu_metadata
|
.pdu_metadata
|
||||||
|
|||||||
@@ -84,9 +84,9 @@ impl Data {
|
|||||||
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
|
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
|
||||||
.map(|to_from| u64_from_u8(&to_from[8..16]))
|
.map(|to_from| u64_from_u8(&to_from[8..16]))
|
||||||
.map(PduCount::from_unsigned)
|
.map(PduCount::from_unsigned)
|
||||||
.wide_filter_map(move |shorteventid| async move {
|
.map(move |shorteventid| (user_id, shortroomid, shorteventid))
|
||||||
|
.wide_filter_map(async |(user_id, shortroomid, shorteventid)| {
|
||||||
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
|
let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into();
|
||||||
|
|
||||||
let mut pdu = self
|
let mut pdu = self
|
||||||
.services
|
.services
|
||||||
.timeline
|
.timeline
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ pub async fn search_pdus<'a>(
|
|||||||
let pdus = pdu_ids
|
let pdus = pdu_ids
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.stream()
|
.stream()
|
||||||
.wide_filter_map(move |result_pdu_id: RawPduId| async move {
|
.wide_filter_map(async |result_pdu_id: RawPduId| {
|
||||||
self.services
|
self.services
|
||||||
.timeline
|
.timeline
|
||||||
.get_pdu_from_id(&result_pdu_id)
|
.get_pdu_from_id(&result_pdu_id)
|
||||||
@@ -121,7 +121,7 @@ pub async fn search_pdus<'a>(
|
|||||||
})
|
})
|
||||||
.ready_filter(|pdu| !pdu.is_redacted())
|
.ready_filter(|pdu| !pdu.is_redacted())
|
||||||
.ready_filter(move |pdu| filter.matches(pdu))
|
.ready_filter(move |pdu| filter.matches(pdu))
|
||||||
.wide_filter_map(move |pdu| async move {
|
.wide_filter_map(async |pdu| {
|
||||||
self.services
|
self.services
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
|
.user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id())
|
||||||
@@ -164,7 +164,7 @@ async fn search_pdu_ids_query_room(
|
|||||||
) -> Vec<Vec<RawPduId>> {
|
) -> Vec<Vec<RawPduId>> {
|
||||||
tokenize(&query.criteria.search_term)
|
tokenize(&query.criteria.search_term)
|
||||||
.stream()
|
.stream()
|
||||||
.wide_then(|word| async move {
|
.wide_then(async |word| {
|
||||||
self.search_pdu_ids_query_words(shortroomid, &word)
|
self.search_pdu_ids_query_words(shortroomid, &word)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -270,7 +270,7 @@ fn get_space_child_events<'a>(
|
|||||||
.map(IterStream::stream)
|
.map(IterStream::stream)
|
||||||
.map(StreamExt::flatten)
|
.map(StreamExt::flatten)
|
||||||
.flatten_stream()
|
.flatten_stream()
|
||||||
.broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move {
|
.broad_filter_map(async move |(state_key, event_id): (_, OwnedEventId)| {
|
||||||
self.services
|
self.services
|
||||||
.timeline
|
.timeline
|
||||||
.get_pdu(&event_id)
|
.get_pdu(&event_id)
|
||||||
|
|||||||
@@ -338,7 +338,7 @@ pub fn state_full_pdus(
|
|||||||
.short
|
.short
|
||||||
.multi_get_eventid_from_short(short_ids)
|
.multi_get_eventid_from_short(short_ids)
|
||||||
.ready_filter_map(Result::ok)
|
.ready_filter_map(Result::ok)
|
||||||
.broad_filter_map(move |event_id: OwnedEventId| async move {
|
.broad_filter_map(async |event_id: OwnedEventId| {
|
||||||
self.services
|
self.services
|
||||||
.timeline
|
.timeline
|
||||||
.get_pdu(&event_id)
|
.get_pdu(&event_id)
|
||||||
|
|||||||
@@ -197,7 +197,7 @@ where
|
|||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
// Don't notify the sender of their own events, and dont send from ignored users
|
// Don't notify the sender of their own events, and dont send from ignored users
|
||||||
.ready_filter(|user| *user != pdu.sender())
|
.ready_filter(|user| *user != pdu.sender())
|
||||||
.filter_map(|recipient_user| async move { (!self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await).then_some(recipient_user) })
|
.filter_map(async |recipient_user| self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await.eq(&false).then_some(recipient_user))
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
|
|||||||
.stream(),
|
.stream(),
|
||||||
)
|
)
|
||||||
.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
|
.ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))
|
||||||
.filter_map(|server_name| async move {
|
.filter_map(async |server_name| {
|
||||||
self.services
|
self.services
|
||||||
.state_cache
|
.state_cache
|
||||||
.server_in_room(&server_name, room_id)
|
.server_in_room(&server_name, room_id)
|
||||||
|
|||||||
@@ -205,12 +205,12 @@ impl Service {
|
|||||||
let user_ids: Vec<_> = typing_indicators
|
let user_ids: Vec<_> = typing_indicators
|
||||||
.into_keys()
|
.into_keys()
|
||||||
.stream()
|
.stream()
|
||||||
.filter_map(|typing_user_id| async move {
|
.filter_map(async |typing_user_id| {
|
||||||
(!self
|
self.services
|
||||||
.services
|
|
||||||
.users
|
.users
|
||||||
.user_is_ignored(&typing_user_id, sender_user)
|
.user_is_ignored(&typing_user_id, sender_user)
|
||||||
.await)
|
.await
|
||||||
|
.eq(&false)
|
||||||
.then_some(typing_user_id)
|
.then_some(typing_user_id)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
|
|||||||
@@ -492,7 +492,7 @@ impl Service {
|
|||||||
.state_cache
|
.state_cache
|
||||||
.server_rooms(server_name)
|
.server_rooms(server_name)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.broad_filter_map(|room_id| async move {
|
.broad_filter_map(async |room_id| {
|
||||||
let receipt_map = self
|
let receipt_map = self
|
||||||
.select_edus_receipts_room(&room_id, since, max_edu_count, &mut num)
|
.select_edus_receipts_room(&room_id, since, max_edu_count, &mut num)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ impl Services {
|
|||||||
|
|
||||||
pub async fn clear_cache(&self) {
|
pub async fn clear_cache(&self) {
|
||||||
self.services()
|
self.services()
|
||||||
.for_each(|service| async move {
|
.for_each(async |service| {
|
||||||
service.clear_cache().await;
|
service.clear_cache().await;
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
@@ -182,7 +182,7 @@ impl Services {
|
|||||||
pub async fn memory_usage(&self) -> Result<String> {
|
pub async fn memory_usage(&self) -> Result<String> {
|
||||||
self.services()
|
self.services()
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.try_fold(String::new(), |mut out, service| async move {
|
.try_fold(String::new(), async |mut out, service| {
|
||||||
service.memory_usage(&mut out).await?;
|
service.memory_usage(&mut out).await?;
|
||||||
Ok(out)
|
Ok(out)
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user