Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Claim zaps in batches #1203

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions mutiny-core/src/hermes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use bitcoin::{bip32::ExtendedPrivKey, secp256k1::Secp256k1};
use fedimint_core::config::FederationId;
use futures::{pin_mut, select, FutureExt};
use lightning::util::logger::Logger;
use lightning::{log_error, log_info, log_warn};
use lightning::{log_debug, log_error, log_info, log_warn};
use lightning_invoice::Bolt11Invoice;
use nostr::secp256k1::SecretKey;
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, Tag, ToBech32};
use nostr::{nips::nip04::decrypt, Event, JsonUtil, Keys, RelayMessage, Tag, ToBech32};
use nostr::{prelude::decrypt_received_private_zap_message, EventBuilder};
use nostr::{Filter, Kind, Timestamp};
use nostr_sdk::{Client, RelayPoolNotification};
Expand Down Expand Up @@ -101,7 +101,12 @@ impl<S: MutinyStorage> HermesClient<S> {
let client = Client::new(&keys);

client
.add_relays(RELAYS)
.add_relays(
// we are listening only, so no need to connect to blastr
RELAYS
.into_iter()
.filter(|r| *r != "wss://nostr.mutinywallet.com"),
)
.await
.expect("Failed to add relays");

Expand Down Expand Up @@ -266,6 +271,9 @@ impl<S: MutinyStorage> HermesClient<S> {

client.subscribe(vec![received_dm_filter], None).await;

let mut has_received_eose = false;
let mut batch_notifications: Vec<(EcashNotification, Timestamp)> = Vec::new();

let mut notifications = client.notifications();

loop {
Expand All @@ -282,8 +290,17 @@ impl<S: MutinyStorage> HermesClient<S> {
Kind::EncryptedDirectMessage => {
match decrypt_ecash_notification(&dm_key, event.pubkey, &event.content) {
Ok(notification) => {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
// if we have received an EOSE, we should immediately handle the notification
// otherwise we should wait until we have received an EOSE so we can do the initial batch
if has_received_eose {
if let Err(e) = handle_ecash_notification(notification, event.created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
} else if let Err(e) = storage.set_dm_sync_time(event.created_at.as_u64(), true) { // save the last sync time after processing the notification
log_error!(logger, "Error saving last sync time: {e}");
}
} else {
log_debug!(logger, "Received ecash notification, adding to batch");
batch_notifications.push((notification, event.created_at));
}
},
Err(e) => {
Expand All @@ -295,7 +312,30 @@ impl<S: MutinyStorage> HermesClient<S> {
}
}
},
Ok(RelayPoolNotification::Message { .. }) => {}, // ignore messages
Ok(RelayPoolNotification::Message { message, .. }) => {
// if we receive an EOSE, we have received all the notifications from the relay
// and can now handle the batch
if let RelayMessage::EndOfStoredEvents(_) = message {
has_received_eose = true;
if !batch_notifications.is_empty() {
let mut max_created_at: Option<Timestamp> = None;
for (notification, created_at) in batch_notifications.drain(..) {
if let Err(e) = handle_ecash_notification(notification, created_at, &federations, &storage, &claim_key, profile_key.as_ref(), &logger).await {
log_error!(logger, "Error handling ecash notification: {e}");
} else if max_created_at.is_none() || max_created_at.is_some_and(|x| x < created_at) {
max_created_at = Some(created_at);
}
}

// save the last sync time after the batch
if let Some(max_created_at) = max_created_at {
if let Err(e) = storage.set_dm_sync_time(max_created_at.as_u64(), true) {
log_error!(logger, "Error saving last sync time: {e}");
}
}
}
}
},
Ok(RelayPoolNotification::Shutdown) => break, // if we disconnect, we restart to reconnect
Ok(RelayPoolNotification::Stop) => {}, // Currently unused
Ok(RelayPoolNotification::RelayStatus { .. }) => {}, // Currently unused
Expand Down Expand Up @@ -734,9 +774,6 @@ async fn handle_ecash_notification<S: MutinyStorage>(
);
}

// save the last sync time
storage.set_dm_sync_time(created_at.as_u64(), true)?;

Ok(())
}

Expand Down
Loading