From b46f0d7bddacee62e2c7359943dddd608e90273c Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Tue, 7 May 2024 23:19:40 -0500 Subject: [PATCH] Fedimint onchain cleanup --- mutiny-core/src/federation.rs | 369 ++++++++++++++++++---------------- mutiny-core/src/storage.rs | 18 ++ 2 files changed, 212 insertions(+), 175 deletions(-) diff --git a/mutiny-core/src/federation.rs b/mutiny-core/src/federation.rs index 92355aba0..7be242353 100644 --- a/mutiny-core/src/federation.rs +++ b/mutiny-core/src/federation.rs @@ -5,8 +5,9 @@ use crate::{ logging::MutinyLogger, onchain::coin_type_from_network, storage::{ - get_transaction_details, list_payment_info, persist_payment_info, - persist_transaction_details, MutinyStorage, VersionedValue, TRANSACTION_DETAILS_PREFIX_KEY, + delete_transaction_details, get_transaction_details, list_payment_info, + persist_payment_info, persist_transaction_details, MutinyStorage, VersionedValue, + TRANSACTION_DETAILS_PREFIX_KEY, }, utils::sleep, HTLCStatus, MutinyInvoice, DEFAULT_PAYMENT_TIMEOUT, @@ -1146,6 +1147,7 @@ async fn process_operation_until_timeout( stored_transaction_details, operation_id, storage, + esplora, timeout, stop, logger, @@ -1318,90 +1320,95 @@ async fn process_onchain_withdraw_outcome( .map(|o| o.labels.clone()) .unwrap_or_default(); - match stream_or_outcome { - UpdateStreamOrOutcome::Outcome(outcome) => { - // TODO - log_trace!(logger, "Outcome received: {:?}", outcome); - } - UpdateStreamOrOutcome::UpdateStream(mut s) => { - // break out after sleep time or check stop signal - log_trace!(logger, "start timeout stream futures"); - loop { - let timeout_future = if let Some(t) = timeout { - sleep(t as i32) - } else { - sleep(1_000_i32) - }; + let mut s = stream_or_outcome.into_stream(); - let mut stream_fut = Box::pin(s.next()).fuse(); - let delay_fut = Box::pin(timeout_future).fuse(); - pin_mut!(delay_fut); + // break out after sleep time or check stop signal + log_trace!(logger, "start timeout stream futures"); + loop { + let timeout_future = if let Some(t) = timeout { + sleep(t as i32) + } else { + sleep(1_000_i32) + }; - select! { - outcome_option = stream_fut => { - if let Some(outcome) = outcome_option { - // TODO refactor outcome parsing into seperate method - match outcome { - WithdrawState::Created => { - // Nothing to do - log_debug!(logger, "Waiting for withdraw"); + let mut stream_fut = Box::pin(s.next()).fuse(); + let delay_fut = Box::pin(timeout_future).fuse(); + pin_mut!(delay_fut); + + select! { + outcome_option = stream_fut => { + if let Some(outcome) = outcome_option { + match outcome { + WithdrawState::Created => { + // Nothing to do + log_debug!(logger, "Waiting for withdraw"); + }, + WithdrawState::Succeeded(txid) => { + log_info!(logger, "Withdraw successful: {txid}"); + + let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); + let txid = Txid::from_slice(&txid).expect("should convert"); + let updated_transaction_details = TransactionDetails { + transaction: None, + txid: Some(txid), + internal_id, + received: 0, + sent: amount.to_sat(), + fee: Some(fee.to_sat()), + confirmation_time: ConfirmationTime::Unconfirmed { last_seen: now().as_secs() }, + labels: labels.clone(), + }; + + match persist_transaction_details(&storage, &updated_transaction_details) { + Ok(_) => { + log_info!(logger, "Transaction updated"); }, - WithdrawState::Succeeded(txid) => { - log_info!(logger, "Withdraw successful: {txid}"); - - let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); - let txid = Txid::from_slice(&txid).expect("should convert"); - let updated_transaction_details = TransactionDetails { - transaction: None, - txid: Some(txid), - internal_id, - received: 0, - sent: amount.to_sat(), - fee: Some(fee.to_sat()), - confirmation_time: ConfirmationTime::Unconfirmed { last_seen: now().as_secs() }, - labels: labels.clone(), - }; - - match persist_transaction_details(&storage, &updated_transaction_details) { - Ok(_) => { - log_info!(logger, "Transaction updated"); - }, - Err(e) => { - log_error!(logger, "Error updating transaction: {e}"); - }, - } - - // we need to get confirmations for this txid and update - subscribe_onchain_confirmation_check(storage.clone(), esplora.clone(), txid, updated_transaction_details, stop, logger.clone()).await; - - break - }, - WithdrawState::Failed(e) => { - // TODO delete - log_error!(logger, "Transaction failed: {e}"); - break; + Err(e) => { + log_error!(logger, "Error updating transaction: {e}"); }, } - } - } - _ = delay_fut => { - if timeout.is_none() { - if stop.load(Ordering::Relaxed) { - break; + + // we need to get confirmations for this txid and update + subscribe_onchain_confirmation_check(storage.clone(), esplora.clone(), txid, updated_transaction_details, stop, logger.clone()).await; + + break + }, + WithdrawState::Failed(e) => { + log_error!(logger, "Transaction failed: {e}"); + + // if we have the original transaction details, delete it + if let Some(t) = original_transaction_details { + match delete_transaction_details(&storage, &t) { + Ok(_) => { + log_info!(logger, "Transaction deleted"); + }, + Err(e) => { + log_error!(logger, "Error deleting transaction: {e}"); + }, + } } - } else { - log_debug!( - logger, - "Timeout reached, exiting loop for on chain tx", - ); + break; - } + }, } } } - log_trace!(logger, "Done with stream outcome",); + _ = delay_fut => { + if timeout.is_none() { + if stop.load(Ordering::Relaxed) { + break; + } + } else { + log_debug!( + logger, + "Timeout reached, exiting loop for on chain tx", + ); + break; + } + } } } + log_trace!(logger, "Done with stream outcome",); } async fn subscribe_onchain_confirmation_check( @@ -1447,11 +1454,14 @@ async fn subscribe_onchain_confirmation_check( }); } +// TODO refactor maybe? +#[allow(clippy::too_many_arguments)] async fn process_onchain_deposit_outcome( stream_or_outcome: UpdateStreamOrOutcome, original_transaction_details: Option, operation_id: OperationId, storage: S, + esplora: Arc, timeout: Option, stop: Arc, logger: Arc, @@ -1461,119 +1471,128 @@ async fn process_onchain_deposit_outcome( .map(|o| o.labels.clone()) .unwrap_or_default(); - match stream_or_outcome { - UpdateStreamOrOutcome::Outcome(outcome) => { - // TODO - log_trace!(logger, "Outcome received: {:?}", outcome); - } - UpdateStreamOrOutcome::UpdateStream(mut s) => { - // break out after sleep time or check stop signal - log_trace!(logger, "start timeout stream futures"); - loop { - let timeout_future = if let Some(t) = timeout { - sleep(t as i32) - } else { - sleep(1_000_i32) - }; + let mut s = stream_or_outcome.into_stream(); - let mut stream_fut = Box::pin(s.next()).fuse(); - let delay_fut = Box::pin(timeout_future).fuse(); - pin_mut!(delay_fut); + // break out after sleep time or check stop signal + log_trace!(logger, "start timeout stream futures"); + loop { + let timeout_future = if let Some(t) = timeout { + sleep(t as i32) + } else { + sleep(1_000_i32) + }; - select! { - outcome_option = stream_fut => { - if let Some(outcome) = outcome_option { - // TODO refactor outcome parsing into seperate method - match outcome { - fedimint_wallet_client::DepositState::WaitingForTransaction => { - // Nothing to do - log_debug!(logger, "Waiting for transaction"); - } - fedimint_wallet_client::DepositState::WaitingForConfirmation(tx) => { - // Pending state, update with info we have - log_debug!(logger, "Waiting for confirmation"); - let txid = Txid::from_slice(&tx.btc_transaction.txid()).expect("should convert"); - let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); - let output = tx.btc_transaction.output[tx.out_idx as usize].clone(); - - let updated_transaction_details = TransactionDetails { - transaction: None, - txid: Some(txid), - internal_id, - received: output.value, - sent: 0, - fee: None, - confirmation_time: ConfirmationTime::Unconfirmed { last_seen: now().as_secs() }, - labels: labels.clone(), - }; - - match persist_transaction_details(&storage, &updated_transaction_details) { - Ok(_) => { - log_info!(logger, "Transaction updated"); - }, - Err(e) => { - log_error!(logger, "Error updating transaction: {e}"); - }, - } - } - fedimint_wallet_client::DepositState::Confirmed(tx) => { - // Pending state, update with info we have - log_debug!(logger, "Transaction confirmed"); - let txid = Txid::from_slice(&tx.btc_transaction.txid()).expect("should convert"); - let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); - let output = tx.btc_transaction.output[tx.out_idx as usize].clone(); - - let updated_transaction_details = TransactionDetails { - transaction: None, - txid: Some(txid), - internal_id, - received: output.value, - sent: 0, - fee: None, - confirmation_time: ConfirmationTime::Confirmed { height: 0, time: now().as_secs() }, // FIXME: can't figure this out - labels: labels.clone(), - }; - - match persist_transaction_details(&storage, &updated_transaction_details) { - Ok(_) => { - log_info!(logger, "Transaction updated"); - }, - Err(e) => { - log_error!(logger, "Error updating transaction: {e}"); - }, - } - } - fedimint_wallet_client::DepositState::Claimed(_) => { - // Nothing really to change from confirmed to claimed - log_debug!(logger, "Transaction claimed"); - break; - } - fedimint_wallet_client::DepositState::Failed(e) => { - // TODO delete - log_error!(logger, "Transaction failed: {e}"); - break; - } + let mut stream_fut = Box::pin(s.next()).fuse(); + let delay_fut = Box::pin(timeout_future).fuse(); + pin_mut!(delay_fut); + + select! { + outcome_option = stream_fut => { + if let Some(outcome) = outcome_option { + match outcome { + fedimint_wallet_client::DepositState::WaitingForTransaction => { + // Nothing to do + log_debug!(logger, "Waiting for transaction"); + } + fedimint_wallet_client::DepositState::WaitingForConfirmation(tx) => { + // Pending state, update with info we have + log_debug!(logger, "Waiting for confirmation"); + let txid = Txid::from_slice(&tx.btc_transaction.txid()).expect("should convert"); + let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); + let output = tx.btc_transaction.output[tx.out_idx as usize].clone(); + + let updated_transaction_details = TransactionDetails { + transaction: None, + txid: Some(txid), + internal_id, + received: output.value, + sent: 0, + fee: None, + confirmation_time: ConfirmationTime::Unconfirmed { last_seen: now().as_secs() }, + labels: labels.clone(), + }; + + match persist_transaction_details(&storage, &updated_transaction_details) { + Ok(_) => { + log_info!(logger, "Transaction updated"); + }, + Err(e) => { + log_error!(logger, "Error updating transaction: {e}"); + }, } } - } - _ = delay_fut => { - if timeout.is_none() { - if stop.load(Ordering::Relaxed) { - break; + fedimint_wallet_client::DepositState::Confirmed(tx) => { + // Pending state, update with info we have + log_debug!(logger, "Transaction confirmed"); + let txid = Txid::from_slice(&tx.btc_transaction.txid()).expect("should convert"); + let internal_id = Txid::from_slice(&operation_id.0).expect("should convert"); + let output = tx.btc_transaction.output[tx.out_idx as usize].clone(); + + // store as confirmed 0 block height until we can check esplora after + let updated_transaction_details = TransactionDetails { + transaction: None, + txid: Some(txid), + internal_id, + received: output.value, + sent: 0, + fee: None, + confirmation_time: ConfirmationTime::Confirmed { height: 0, time: now().as_secs() }, + labels: labels.clone(), + }; + + match persist_transaction_details(&storage, &updated_transaction_details) { + Ok(_) => { + log_info!(logger, "Transaction updated"); + }, + Err(e) => { + log_error!(logger, "Error updating transaction: {e}"); + }, } - } else { - log_debug!( - logger, - "Timeout reached, exiting loop for on chain tx", - ); + + // we need to get confirmations for this txid and update + subscribe_onchain_confirmation_check(storage.clone(), esplora.clone(), txid, updated_transaction_details, stop.clone(), logger.clone()).await; + } + fedimint_wallet_client::DepositState::Claimed(_) => { + // Nothing really to change from confirmed to claimed + log_debug!(logger, "Transaction claimed"); + break; + } + fedimint_wallet_client::DepositState::Failed(e) => { + log_error!(logger, "Transaction failed: {e}"); + + // if we have the original transaction details, delete it + if let Some(t) = original_transaction_details { + match delete_transaction_details(&storage, &t) { + Ok(_) => { + log_info!(logger, "Transaction deleted"); + }, + Err(e) => { + log_error!(logger, "Error deleting transaction: {e}"); + }, + } + } + break; } } } } - log_trace!(logger, "Done with stream outcome",); + _ = delay_fut => { + if timeout.is_none() { + if stop.load(Ordering::Relaxed) { + break; + } + } else { + log_debug!( + logger, + "Timeout reached, exiting loop for on chain tx", + ); + break; + } + } } } + log_trace!(logger, "Done with stream outcome",); } #[derive(Clone)] diff --git a/mutiny-core/src/storage.rs b/mutiny-core/src/storage.rs index a6be6c796..9f970e74a 100644 --- a/mutiny-core/src/storage.rs +++ b/mutiny-core/src/storage.rs @@ -926,6 +926,24 @@ pub(crate) fn persist_transaction_details( Ok(()) } +pub(crate) fn delete_transaction_details( + storage: &S, + transaction_details: &TransactionDetails, +) -> Result<(), MutinyError> { + let key = transaction_details_key(transaction_details.internal_id); + storage.delete(&[key.clone()])?; + + // delete from index + let index = storage.activity_index(); + let mut index = index.try_write()?; + index.insert(IndexItem { + timestamp: None, + key, + }); + + Ok(()) +} + pub(crate) fn get_transaction_details( storage: &S, internal_id: Txid,