diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 7fecf59a..e69f9977 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -813,7 +813,7 @@ async fn load_joined_room( let typings = services .rooms .typing - .typings_all(room_id, sender_user) + .typings_event_for_user(room_id, sender_user) .await?; Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?]) diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 1e8045f5..d4b17861 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -18,6 +18,7 @@ use ruma::{ events::{ AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, room::member::{MembershipState, RoomMemberEventContent}, + typing::TypingEventContent, }, serde::Raw, uint, @@ -173,13 +174,17 @@ pub(crate) async fn sync_events_v5_route( sync_info, all_invited_rooms.clone(), all_joined_rooms.clone(), - all_rooms, + all_rooms.clone(), &mut todo_rooms, &known_rooms, &mut response, ) .await; + let all_rooms: Vec = all_rooms.map(ToOwned::to_owned).collect(); + let typing = collect_typing_events(services, sender_user, &body, &all_rooms).await?; + response.extensions.typing = typing; + fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await; response.rooms = process_rooms( @@ -206,6 +211,7 @@ pub(crate) async fn sync_events_v5_route( .to_device .clone() .is_none_or(|to| to.events.is_empty()) + && response.extensions.typing.is_empty() { // Hang a few seconds so requests are not spammed // Stop hanging if new info arrives @@ -338,7 +344,6 @@ where .collect(); new_known_rooms.extend(new_rooms); - //new_known_rooms.extend(room_ids..cloned()); for room_id in room_ids { let todo_room = todo_rooms.entry(room_id.to_owned()).or_insert(( BTreeSet::new(), @@ -968,6 +973,44 @@ async fn collect_to_device( }) } +async fn collect_typing_events( + services: &Services, + sender_user: &UserId, + body: &sync_events::v5::Request, + all_rooms: &Vec, +) -> Result { + if !body.extensions.typing.enabled.unwrap_or(false) { + return Ok(sync_events::v5::response::Typing::default()); + } + + let mut typing_response = sync_events::v5::response::Typing::default(); + + for room_id in all_rooms { + match services + .rooms + .typing + .typing_users_for_user(room_id, sender_user) + .await + { + | Ok(typing_users) => { + if !typing_users.is_empty() { + typing_response.rooms.insert( + room_id.to_owned(), // Already OwnedRoomId + Raw::new(&ruma::events::typing::SyncTypingEvent { + content: TypingEventContent::new(typing_users), + })?, + ); + } + }, + | Err(e) => { + warn!(%room_id, "Failed to get typing events for room: {}", e); + }, + } + } + + Ok(typing_response) +} + async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts { sync_events::v5::response::Receipts { rooms: BTreeMap::new() } // TODO: get explicitly requested read receipts diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index aa786341..0e7070c8 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -189,17 +189,15 @@ impl Service { } /// Returns a new typing EDU. - pub async fn typings_all( + pub async fn typing_users_for_user( &self, room_id: &RoomId, sender_user: &UserId, - ) -> Result> { + ) -> Result> { let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); let Some(typing_indicators) = room_typing_indicators else { - return Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() }, - }); + return Ok(Vec::new()); }; let user_ids: Vec<_> = typing_indicators @@ -216,8 +214,20 @@ impl Service { .collect() .await; + Ok(user_ids) + } + + pub async fn typings_event_for_user( + &self, + room_id: &RoomId, + sender_user: &UserId, + ) -> Result> { Ok(SyncEphemeralRoomEvent { - content: ruma::events::typing::TypingEventContent { user_ids }, + content: ruma::events::typing::TypingEventContent { + user_ids: self + .typing_users_for_user(room_id, sender_user) + .await?, + }, }) }