diff --git a/src/service/pusher/append.rs b/src/service/pusher/append.rs index d5fbf37b..64530d76 100644 --- a/src/service/pusher/append.rs +++ b/src/service/pusher/append.rs @@ -14,9 +14,9 @@ use tuwunel_core::{ event::Event, pdu::{Count, Pdu, PduId, RawPduId}, }, - utils::{self, BoolExt, ReadyExt, future::TryExtExt, time::now_millis}, + utils::{BoolExt, ReadyExt, future::TryExtExt, time::now_millis}, }; -use tuwunel_database::{Json, Map}; +use tuwunel_database::{Deserialized, Json, Map}; use crate::rooms::short::ShortRoomId; @@ -159,13 +159,14 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { } } - self.increment_notification_counts(pdu.room_id(), notifies, highlights); + self.increment_notification_counts(pdu.room_id(), notifies, highlights) + .await; Ok(()) } #[implement(super::Service)] -fn increment_notification_counts( +async fn increment_notification_counts( &self, room_id: &RoomId, notifies: Vec, @@ -174,23 +175,17 @@ fn increment_notification_counts( let _cork = self.db.db.cork(); for user in notifies { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.db.userroomid_notificationcount, &userroom_id); + increment(&self.db.userroomid_notificationcount, (&user, room_id)).await; } for user in highlights { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.db.userroomid_highlightcount, &userroom_id); + increment(&self.db.userroomid_highlightcount, (&user, room_id)).await; } } -//TODO: this is an ABA -fn increment(db: &Arc, key: &[u8]) { - let old = db.get_blocking(key); - let new = utils::increment(old.ok().as_deref()); - db.insert(key, new); +// TODO: this is an ABA problem +async fn increment(db: &Arc, key: (&UserId, &RoomId)) { + let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0); + let new = old.saturating_add(1); + db.put(key, new); } diff --git a/src/service/pusher/send.rs b/src/service/pusher/send.rs index c715e330..86301257 100644 --- a/src/service/pusher/send.rs +++ b/src/service/pusher/send.rs @@ -125,20 +125,20 @@ async fn send_notice( } let d = vec![device]; - let mut notifi = Notification::new(d); + let mut notify = Notification::new(d); - notifi.event_id = Some(event.event_id().to_owned()); - notifi.room_id = Some(event.room_id().to_owned()); + notify.event_id = Some(event.event_id().to_owned()); + notify.room_id = Some(event.room_id().to_owned()); if http .data .get("org.matrix.msc4076.disable_badge_count") .is_none() && http.data.get("disable_badge_count").is_none() { - notifi.counts = NotificationCounts::new(unread, uint!(0)); + notify.counts = NotificationCounts::new(unread, uint!(0)); } else { // counts will not be serialised if it's the default (0, 0) // skip_serializing_if = "NotificationCounts::is_default" - notifi.counts = NotificationCounts::default(); + notify.counts = NotificationCounts::default(); } if !event_id_only { @@ -147,33 +147,33 @@ async fn send_notice( .iter() .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) { - notifi.prio = NotificationPriority::High; + notify.prio = NotificationPriority::High; } else { - notifi.prio = NotificationPriority::Low; + notify.prio = NotificationPriority::Low; } - notifi.sender = Some(event.sender().to_owned()); - notifi.event_type = Some(event.kind().to_owned()); - notifi.content = serde_json::value::to_raw_value(event.content()).ok(); + notify.sender = Some(event.sender().to_owned()); + notify.event_type = Some(event.kind().to_owned()); + notify.content = serde_json::value::to_raw_value(event.content()).ok(); if *event.kind() == TimelineEventType::RoomMember { - notifi.user_is_target = event.state_key() == Some(event.sender().as_str()); + notify.user_is_target = event.state_key() == Some(event.sender().as_str()); } - notifi.sender_display_name = self + notify.sender_display_name = self .services .users .displayname(event.sender()) .await .ok(); - notifi.room_name = self + notify.room_name = self .services .state_accessor .get_name(event.room_id()) .await .ok(); - notifi.room_alias = self + notify.room_alias = self .services .state_accessor .get_canonical_alias(event.room_id()) @@ -181,7 +181,7 @@ async fn send_notice( .ok(); } - self.send_request(&http.url, send_event_notification::v1::Request::new(notifi)) + self.send_request(&http.url, send_event_notification::v1::Request::new(notify)) .await?; Ok(())