From 049df10f016d69e6a79d976bfa53fc9149688b71 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 13 Aug 2021 20:34:40 +0300 Subject: [PATCH] Add blockchain.outpoint.subscribe RPC --- src/electrum.rs | 55 +++++++++++++++++-- src/status.rs | 141 +++++++++++++++++++++++++++++++++++++++++++++++- src/tracker.rs | 10 +++- 3 files changed, 200 insertions(+), 6 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index a3dfa2cf6..1f3cb3ca1 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result}; use bitcoin::{ consensus::{deserialize, serialize}, hashes::hex::{FromHex, ToHex}, - BlockHash, Txid, + BlockHash, OutPoint, Txid, }; use crossbeam_channel::Receiver; use rayon::prelude::*; @@ -19,7 +19,7 @@ use crate::{ merkle::Proof, metrics::{self, Histogram}, signals::Signal, - status::ScriptHashStatus, + status::{OutPointStatus, ScriptHashStatus}, tracker::Tracker, types::ScriptHash, }; @@ -34,6 +34,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol) pub struct Client { tip: Option, scripthashes: HashMap, + outpoints: HashMap, } #[derive(Deserialize)] @@ -178,7 +179,25 @@ impl Rpc { } }) .collect::>>() - .context("failed to update status")?; + .context("failed to update scripthash status")?; + + notifications.extend( + client + .outpoints + .par_iter_mut() + .filter_map(|(outpoint, status)| -> Option> { + match self.tracker.update_outpoint_status(status, &self.daemon) { + Ok(true) => Some(Ok(notification( + "blockchain.outpoint.subscribe", + &[json!([outpoint.txid, outpoint.vout]), json!(status)], + ))), + Ok(false) => None, // outpoint status is the same + Err(e) => Some(Err(e)), + } + }) + .collect::>>() + .context("failed to update scripthash status")?, + ); if let Some(old_tip) = client.tip { let new_tip = self.tracker.chain().tip(); @@ -306,6 +325,28 @@ impl Rpc { Ok(json!(result)) } + fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result { + let outpoint = OutPoint::new(txid, vout); + Ok(match client.outpoints.entry(outpoint) { + Entry::Occupied(e) => json!(e.get()), + Entry::Vacant(e) => { + let outpoint = OutPoint::new(txid, vout); + let mut status = OutPointStatus::new(outpoint); + self.tracker + .update_outpoint_status(&mut status, &self.daemon)?; + json!(e.insert(status)) + } + }) + } + + fn outpoint_unsubscribe( + &self, + client: &mut Client, + (txid, vout): (Txid, u32), + ) -> Result { + Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout)))) + } + fn new_status(&self, scripthash: ScriptHash) -> Result { let mut status = ScriptHashStatus::new(scripthash); self.tracker @@ -440,6 +481,8 @@ impl Rpc { Call::HeadersSubscribe => self.headers_subscribe(client), Call::MempoolFeeHistogram => self.get_fee_histogram(), Call::PeersSubscribe => Ok(json!([])), + Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args), + Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args), Call::Ping => Ok(Value::Null), Call::RelayFee => self.relayfee(), Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args), @@ -473,12 +516,13 @@ enum Call { Banner, BlockHeader((usize,)), BlockHeaders((usize, usize)), - TransactionBroadcast((String,)), Donation, EstimateFee((u16,)), Features, HeadersSubscribe, MempoolFeeHistogram, + OutPointSubscribe((Txid, u32)), // TODO: support spk_hint + OutPointUnsubscribe((Txid, u32)), PeersSubscribe, Ping, RelayFee, @@ -486,6 +530,7 @@ enum Call { ScriptHashGetHistory((ScriptHash,)), ScriptHashListUnspent((ScriptHash,)), ScriptHashSubscribe((ScriptHash,)), + TransactionBroadcast((String,)), TransactionGet(TxGetArgs), TransactionGetMerkle((Txid, usize)), Version((String, Version)), @@ -503,6 +548,8 @@ impl Call { "blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?), "blockchain.scripthash.listunspent" => Call::ScriptHashListUnspent(convert(params)?), "blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?), + "blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?), + "blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?), "blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?), "blockchain.transaction.get" => Call::TransactionGet(convert(params)?), "blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?), diff --git a/src/status.rs b/src/status.rs index e1e30a5cc..70b32a643 100644 --- a/src/status.rs +++ b/src/status.rs @@ -4,7 +4,7 @@ use bitcoin::{ Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid, }; use rayon::prelude::*; -use serde::ser::{Serialize, Serializer}; +use serde::ser::{Serialize, SerializeMap, Serializer}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; @@ -49,12 +49,26 @@ impl TxEntry { // Confirmation height of a transaction or its mempool state: // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool +#[derive(Copy, Clone, Eq, PartialEq)] enum Height { Confirmed { height: usize }, Unconfirmed { has_unconfirmed_inputs: bool }, } impl Height { + fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self { + let height = chain + .get_block_height(&blockhash) + .expect("missing block in chain"); + Self::Confirmed { height } + } + + fn unconfirmed(e: &crate::mempool::Entry) -> Self { + Self::Unconfirmed { + has_unconfirmed_inputs: e.has_unconfirmed_inputs, + } + } + fn as_i64(&self) -> i64 { match self { Self::Confirmed { height } => i64::try_from(*height).unwrap(), @@ -511,6 +525,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option { Some(StatusHash::from_engine(engine)) } +pub(crate) struct OutPointStatus { + outpoint: OutPoint, + funding: Option, + spending: Option<(Txid, Height)>, + tip: BlockHash, +} + +impl Serialize for OutPointStatus { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut map = serializer.serialize_map(None)?; + if let Some(funding) = &self.funding { + map.serialize_entry("height", &funding)?; + } + if let Some((txid, height)) = &self.spending { + map.serialize_entry("spender_txhash", &txid)?; + map.serialize_entry("spender_height", &height)?; + } + map.end() + } +} + +impl OutPointStatus { + pub(crate) fn new(outpoint: OutPoint) -> Self { + Self { + outpoint, + funding: None, + spending: None, + tip: BlockHash::default(), + } + } + + pub(crate) fn sync( + &mut self, + index: &Index, + mempool: &Mempool, + daemon: &Daemon, + ) -> Result { + let funding = self.sync_funding(index, daemon, mempool)?; + let spending = self.sync_spending(index, daemon, mempool)?; + let same_status = (self.funding == funding) && (self.spending == spending); + self.funding = funding; + self.spending = spending; + self.tip = index.chain().tip(); + Ok(!same_status) + } + + /// Return true iff current tip became unconfirmed + fn is_reorg(&self, chain: &Chain) -> bool { + chain.get_block_height(&self.tip).is_none() + } + + fn sync_funding( + &self, + index: &Index, + daemon: &Daemon, + mempool: &Mempool, + ) -> Result> { + let chain = index.chain(); + if !self.is_reorg(chain) { + if let Some(Height::Confirmed { .. }) = &self.funding { + return Ok(self.funding); + } + } + let mut confirmed = None; + daemon.for_blocks( + index.filter_by_txid(self.outpoint.txid), + |blockhash, block| { + if confirmed.is_none() { + for tx in block.txdata { + let txid = tx.txid(); + let output_len = u32::try_from(tx.output.len()).unwrap(); + if self.outpoint.txid == txid && self.outpoint.vout < output_len { + confirmed = Some(Height::from_blockhash(blockhash, chain)); + return; + } + } + } + }, + )?; + Ok(confirmed.or_else(|| { + mempool + .get(&self.outpoint.txid) + .map(|entry| Height::unconfirmed(entry)) + })) + } + + fn sync_spending( + &self, + index: &Index, + daemon: &Daemon, + mempool: &Mempool, + ) -> Result> { + let chain = index.chain(); + if !self.is_reorg(chain) { + if let Some((_, Height::Confirmed { .. })) = &self.spending { + return Ok(self.spending); + } + } + let spending_blockhashes = index.filter_by_spending(self.outpoint); + let mut confirmed = None; + daemon.for_blocks(spending_blockhashes, |blockhash, block| { + for tx in block.txdata { + for txi in &tx.input { + if txi.previous_output == self.outpoint { + // TODO: there should be only one spending input + assert!(confirmed.is_none(), "double spend of {}", self.outpoint); + confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain))); + return; + } + } + } + })?; + Ok(confirmed.or_else(|| { + let entries = mempool.filter_by_spending(&self.outpoint); + assert!(entries.len() <= 1, "double spend of {}", self.outpoint); + entries + .first() + .map(|entry| (entry.txid, Height::unconfirmed(entry))) + })) + } +} + #[cfg(test)] mod tests { use super::HistoryEntry; diff --git a/src/tracker.rs b/src/tracker.rs index b2d9c057b..550b4d618 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -11,7 +11,7 @@ use crate::{ mempool::{FeeHistogram, Mempool}, metrics::Metrics, signals::ExitFlag, - status::{Balance, ScriptHashStatus, UnspentEntry}, + status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry}, }; /// Electrum protocol subscriptions' tracker @@ -83,6 +83,14 @@ impl Tracker { status.get_balance(self.chain()) } + pub(crate) fn update_outpoint_status( + &self, + status: &mut OutPointStatus, + daemon: &Daemon, + ) -> Result { + status.sync(&self.index, &self.mempool, daemon) + } + pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option { // Note: there are two blocks with coinbase transactions having same txid (see BIP-30) self.index.filter_by_txid(txid).next()