Fix duplicate push notifications via lease-based optimistic insert #25
alltheseas wants to merge 1 commit into
Previously used a check-then-act pattern that caused a race condition when the same event arrived via multiple relay connections concurrently. Both threads could pass the deduplication check before either had written the record, resulting in duplicate notifications.

The fix uses a two-phase optimistic insert with lease-based retry:

1. INSERT OR IGNORE with received_notification=false (atomic "claim")
2. If insert succeeds, we "own" this notification
3. If insert fails, check the existing row:
   - received_notification=true → already sent, skip
   - received_notification=false AND sent_at recent (<60s) → in-flight, skip
   - received_notification=false AND sent_at stale (>60s) → crashed/failed, retry
4. For stale claims, atomically extend lease before retrying
5. Refresh lease immediately before sending (final guard + fresh 60s window)
6. If lease refresh fails (0 rows), another thread won → skip
7. Send the notification
8. UPDATE received_notification=true only on successful send

The 60-second lease timeout balances:

- Avoiding duplicates from concurrent processing (recent = in-flight)
- Allowing retries after crashes/timeouts (stale = abandoned)
- APNS typical latency is ~500ms, so 60s provides ample buffer

Note: If UPDATE fails after successful send, retry may cause duplicate. This is accepted as a rare edge case (DB failure after APNS success); better to risk occasional duplicate than lose notifications.

Fixes: damus-io#24

🤖 Generated with [Claude Code](https://claude.ai/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
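The three-way check in step 3 can be sketched as a small pure function. This is only an illustration, not the code from this PR: `LeaseDecision` and `classify_existing_row` are hypothetical names, and timestamps are assumed to be plain Unix seconds rather than `nostr::Timestamp`.

```rust
/// Lease window taken from the PR description (60 seconds).
const LEASE_TIMEOUT_SECS: u64 = 60;

/// What to do when INSERT OR IGNORE found an existing row (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum LeaseDecision {
    /// received_notification = true: already delivered, skip.
    SkipAlreadySent,
    /// Unsent, but claimed within the lease window: another worker is sending.
    SkipInFlight,
    /// Unsent and the claim has outlived the lease: take over and retry.
    RetryStale,
}

/// Hypothetical helper; `sent_at` and `now` are assumed to be Unix seconds.
fn classify_existing_row(received_notification: bool, sent_at: u64, now: u64) -> LeaseDecision {
    if received_notification {
        return LeaseDecision::SkipAlreadySent;
    }
    // saturating_sub avoids underflow if sent_at is slightly ahead of now.
    let age_secs = now.saturating_sub(sent_at);
    if age_secs < LEASE_TIMEOUT_SECS {
        LeaseDecision::SkipInFlight
    } else {
        LeaseDecision::RetryStale
    }
}
```

A `RetryStale` result is only a hint: the caller would still need to win the atomic lease extension (step 4) before sending.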
I don't have APNS server test access to test this

@coderabbitai review please

✅ Actions performed

Review triggered.
📝 Walkthrough

Implemented a two-phase, lease-based deduplication mechanism in the notification manager to prevent duplicate push notifications when multiple relays concurrently deliver the same event. The mechanism uses INSERT OR IGNORE with atomic operations and a 60-second lease window to guard against race conditions.
Sequence Diagram

sequenceDiagram
actor Relay as Relay (Event Delivery)
participant NM as NotificationManager
participant DB as Database
participant Notif as Notification System
Relay->>NM: Event from relay(s)
NM->>DB: INSERT OR IGNORE into notifications
alt Row already exists
NM->>DB: Check notification state
alt Already sent (received_notification = true)
NM->>NM: Skip (already delivered)
else In-flight & recent (sent_at within 60s)
NM->>NM: Skip (concurrent delivery in progress)
else Stale lease (sent_at > 60s)
NM->>DB: Attempt atomic lease refresh
alt Lease acquired
NM->>DB: Update sent_at (refresh)
NM->>Notif: Send notification
NM->>DB: Mark as received, update sent_at
else Lease blocked
NM->>NM: Skip (race condition: another thread has it)
end
end
else New row inserted
NM->>DB: Update sent_at (set lease)
NM->>Notif: Send notification
NM->>DB: Mark as received, update sent_at
end
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
@coderabbitai review please

✅ Actions performed

Review triggered.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/notification_manager/mod.rs`:
- Around line 412-474: The code currently reads sent_at as a String
(row.get(1)?) which errors on NULL and causes legacy rows to be skipped; change
the query_row to return Result<(bool, Option<String>), _> (i.e. row.get::<_,
Option<String>>(1)?) and branch on None as an unclaimed/stale lease: for
Ok((false, None)) perform the same atomic UPDATE using WHERE sent_at IS NULL
(setting sent_at = now.to_sql_string()) and treat a successful update as
acquiring the lease (return false to proceed) or 0 updated as contention (return
true); keep the existing path for Some(sent_at_str) converting to
nostr::Timestamp and using the existing sent_at equality check in the UPDATE.
Ensure variables referenced (sent_at_str, sent_at, updated, conn.execute,
LEASE_TIMEOUT_SECS) are used accordingly.
🧹 Nitpick comments (1)
src/notification_manager/mod.rs (1)
508-514: Use the refreshed timestamp when marking success
`sent_at` is updated with the earlier `now`, even though `send_time` is already captured right before send. Using `send_time` (or a fresh `Timestamp::now()` after send) keeps the DB record aligned with the actual send window and avoids confusing timestamps if `sent_at` is reused later.

♻️ Suggested tweak

    - params![now.to_sql_string(), &notification_id],
    + params![send_time.to_sql_string(), &notification_id],
        "SELECT received_notification, sent_at FROM notifications WHERE id = ?"
    )?;
    let result: Result<(bool, String), _> = stmt.query_row(
        [&notification_id],
        |row| Ok((row.get(0)?, row.get(1)?))
    );

    match result {
        Ok((true, _)) => {
            // Already successfully sent
            log::debug!(
                "Notification for event {} to pubkey {} already sent (dedup)",
                event.id,
                pubkey
            );
            true
        }
        Ok((false, sent_at_str)) => {
            // In-flight or failed - check lease timeout
            let sent_at = nostr::Timestamp::from_sql_string(sent_at_str)
                .unwrap_or(now);
            let age_secs = now.as_u64().saturating_sub(sent_at.as_u64());

            if age_secs < LEASE_TIMEOUT_SECS {
                // Recent claim - another thread is handling it
                log::debug!(
                    "Notification for event {} to pubkey {} in-flight (lease active, {}s old)",
                    event.id,
                    pubkey,
                    age_secs
                );
                true
            } else {
                // Stale claim - previous attempt likely crashed, retry
                // Atomically extend lease before retrying
                let updated = conn.execute(
                    "UPDATE notifications SET sent_at = ? WHERE id = ? AND received_notification = false AND sent_at = ?",
                    params![now.to_sql_string(), &notification_id, sent_at.to_sql_string()],
                )?;
                if updated == 0 {
                    // Another thread beat us to it
                    log::debug!(
                        "Notification for event {} to pubkey {} lease contention, skipping",
                        event.id,
                        pubkey
                    );
                    true
                } else {
                    log::debug!(
                        "Retrying notification for event {} to pubkey {} (stale lease, {}s old)",
                        event.id,
                        pubkey,
                        age_secs
                    );
                    false
                }
            }
        }
        Err(_) => {
            // Row disappeared somehow, skip
            true
        }
    }
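The conditional `UPDATE ... WHERE sent_at = ?` in the stale branch acts as a compare-and-swap: it only succeeds if the row still holds the timestamp this worker observed. A minimal in-memory sketch of that idea follows, assuming a `HashMap` as a stand-in for the `notifications` table and Unix-second timestamps; `Row` and `try_extend_lease` are hypothetical names, not part of the PR.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for one `notifications` row.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Row {
    received_notification: bool,
    sent_at: u64,
}

/// Mirrors the shape of:
///   UPDATE notifications SET sent_at = :now
///   WHERE id = :id AND received_notification = false AND sent_at = :observed
/// Returns the number of rows changed (0 or 1), as a SQL execute call would.
fn try_extend_lease(
    rows: &mut HashMap<String, Row>,
    id: &str,
    observed_sent_at: u64,
    now: u64,
) -> usize {
    match rows.get_mut(id) {
        // Only the worker whose observed timestamp is still current wins.
        Some(row) if !row.received_notification && row.sent_at == observed_sent_at => {
            row.sent_at = now;
            1
        }
        // Missing row, already sent, or someone else refreshed the lease first.
        _ => 0,
    }
}
```

If two workers both observe the same stale `sent_at`, the first call changes the row and returns 1; the second finds a different `sent_at` and returns 0, which corresponds to the "lease contention, skipping" branch above.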
🧩 Analysis chain

🏁 Script executed:

    # First, locate and read the notification_manager/mod.rs file around lines 412-474
    head -480 src/notification_manager/mod.rs | tail -100

Repository: damus-io/notepush
Length of output: 4972

🏁 Script executed:

    # Search for schema/migration files that might define the sent_at column
    find . -type f \( -name "*.sql" -o -name "*migration*" -o -name "*schema*" \) | grep -E "\.(sql|rs)$" | head -20

Repository: damus-io/notepush
Length of output: 43

🏁 Script executed:

    # Search for the sent_at column definition in the codebase
    rg -A 3 -B 3 "sent_at" --type rust | head -100

Repository: damus-io/notepush
Length of output: 5579

🏁 Script executed:

    # Check the full context of the query and match expression
    sed -n '400,500p' src/notification_manager/mod.rs

Repository: damus-io/notepush
Length of output: 4976
Handle NULL sent_at from legacy rows to enable retries
sent_at is added without a default in the migration, so pre-existing rows have NULL. The current code reads it as String with row.get(1)?, which errors on NULL and falls through to the Err(_) branch, silently skipping any notification stuck in received_notification=false after a crash. This permanently suppresses retries for legacy data. Treat NULL as an unclaimed lease and reclaim it atomically.
Proposed fix
    -    let result: Result<(bool, String), _> = stmt.query_row(
    +    let result: Result<(bool, Option<String>), _> = stmt.query_row(
             [&notification_id],
             |row| Ok((row.get(0)?, row.get(1)?))
         );

    -        Ok((false, sent_at_str)) => {
    +        Ok((false, None)) => {
    +            let updated = conn.execute(
    +                "UPDATE notifications SET sent_at = ? WHERE id = ? AND received_notification = false AND sent_at IS NULL",
    +                params![now.to_sql_string(), &notification_id],
    +            )?;
    +            if updated == 0 {
    +                log::debug!(
    +                    "Notification for event {} to pubkey {} lease contention, skipping",
    +                    event.id,
    +                    pubkey
    +                );
    +                true
    +            } else {
    +                log::debug!(
    +                    "Retrying notification for event {} to pubkey {} (missing sent_at)",
    +                    event.id,
    +                    pubkey
    +                );
    +                false
    +            }
    +        }
    +        Ok((false, Some(sent_at_str))) => {
                 let sent_at = nostr::Timestamp::from_sql_string(sent_at_str)
                     .unwrap_or(now);
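The effect of reading `sent_at` as an `Option` can be shown in isolation. The sketch below is not the PR's code: `lease_is_reclaimable` is a hypothetical helper, timestamps are assumed to be Unix seconds, and `None` stands for a SQL NULL in a legacy row.

```rust
/// Lease window taken from the PR description (60 seconds).
const LEASE_TIMEOUT_SECS: u64 = 60;

/// Hypothetical helper: may an unsent row's lease be taken over?
/// `sent_at` is `None` when the column is NULL (row predates the column).
fn lease_is_reclaimable(sent_at: Option<u64>, now: u64) -> bool {
    match sent_at {
        // No timestamp means nobody holds a lease: treat it as unclaimed.
        None => true,
        // Otherwise the lease must have outlived the timeout.
        Some(ts) => now.saturating_sub(ts) >= LEASE_TIMEOUT_SECS,
    }
}
```

A `true` result here is only the read-side decision. As the review comment notes, the takeover itself would still have to go through an atomic `UPDATE` (with `sent_at IS NULL` or `sent_at = ?` in the `WHERE` clause) so that only one worker proceeds.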
Summary
Fixes the race condition causing duplicate push notifications when the same event arrives via multiple relay connections concurrently.
Technical Details
`received_notification=false` | `true` OR recent `false` (<60s) | `WHERE sent_at = old` | `true`
Guarantees
Test plan
Fixes: #24
🤖 Generated with Claude Code