Skip to content

Commit

Permalink
Add blockchain.outpoint.subscribe RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Oct 25, 2021
1 parent 3965a3b commit c9cfcf5
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 6 deletions.
55 changes: 51 additions & 4 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
use bitcoin::{
consensus::{deserialize, serialize},
hashes::hex::{FromHex, ToHex},
BlockHash, Txid,
BlockHash, OutPoint, Txid,
};
use crossbeam_channel::Receiver;
use rayon::prelude::*;
Expand All @@ -19,7 +19,7 @@ use crate::{
merkle::Proof,
metrics::{self, Histogram},
signals::Signal,
status::ScriptHashStatus,
status::{OutPointStatus, ScriptHashStatus},
tracker::Tracker,
types::ScriptHash,
};
Expand All @@ -32,6 +32,7 @@ const UNKNOWN_FEE: isize = -1; // (allowed by Electrum protocol)
pub struct Client {
tip: Option<BlockHash>,
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
outpoints: HashMap<OutPoint, OutPointStatus>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -176,7 +177,25 @@ impl Rpc {
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update status")?;
.context("failed to update scripthash status")?;

notifications.extend(
client
.outpoints
.par_iter_mut()
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
match self.tracker.update_outpoint_status(status, &self.daemon) {
Ok(true) => Some(Ok(notification(
"blockchain.outpoint.subscribe",
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
))),
Ok(false) => None, // outpoint status is the same
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update scripthash status")?,
);

if let Some(old_tip) = client.tip {
let new_tip = self.tracker.chain().tip();
Expand Down Expand Up @@ -304,6 +323,28 @@ impl Rpc {
Ok(json!(result))
}

fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
let outpoint = OutPoint::new(txid, vout);
Ok(match client.outpoints.entry(outpoint) {
Entry::Occupied(e) => json!(e.get()),
Entry::Vacant(e) => {
let outpoint = OutPoint::new(txid, vout);
let mut status = OutPointStatus::new(outpoint);
self.tracker
.update_outpoint_status(&mut status, &self.daemon)?;
json!(e.insert(status))
}
})
}

fn outpoint_unsubscribe(
&self,
client: &mut Client,
(txid, vout): (Txid, u32),
) -> Result<Value> {
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
}

fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
let mut status = ScriptHashStatus::new(scripthash);
self.tracker
Expand Down Expand Up @@ -438,6 +479,8 @@ impl Rpc {
Call::HeadersSubscribe => self.headers_subscribe(client),
Call::MempoolFeeHistogram => self.get_fee_histogram(),
Call::PeersSubscribe => Ok(json!([])),
Call::OutPointSubscribe(args) => self.outpoint_subscribe(client, args),
Call::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, args),
Call::Ping => Ok(Value::Null),
Call::RelayFee => self.relayfee(),
Call::ScriptHashGetBalance(args) => self.scripthash_get_balance(client, args),
Expand Down Expand Up @@ -471,19 +514,21 @@ enum Call {
Banner,
BlockHeader((usize,)),
BlockHeaders((usize, usize)),
TransactionBroadcast((String,)),
Donation,
EstimateFee((u16,)),
Features,
HeadersSubscribe,
MempoolFeeHistogram,
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
OutPointUnsubscribe((Txid, u32)),
PeersSubscribe,
Ping,
RelayFee,
ScriptHashGetBalance((ScriptHash,)),
ScriptHashGetHistory((ScriptHash,)),
ScriptHashListUnspent((ScriptHash,)),
ScriptHashSubscribe((ScriptHash,)),
TransactionBroadcast((String,)),
TransactionGet(TxGetArgs),
TransactionGetMerkle((Txid, usize)),
Version((String, Version)),
Expand All @@ -501,6 +546,8 @@ impl Call {
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
"blockchain.scripthash.listunspent" => Call::ScriptHashListUnspent(convert(params)?),
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
"blockchain.outpoint.subscribe" => Call::OutPointSubscribe(convert(params)?),
"blockchain.outpoint.unsubscribe" => Call::OutPointUnsubscribe(convert(params)?),
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),
Expand Down
141 changes: 140 additions & 1 deletion src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::{
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
};
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer};
use serde::ser::{Serialize, SerializeMap, Serializer};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -49,12 +49,26 @@ impl TxEntry {
// Confirmation height of a transaction or its mempool state:
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
#[derive(Copy, Clone, Eq, PartialEq)]
enum Height {
Confirmed { height: usize },
Unconfirmed { has_unconfirmed_inputs: bool },
}

impl Height {
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
let height = chain
.get_block_height(&blockhash)
.expect("missing block in chain");
Self::Confirmed { height }
}

fn unconfirmed(e: &crate::mempool::Entry) -> Self {
Self::Unconfirmed {
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
}
}

fn as_i64(&self) -> i64 {
match self {
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
Expand Down Expand Up @@ -511,6 +525,131 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
Some(StatusHash::from_engine(engine))
}

pub(crate) struct OutPointStatus {
outpoint: OutPoint,
funding: Option<Height>,
spending: Option<(Txid, Height)>,
tip: BlockHash,
}

impl Serialize for OutPointStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
if let Some(funding) = &self.funding {
map.serialize_entry("height", &funding)?;
}
if let Some((txid, height)) = &self.spending {
map.serialize_entry("spender_txhash", &txid)?;
map.serialize_entry("spender_height", &height)?;
}
map.end()
}
}

impl OutPointStatus {
pub(crate) fn new(outpoint: OutPoint) -> Self {
Self {
outpoint,
funding: None,
spending: None,
tip: BlockHash::default(),
}
}

pub(crate) fn sync(
&mut self,
index: &Index,
mempool: &Mempool,
daemon: &Daemon,
) -> Result<bool> {
let funding = self.sync_funding(index, daemon, mempool)?;
let spending = self.sync_spending(index, daemon, mempool)?;
let same_status = (self.funding == funding) && (self.spending == spending);
self.funding = funding;
self.spending = spending;
self.tip = index.chain().tip();
Ok(!same_status)
}

/// Return true iff current tip became unconfirmed
fn is_reorg(&self, chain: &Chain) -> bool {
chain.get_block_height(&self.tip).is_none()
}

fn sync_funding(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<Height>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some(Height::Confirmed { .. }) = &self.funding {
return Ok(self.funding);
}
}
let mut confirmed = None;
daemon.for_blocks(
index.filter_by_txid(self.outpoint.txid),
|blockhash, block| {
if confirmed.is_none() {
for tx in block.txdata {
let txid = tx.txid();
let output_len = u32::try_from(tx.output.len()).unwrap();
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
confirmed = Some(Height::from_blockhash(blockhash, chain));
return;
}
}
}
},
)?;
Ok(confirmed.or_else(|| {
mempool
.get(&self.outpoint.txid)
.map(|entry| Height::unconfirmed(entry))
}))
}

fn sync_spending(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<(Txid, Height)>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some((_, Height::Confirmed { .. })) = &self.spending {
return Ok(self.spending);
}
}
let spending_blockhashes = index.filter_by_spending(self.outpoint);
let mut confirmed = None;
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
for tx in block.txdata {
for txi in &tx.input {
if txi.previous_output == self.outpoint {
// TODO: there should be only one spending input
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
return;
}
}
}
})?;
Ok(confirmed.or_else(|| {
let entries = mempool.filter_by_spending(&self.outpoint);
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
entries
.first()
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
}))
}
}

#[cfg(test)]
mod tests {
use super::HistoryEntry;
Expand Down
10 changes: 9 additions & 1 deletion src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
mempool::{FeeHistogram, Mempool},
metrics::Metrics,
signals::ExitFlag,
status::{Balance, ScriptHashStatus, UnspentEntry},
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
};

/// Electrum protocol subscriptions' tracker
Expand Down Expand Up @@ -83,6 +83,14 @@ impl Tracker {
status.get_balance(self.chain())
}

pub(crate) fn update_outpoint_status(
&self,
status: &mut OutPointStatus,
daemon: &Daemon,
) -> Result<bool> {
status.sync(&self.index, &self.mempool, daemon)
}

pub(crate) fn get_blockhash_by_txid(&self, txid: Txid) -> Option<BlockHash> {
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
self.index.filter_by_txid(txid).next()
Expand Down

0 comments on commit c9cfcf5

Please sign in to comment.