Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blockchain.outpoint.subscribe RPC #454

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
use bitcoin::{
consensus::{deserialize, encode::serialize_hex},
hashes::hex::FromHex,
BlockHash, Txid,
BlockHash, OutPoint, Txid,
};
use crossbeam_channel::Receiver;
use rayon::prelude::*;
Expand All @@ -21,7 +21,7 @@ use crate::{
merkle::Proof,
metrics::{self, Histogram, Metrics},
signals::Signal,
status::ScriptHashStatus,
status::{OutPointStatus, ScriptHashStatus},
tracker::Tracker,
types::ScriptHash,
};
Expand All @@ -36,6 +36,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
pub struct Client {
tip: Option<BlockHash>,
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
outpoints: HashMap<OutPoint, OutPointStatus>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -185,7 +186,25 @@ impl Rpc {
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update status")?;
.context("failed to update scripthash status")?;

notifications.extend(
client
.outpoints
.par_iter_mut()
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
match self.tracker.update_outpoint_status(status, &self.daemon) {
Ok(true) => Some(Ok(notification(
"blockchain.outpoint.subscribe",
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
))),
Ok(false) => None, // outpoint status is the same
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update scripthash status")?,
);

if let Some(old_tip) = client.tip {
let new_tip = self.tracker.chain().tip();
Expand Down Expand Up @@ -350,6 +369,28 @@ impl Rpc {
})
}

fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
let outpoint = OutPoint::new(txid, vout);
Ok(match client.outpoints.entry(outpoint) {
Entry::Occupied(e) => json!(e.get()),
Entry::Vacant(e) => {
let outpoint = OutPoint::new(txid, vout);
let mut status = OutPointStatus::new(outpoint);
self.tracker
.update_outpoint_status(&mut status, &self.daemon)?;
json!(e.insert(status))
}
})
}

fn outpoint_unsubscribe(
&self,
client: &mut Client,
(txid, vout): (Txid, u32),
) -> Result<Value> {
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
}

fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
let mut status = ScriptHashStatus::new(scripthash);
self.tracker
Expand Down Expand Up @@ -548,6 +589,8 @@ impl Rpc {
Params::Features => self.features(),
Params::HeadersSubscribe => self.headers_subscribe(client),
Params::MempoolFeeHistogram => self.get_fee_histogram(),
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
Params::PeersSubscribe => Ok(json!([])),
Params::Ping => Ok(Value::Null),
Params::RelayFee => self.relayfee(),
Expand All @@ -572,12 +615,13 @@ enum Params {
Banner,
BlockHeader((usize,)),
BlockHeaders((usize, usize)),
TransactionBroadcast((String,)),
Donation,
EstimateFee((u16,)),
Features,
HeadersSubscribe,
MempoolFeeHistogram,
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
OutPointUnsubscribe((Txid, u32)),
PeersSubscribe,
Ping,
RelayFee,
Expand All @@ -586,6 +630,7 @@ enum Params {
ScriptHashListUnspent((ScriptHash,)),
ScriptHashSubscribe((ScriptHash,)),
ScriptHashUnsubscribe((ScriptHash,)),
TransactionBroadcast((String,)),
TransactionGet(TxGetArgs),
TransactionGetMerkle((Txid, usize)),
TransactionFromPosition((usize, usize, bool)),
Expand All @@ -599,6 +644,8 @@ impl Params {
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
"blockchain.relayfee" => Params::RelayFee,
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),
Expand Down
137 changes: 136 additions & 1 deletion src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bitcoin::{
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
};
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer};
use serde::ser::{Serialize, SerializeMap, Serializer};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -48,12 +48,26 @@ impl TxEntry {
// Confirmation height of a transaction or its mempool state:
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
#[derive(Copy, Clone, Eq, PartialEq)]
enum Height {
Confirmed { height: usize },
Unconfirmed { has_unconfirmed_inputs: bool },
}

impl Height {
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
let height = chain
.get_block_height(&blockhash)
.expect("missing block in chain");
Self::Confirmed { height }
}

fn unconfirmed(e: &crate::mempool::Entry) -> Self {
Self::Unconfirmed {
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
}
}

fn as_i64(&self) -> i64 {
match self {
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
Expand Down Expand Up @@ -538,6 +552,127 @@ fn filter_block_txs<T: Send>(
.into_iter()
}

pub(crate) struct OutPointStatus {
outpoint: OutPoint,
funding: Option<Height>,
spending: Option<(Txid, Height)>,
tip: BlockHash,
}

impl Serialize for OutPointStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
if let Some(funding) = &self.funding {
map.serialize_entry("height", &funding)?;
}
if let Some((txid, height)) = &self.spending {
map.serialize_entry("spender_txhash", &txid)?;
map.serialize_entry("spender_height", &height)?;
}
map.end()
}
}

impl OutPointStatus {
pub(crate) fn new(outpoint: OutPoint) -> Self {
Self {
outpoint,
funding: None,
spending: None,
tip: BlockHash::all_zeros(),
}
}

pub(crate) fn sync(
&mut self,
index: &Index,
mempool: &Mempool,
daemon: &Daemon,
) -> Result<bool> {
let funding = self.sync_funding(index, daemon, mempool)?;
let spending = self.sync_spending(index, daemon, mempool)?;
let same_status = (self.funding == funding) && (self.spending == spending);
self.funding = funding;
self.spending = spending;
self.tip = index.chain().tip();
Ok(!same_status)
}

/// Return true iff current tip became unconfirmed
fn is_reorg(&self, chain: &Chain) -> bool {
chain.get_block_height(&self.tip).is_none()
}

fn sync_funding(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<Height>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some(Height::Confirmed { .. }) = &self.funding {
return Ok(self.funding);
}
}
let mut confirmed = None;
daemon.for_blocks(
index.filter_by_txid(self.outpoint.txid),
|blockhash, block| {
if confirmed.is_none() {
for tx in block.txdata {
let txid = tx.txid();
let output_len = u32::try_from(tx.output.len()).unwrap();
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.outpoint.vout < output_len

maybe i am missing something, but for what reason is this check here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearly this avoids out-of-bounds access.

Copy link
Contributor

@antonilol antonilol Nov 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm i tried to find where self.outpoint.vout is used to index tx.output, but could not find it, self.outpoint.vout is not used after this here, which made sense after i thought of this: this code checks for output funding, it doesnt matter which output index is subscribed to, as long as it is in the transaction (self.outpoint.vout < output_len)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting, this is not even required for double-spend handling since a double-spent transaction would have a different txid.

Maybe there would be a value in having a sanity check with a warning? It'd be best to also check scripts then.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's indeed a sanity check - in case the user tries to subscribe to a non-existing outpoint (by specifying a vout larger than the actual # of txid outputs).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm trying to say is that it now silently ignores the transaction instead of printing a warning and a warning would be helpful because doing this is a bug and without a message it could be hard to analyze.

confirmed = Some(Height::from_blockhash(blockhash, chain));
return;
}
}
}
},
)?;
Ok(confirmed.or_else(|| mempool.get(&self.outpoint.txid).map(Height::unconfirmed)))
}

fn sync_spending(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<(Txid, Height)>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some((_, Height::Confirmed { .. })) = &self.spending {
return Ok(self.spending);
}
}
let spending_blockhashes = index.filter_by_spending(self.outpoint);
let mut confirmed = None;
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
for tx in block.txdata {
for txi in &tx.input {
if txi.previous_output == self.outpoint {
// TODO: there should be only one spending input
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
return;
}
}
}
})?;
Ok(confirmed.or_else(|| {
let entries = mempool.filter_by_spending(&self.outpoint);
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
entries
.first()
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
}))
}
}

#[cfg(test)]
mod tests {
use super::HistoryEntry;
Expand Down
10 changes: 9 additions & 1 deletion src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
mempool::{FeeHistogram, Mempool},
metrics::Metrics,
signals::ExitFlag,
status::{Balance, ScriptHashStatus, UnspentEntry},
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
};

/// Electrum protocol subscriptions' tracker
Expand Down Expand Up @@ -114,4 +114,12 @@ impl Tracker {
})?;
Ok(result)
}

pub(crate) fn update_outpoint_status(
&self,
status: &mut OutPointStatus,
daemon: &Daemon,
) -> Result<bool> {
status.sync(&self.index, &self.mempool, daemon)
}
}