Skip to content

Commit

Permalink
Send v2 payjoin
Browse files Browse the repository at this point in the history
Persist sessions to poll and pending transactions to wallet as in
receiving v2 payjoin.
  • Loading branch information
DanGould committed Apr 18, 2024
1 parent 68aace6 commit 421bcce
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 54 deletions.
182 changes: 136 additions & 46 deletions mutiny-core/src/nodemanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::auth::MutinyAuthClient;
use crate::labels::LabelStorage;
use crate::ldkstorage::CHANNEL_CLOSURE_PREFIX;
use crate::logging::LOGGING_KEY;
use crate::payjoin::{Error as PayjoinError, PayjoinStorage, RecvSession};
use crate::payjoin::{random_ohttp_relay, Error as PayjoinError, PayjoinStorage, RecvSession};
use crate::utils::{sleep, spawn};
use crate::MutinyInvoice;
use crate::MutinyWalletConfig;
Expand Down Expand Up @@ -56,7 +56,6 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::max;
use std::io::Cursor;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -580,10 +579,14 @@ impl<S: MutinyStorage> NodeManager<S> {

/// Starts a background task to poll payjoin sessions to attempt receiving.
pub(crate) fn resume_payjoins(nm: Arc<NodeManager<S>>) {
let all = nm.storage.list_recv_sessions().unwrap_or_default();
for payjoin in all {
let receives = nm.storage.list_recv_sessions().unwrap_or_default();
for payjoin in receives {
nm.clone().spawn_payjoin_receiver(payjoin);
}
let sends = nm.storage.list_send_sessions().unwrap_or_default();
for payjoin in sends {
nm.clone().spawn_payjoin_sender(payjoin);
}
}

/// Creates a background process that will sync the wallet with the blockchain.
Expand Down Expand Up @@ -678,7 +681,7 @@ impl<S: MutinyStorage> NodeManager<S> {
pub async fn start_payjoin_session(
&self,
) -> Result<(Enrolled, payjoin::OhttpKeys), PayjoinError> {
use crate::payjoin::{fetch_ohttp_keys, random_ohttp_relay, PAYJOIN_DIR};
use crate::payjoin::{fetch_ohttp_keys, PAYJOIN_DIR};

log_info!(self.logger, "Starting payjoin session");

Expand All @@ -704,7 +707,7 @@ impl<S: MutinyStorage> NodeManager<S> {
))
}

// Send v1 payjoin request
// Send v2 payjoin request
pub async fn send_payjoin(
&self,
uri: Uri<'_, NetworkUnchecked>,
Expand All @@ -717,64 +720,151 @@ impl<S: MutinyStorage> NodeManager<S> {
.map_err(|_| MutinyError::IncorrectNetwork)?;
let address = uri.address.clone();
let original_psbt = self.wallet.create_signed_psbt(address, amount, fee_rate)?;
// Track this transaction in the wallet so it shows as an ActivityItem in UI.
// We'll cancel it if and when this original_psbt fallback is replaced with a received payjoin.
self.wallet
.insert_tx(
original_psbt.clone().extract_tx(),
ConfirmationTime::unconfirmed(crate::utils::now().as_secs()),
None,
)
.await?;

let fee_rate = if let Some(rate) = fee_rate {
FeeRate::from_sat_per_vb(rate)
} else {
let sat_per_kwu = self.fee_estimator.get_normal_fee_rate();
FeeRate::from_sat_per_kwu(sat_per_kwu as f32)
};
let fee_rate = payjoin::bitcoin::FeeRate::from_sat_per_kwu(fee_rate.sat_per_kwu() as u64);
let original_psbt = payjoin::bitcoin::psbt::PartiallySignedTransaction::from_str(
&original_psbt.to_string(),
)
.map_err(|_| MutinyError::WalletOperationFailed)?;
log_debug!(self.logger, "Creating payjoin request");
let (req, ctx) =
payjoin::send::RequestBuilder::from_psbt_and_uri(original_psbt.clone(), uri)
.unwrap()
.build_recommended(fee_rate)
.map_err(|_| MutinyError::PayjoinCreateRequest)?
.extract_v1()?;

let client = Client::builder()
.build()
.map_err(|e| MutinyError::Other(e.into()))?;
let req_ctx = payjoin::send::RequestBuilder::from_psbt_and_uri(original_psbt.clone(), uri)
.map_err(|_| MutinyError::PayjoinCreateRequest)?
.build_recommended(fee_rate)
.map_err(|_| MutinyError::PayjoinConfigError)?;
let session = self.storage.store_new_send_session(
labels.clone(),
original_psbt.clone(),
req_ctx.clone(),
)?;
self.spawn_payjoin_sender(session);
Ok(original_psbt.extract_tx().txid())
}

log_debug!(self.logger, "Sending payjoin request");
let res = client
.post(req.url)
.body(req.body)
.header("Content-Type", "text/plain")
.send()
fn spawn_payjoin_sender(&self, session: crate::payjoin::SendSession) {
let wallet = self.wallet.clone();
let logger = self.logger.clone();
let stop = self.stop.clone();
let storage = Arc::new(self.storage.clone());
utils::spawn(async move {
let proposal_psbt = match Self::poll_payjoin_sender(
stop,
wallet.clone(),
storage.clone(),
session.clone(),
)
.await
.map_err(|_| MutinyError::PayjoinCreateRequest)?
.bytes()
{
Ok(psbt) => psbt,
Err(e) => {
// self.wallet cancel_tx
log_error!(logger, "Error polling payjoin sender: {e}");
return;
}
};

let session_clone = session.clone();
match Self::handle_proposal_psbt(
logger.clone(),
wallet,
session_clone.original_psbt,
proposal_psbt,
session_clone.labels,
)
.await
.map_err(|_| MutinyError::PayjoinCreateRequest)?;
{
// Ensure ResponseError is logged with debug formatting
Err(e) => log_error!(logger, "Error handling payjoin proposal: {:?}", e),
Ok(txid) => log_info!(logger, "Payjoin proposal handled: {}", txid),
}
let o_txid = session.clone().original_psbt.clone().extract_tx().txid();
match storage.delete_send_session(session) {
Ok(_) => log_info!(logger, "Deleted payjoin send session: {}", o_txid),
Err(e) => log_error!(logger, "Error deleting payjoin send session: {e}"),
}
});
}

let mut cursor = Cursor::new(res.to_vec());
async fn poll_payjoin_sender(
stop: Arc<AtomicBool>,
wallet: Arc<OnChainWallet<S>>,
storage: Arc<S>,
mut session: crate::payjoin::SendSession,
) -> Result<bitcoin::psbt::Psbt, MutinyError> {
let http = Client::builder()
.build()
.map_err(|_| MutinyError::Other(anyhow!("failed to build http client")))?;
loop {
if stop.load(Ordering::Relaxed) {
return Err(MutinyError::NotRunning);
}

log_debug!(self.logger, "Processing payjoin response");
let proposal_psbt = ctx.process_response(&mut cursor).map_err(|e| {
// unrecognized error contents may only appear in debug logs and will not Display
log_debug!(self.logger, "Payjoin response error: {:?}", e);
e
})?;
if session.expiry < utils::now() {
wallet
.cancel_tx(&session.clone().original_psbt.extract_tx())
.map_err(|_| crate::payjoin::Error::CancelPayjoinTx)?;
storage.delete_send_session(session)?;
return Err(MutinyError::Payjoin(crate::payjoin::Error::SessionExpired));
}

// convert to pdk types
let original_psbt = PartiallySignedTransaction::from_str(&original_psbt.to_string())
.map_err(|_| MutinyError::PayjoinConfigError)?;
let proposal_psbt = PartiallySignedTransaction::from_str(&proposal_psbt.to_string())
.map_err(|_| MutinyError::PayjoinConfigError)?;
let (req, ctx) = session
.req_ctx
.extract_v2(random_ohttp_relay().to_owned())
.map_err(|_| MutinyError::PayjoinConfigError)?;
// extract_v2 mutates the session, so we need to update it in storage to not reuse keys
storage.update_send_session(session.clone())?;
let response = http
.post(req.url)
.header("Content-Type", "message/ohttp-req")
.body(req.body)
.send()
.await
.map_err(|_| MutinyError::Other(anyhow!("failed to parse payjoin response")))?;
let mut reader =
std::io::Cursor::new(response.bytes().await.map_err(|_| {
MutinyError::Other(anyhow!("failed to parse payjoin response"))
})?);

let psbt = ctx
.process_response(&mut reader)
.map_err(MutinyError::PayjoinResponse)?;
if let Some(psbt) = psbt {
let psbt = bitcoin::psbt::Psbt::from_str(&psbt.to_string())
.map_err(|_| MutinyError::Other(anyhow!("psbt conversion failed")))?;
return Ok(psbt);
} else {
log::info!("No response yet for POST payjoin request, retrying some seconds");
std::thread::sleep(std::time::Duration::from_secs(5));
}
}
}

log_debug!(self.logger, "Sending payjoin..");
let tx = self
.wallet
async fn handle_proposal_psbt(
logger: Arc<MutinyLogger>,
wallet: Arc<OnChainWallet<S>>,
original_psbt: PartiallySignedTransaction,
proposal_psbt: PartiallySignedTransaction,
labels: Vec<String>,
) -> Result<Txid, MutinyError> {
log_debug!(logger, "Sending payjoin..");
let original_tx = original_psbt.clone().extract_tx();
let tx = wallet
.send_payjoin(original_psbt, proposal_psbt, labels)
.await?;
let txid = tx.txid();
self.broadcast_transaction(tx).await?;
log_debug!(self.logger, "Payjoin broadcast! TXID: {txid}");
wallet.broadcast_transaction(tx).await?;
wallet.cancel_tx(&original_tx)?;
log_info!(logger, "Payjoin broadcast! TXID: {txid}");
Ok(txid)
}

Expand Down
74 changes: 66 additions & 8 deletions mutiny-core/src/payjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::sync::Arc;

use crate::error::MutinyError;
use crate::storage::MutinyStorage;
use bitcoin::Transaction;
use bitcoin::{psbt::Psbt, Transaction, Txid};
use core::time::Duration;
use gloo_net::websocket::futures::WebSocket;
use hex_conservative::DisplayHex;
use once_cell::sync::Lazy;
use payjoin::receive::v2::Enrolled;
use payjoin::OhttpKeys;
use pj::send::RequestContext;
use serde::{Deserialize, Serialize};
use url::Url;

Expand Down Expand Up @@ -40,22 +41,46 @@ impl RecvSession {
self.enrolled.pubkey()
}
}

#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct SendSession {
pub original_psbt: Psbt,
pub req_ctx: RequestContext,
pub labels: Vec<String>,
pub expiry: Duration,
}

pub trait PayjoinStorage {
fn list_recv_sessions(&self) -> Result<Vec<RecvSession>, MutinyError>;
fn store_new_recv_session(&self, session: Enrolled) -> Result<RecvSession, MutinyError>;
fn update_recv_session(&self, session: RecvSession) -> Result<(), MutinyError>;
fn delete_recv_session(&self, id: &[u8; 33]) -> Result<(), MutinyError>;

fn list_send_sessions(&self) -> Result<Vec<SendSession>, MutinyError>;
fn store_new_send_session(
&self,
labels: Vec<String>,
original_psbt: Psbt,
req_ctx: RequestContext,
) -> Result<SendSession, MutinyError>;
fn update_send_session(&self, session: SendSession) -> Result<(), MutinyError>;
fn delete_send_session(&self, session: SendSession) -> Result<(), MutinyError>;
}

const PAYJOIN_KEY_PREFIX: &str = "recvpj/";
const RECV_PAYJOIN_KEY_PREFIX: &str = "recvpj/";
const SEND_PAYJOIN_KEY_PREFIX: &str = "sendpj/";

fn get_payjoin_key(id: &[u8; 33]) -> String {
format!("{PAYJOIN_KEY_PREFIX}{}", id.as_hex())
fn get_recv_key(id: &[u8; 33]) -> String {
format!("{RECV_PAYJOIN_KEY_PREFIX}{}", id.as_hex())
}

fn get_send_key(original_txid: Txid) -> String {
format!("{RECV_PAYJOIN_KEY_PREFIX}{}", original_txid)
}

impl<S: MutinyStorage> PayjoinStorage for S {
fn list_recv_sessions(&self) -> Result<Vec<RecvSession>, MutinyError> {
let map: HashMap<String, RecvSession> = self.scan(PAYJOIN_KEY_PREFIX, None)?;
let map: HashMap<String, RecvSession> = self.scan(RECV_PAYJOIN_KEY_PREFIX, None)?;
Ok(map.values().map(|v| v.to_owned()).collect())
}

Expand All @@ -66,16 +91,49 @@ impl<S: MutinyStorage> PayjoinStorage for S {
expiry: in_24_hours,
payjoin_tx: None,
};
self.set_data(get_payjoin_key(&session.pubkey()), session.clone(), None)
self.set_data(get_recv_key(&session.pubkey()), session.clone(), None)
.map(|_| session)
}

fn update_recv_session(&self, session: RecvSession) -> Result<(), MutinyError> {
self.set_data(get_payjoin_key(&session.pubkey()), session, None)
self.set_data(get_recv_key(&session.pubkey()), session, None)
}

fn delete_recv_session(&self, id: &[u8; 33]) -> Result<(), MutinyError> {
self.delete(&[get_payjoin_key(id)])
self.delete(&[get_recv_key(id)])
}

fn store_new_send_session(
&self,
labels: Vec<String>,
original_psbt: Psbt,
req_ctx: RequestContext,
) -> Result<SendSession, MutinyError> {
let in_24_hours = crate::utils::now() + Duration::from_secs(60 * 60 * 24);
let o_txid = original_psbt.clone().extract_tx().txid();
let session = SendSession {
labels,
original_psbt,
expiry: in_24_hours,
req_ctx,
};
self.set_data(o_txid.to_string(), session.clone(), None)
.map(|_| session)
}

fn list_send_sessions(&self) -> Result<Vec<SendSession>, MutinyError> {
let map: HashMap<String, SendSession> = self.scan(SEND_PAYJOIN_KEY_PREFIX, None)?;
Ok(map.values().map(|v| v.to_owned()).collect())
}

fn update_send_session(&self, session: SendSession) -> Result<(), MutinyError> {
let o_txid = session.clone().original_psbt.extract_tx().txid();
self.set_data(get_send_key(o_txid), session, None)
}

fn delete_send_session(&self, session: SendSession) -> Result<(), MutinyError> {
let o_txid = session.original_psbt.extract_tx().txid();
self.delete(&[get_send_key(o_txid)])
}
}

Expand Down

0 comments on commit 421bcce

Please sign in to comment.