From 94ef7e8f125fca84c6c52bd68a4c7cccf0d957a2 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Thu, 6 Jun 2024 12:57:06 -0500 Subject: [PATCH] Claim zaps in batches --- mutiny-core/src/hermes.rs | 55 ++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/mutiny-core/src/hermes.rs b/mutiny-core/src/hermes.rs index 75f08c551..75b1f0492 100644 --- a/mutiny-core/src/hermes.rs +++ b/mutiny-core/src/hermes.rs @@ -14,10 +14,10 @@ use bitcoin::{bip32::ExtendedPrivKey, secp256k1::Secp256k1}; use fedimint_core::config::FederationId; use futures::{pin_mut, select, FutureExt}; use lightning::util::logger::Logger; -use lightning::{log_error, log_info, log_warn}; +use lightning::{log_debug, log_error, log_info, log_warn}; use lightning_invoice::Bolt11Invoice; use nostr::secp256k1::SecretKey; -use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, Tag, ToBech32}; +use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, RelayMessage, Tag, ToBech32}; use nostr::{prelude::decrypt_received_private_zap_message, EventBuilder}; use nostr::{Filter, Kind, Timestamp}; use nostr_sdk::{Client, RelayPoolNotification}; @@ -101,7 +101,12 @@ impl HermesClient { let client = Client::new(&keys); client - .add_relays(RELAYS) + .add_relays( + // we are listening only, so no need to connect to blastr + RELAYS + .into_iter() + .filter(|r| *r != "wss://nostr.mutinywallet.com"), + ) .await .expect("Failed to add relays"); @@ -266,6 +271,9 @@ impl HermesClient { client.subscribe(vec![received_dm_filter], None).await; + let mut has_received_eose = false; + let mut batch_notifications: Vec<(EcashNotification, Timestamp)> = Vec::new(); + let mut notifications = client.notifications(); loop { @@ -282,8 +290,17 @@ impl HermesClient { Kind::EncryptedDirectMessage => { match decrypt_ecash_notification(&dm_key, event.pubkey, &event.content) { Ok(notification) => { - if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await { - log_error!(logger, "Error handling ecash notification: {e}"); + // if we have received an EOSE, we should immediately handle the notification + // otherwise we should wait until we have received an EOSE so we can do the initial batch + if has_received_eose { + if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await { + log_error!(logger, "Error handling ecash notification: {e}"); + } else if let Err(e) = storage.set_dm_sync_time(event.created_at.as_u64(), true) { // save the last sync time after processing the notification + log_error!(logger, "Error saving last sync time: {e}"); + } + } else { + log_debug!(logger, "Received ecash notification, adding to batch"); + batch_notifications.push((notification, event.created_at)); } }, Err(e) => { @@ -295,7 +312,30 @@ impl HermesClient { } } }, - Ok(RelayPoolNotification::Message { .. }) => {}, // ignore messages + Ok(RelayPoolNotification::Message { message, .. }) => { + // if we receive an EOSE, we have received all the notifications from the relay + // and can now handle the batch + if let RelayMessage::EndOfStoredEvents(_) = message { + has_received_eose = true; + if !batch_notifications.is_empty() { + let mut max_created_at: Option = None; + for (notification, created_at) in batch_notifications.drain(..) { + if let Err(e) = handle_ecash_notification(notification, created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await { + log_error!(logger, "Error handling ecash notification: {e}"); + } else if max_created_at.is_none() || max_created_at.is_some_and(|x| x < created_at) { + max_created_at = Some(created_at); + } + } + + // save the last sync time after the batch + if let Some(max_created_at) = max_created_at { + if let Err(e) = storage.set_dm_sync_time(max_created_at.as_u64(), true) { + log_error!(logger, "Error saving last sync time: {e}"); + } + } + } + } + }, Ok(RelayPoolNotification::Shutdown) => break, // if we disconnect, we restart to reconnect Ok(RelayPoolNotification::Stop) => {}, // Currently unused Ok(RelayPoolNotification::RelayStatus { .. }) => {}, // Currently unused @@ -734,9 +774,6 @@ async fn handle_ecash_notification( ); } - // save the last sync time - storage.set_dm_sync_time(created_at.as_u64(), true)?; - Ok(()) }