Fix growing unread notification counts by unifying key generation logic (#253)

* fix(pusher): use consistent tuple keys for notification counts

* fix(pusher): correct 'notifi' typo in send.rs
This commit is contained in:
Justin
2026-01-04 22:12:36 -05:00
committed by Jason Volk
parent 257168946e
commit 05898034e5
2 changed files with 27 additions and 32 deletions

View File

@@ -14,9 +14,9 @@ use tuwunel_core::{
event::Event, event::Event,
pdu::{Count, Pdu, PduId, RawPduId}, pdu::{Count, Pdu, PduId, RawPduId},
}, },
utils::{self, BoolExt, ReadyExt, future::TryExtExt, time::now_millis}, utils::{BoolExt, ReadyExt, future::TryExtExt, time::now_millis},
}; };
use tuwunel_database::{Json, Map}; use tuwunel_database::{Deserialized, Json, Map};
use crate::rooms::short::ShortRoomId; use crate::rooms::short::ShortRoomId;
@@ -159,13 +159,14 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
} }
} }
self.increment_notification_counts(pdu.room_id(), notifies, highlights); self.increment_notification_counts(pdu.room_id(), notifies, highlights)
.await;
Ok(()) Ok(())
} }
#[implement(super::Service)] #[implement(super::Service)]
fn increment_notification_counts( async fn increment_notification_counts(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
notifies: Vec<OwnedUserId>, notifies: Vec<OwnedUserId>,
@@ -174,23 +175,17 @@ fn increment_notification_counts(
let _cork = self.db.db.cork(); let _cork = self.db.db.cork();
for user in notifies { for user in notifies {
let mut userroom_id = user.as_bytes().to_vec(); increment(&self.db.userroomid_notificationcount, (&user, room_id)).await;
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_notificationcount, &userroom_id);
} }
for user in highlights { for user in highlights {
let mut userroom_id = user.as_bytes().to_vec(); increment(&self.db.userroomid_highlightcount, (&user, room_id)).await;
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_highlightcount, &userroom_id);
} }
} }
//TODO: this is an ABA // TODO: this is an ABA problem
fn increment(db: &Arc<Map>, key: &[u8]) { async fn increment(db: &Arc<Map>, key: (&UserId, &RoomId)) {
let old = db.get_blocking(key); let old: u64 = db.qry(&key).await.deserialized().unwrap_or(0);
let new = utils::increment(old.ok().as_deref()); let new = old.saturating_add(1);
db.insert(key, new); db.put(key, new);
} }

View File

@@ -125,20 +125,20 @@ async fn send_notice<Pdu: Event>(
} }
let d = vec![device]; let d = vec![device];
let mut notifi = Notification::new(d); let mut notify = Notification::new(d);
notifi.event_id = Some(event.event_id().to_owned()); notify.event_id = Some(event.event_id().to_owned());
notifi.room_id = Some(event.room_id().to_owned()); notify.room_id = Some(event.room_id().to_owned());
if http if http
.data .data
.get("org.matrix.msc4076.disable_badge_count") .get("org.matrix.msc4076.disable_badge_count")
.is_none() && http.data.get("disable_badge_count").is_none() .is_none() && http.data.get("disable_badge_count").is_none()
{ {
notifi.counts = NotificationCounts::new(unread, uint!(0)); notify.counts = NotificationCounts::new(unread, uint!(0));
} else { } else {
// counts will not be serialised if it's the default (0, 0) // counts will not be serialised if it's the default (0, 0)
// skip_serializing_if = "NotificationCounts::is_default" // skip_serializing_if = "NotificationCounts::is_default"
notifi.counts = NotificationCounts::default(); notify.counts = NotificationCounts::default();
} }
if !event_id_only { if !event_id_only {
@@ -147,33 +147,33 @@ async fn send_notice<Pdu: Event>(
.iter() .iter()
.any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_))) .any(|t| matches!(t, Tweak::Highlight(true) | Tweak::Sound(_)))
{ {
notifi.prio = NotificationPriority::High; notify.prio = NotificationPriority::High;
} else { } else {
notifi.prio = NotificationPriority::Low; notify.prio = NotificationPriority::Low;
} }
notifi.sender = Some(event.sender().to_owned()); notify.sender = Some(event.sender().to_owned());
notifi.event_type = Some(event.kind().to_owned()); notify.event_type = Some(event.kind().to_owned());
notifi.content = serde_json::value::to_raw_value(event.content()).ok(); notify.content = serde_json::value::to_raw_value(event.content()).ok();
if *event.kind() == TimelineEventType::RoomMember { if *event.kind() == TimelineEventType::RoomMember {
notifi.user_is_target = event.state_key() == Some(event.sender().as_str()); notify.user_is_target = event.state_key() == Some(event.sender().as_str());
} }
notifi.sender_display_name = self notify.sender_display_name = self
.services .services
.users .users
.displayname(event.sender()) .displayname(event.sender())
.await .await
.ok(); .ok();
notifi.room_name = self notify.room_name = self
.services .services
.state_accessor .state_accessor
.get_name(event.room_id()) .get_name(event.room_id())
.await .await
.ok(); .ok();
notifi.room_alias = self notify.room_alias = self
.services .services
.state_accessor .state_accessor
.get_canonical_alias(event.room_id()) .get_canonical_alias(event.room_id())
@@ -181,7 +181,7 @@ async fn send_notice<Pdu: Event>(
.ok(); .ok();
} }
self.send_request(&http.url, send_event_notification::v1::Request::new(notifi)) self.send_request(&http.url, send_event_notification::v1::Request::new(notify))
.await?; .await?;
Ok(()) Ok(())