for_blocks apply closure in parallel
Requires changing the signature of the closure from FnMut to Fn and returning
data instead of relying on mutating the context.
RCasatta committed Aug 30, 2023
1 parent 6f82108 commit b2fcb27
Showing 6 changed files with 104 additions and 57 deletions.
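The shape of the change, as a minimal standalone sketch (the rayon usage and names below are illustrative, not code from this repository): a FnMut closure that mutates captured state forces sequential processing, while a Fn closure that returns a value per item can be mapped in parallel, with the results combined afterwards.

use rayon::prelude::*;

// Before: FnMut mutates a captured accumulator, so calls must be serialized.
fn sum_sequential(items: &[u64]) -> u64 {
    let mut total = 0;
    items.iter().for_each(|x| total += x * 2);
    total
}

// After: Fn returns data per item; rayon maps in parallel and the caller
// combines the results (here with sum, in the commit with merge/fold).
fn sum_parallel(items: &[u64]) -> u64 {
    items.par_iter().map(|x| x * 2).sum()
}

fn main() {
    let items: Vec<u64> = (0..1000).collect();
    assert_eq!(sum_sequential(&items), sum_parallel(&items));
}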
5 changes: 3 additions & 2 deletions src/daemon.rs
@@ -229,10 +229,11 @@ impl Daemon {
         self.p2p.lock().get_new_headers(chain)
     }
 
-    pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
+    pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         self.p2p.lock().for_blocks(blockhashes, func)
     }
11 changes: 11 additions & 0 deletions src/db.rs
@@ -8,6 +8,7 @@ pub(crate) type Row = Box<[u8]>;
 
 #[derive(Default)]
 pub(crate) struct WriteBatch {
+    pub(crate) height: usize,
     pub(crate) tip_row: Row,
     pub(crate) header_rows: Vec<Row>,
     pub(crate) funding_rows: Vec<Row>,
@@ -22,6 +23,16 @@ impl WriteBatch {
         self.spending_rows.sort_unstable();
         self.txid_rows.sort_unstable();
     }
+    pub(crate) fn merge(mut self, other: WriteBatch) -> Self {
+        self.header_rows.extend(other.header_rows.into_iter());
+        self.funding_rows.extend(other.funding_rows.into_iter());
+        self.spending_rows.extend(other.spending_rows.into_iter());
+        self.txid_rows.extend(other.txid_rows.into_iter());
+        if self.height < other.height {
+            self.tip_row = other.tip_row
+        }
+        self
+    }
 }
 
 /// RocksDB wrapper for index storage
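A note on merge: as shown it replaces tip_row when other has a greater height, but it does not carry other.height forward, so a fold over batches visited out of order can end with a stale tip. A minimal sketch of an order-independent fold follows (simplified types, not the repository's code):

// Sketch (not the repository's code): folding per-block batches so the
// tip always comes from the highest block, in whatever order the fold
// visits them. Note that `height` must be carried forward too;
// otherwise a later merge could overwrite a higher tip.
#[derive(Default, Debug)]
struct Batch {
    height: usize,
    tip: &'static str,
}

impl Batch {
    fn merge(mut self, other: Batch) -> Self {
        if self.height < other.height {
            self.tip = other.tip;
            self.height = other.height;
        }
        self
    }
}

fn main() {
    let batches = vec![
        Batch { height: 2, tip: "b" },
        Batch { height: 3, tip: "c" },
        Batch { height: 1, tip: "a" },
    ];
    let merged = batches.into_iter().fold(Batch::default(), Batch::merge);
    assert_eq!(merged.tip, "c");
}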
26 changes: 16 additions & 10 deletions src/index.rs
@@ -2,6 +2,7 @@ use anyhow::{Context, Result};
 use bitcoin::consensus::{deserialize, serialize, Decodable};
 use bitcoin::{BlockHash, OutPoint, Txid};
 use bitcoin_slices::{bsl, Visit, Visitor};
+use std::collections::HashMap;
 use std::ops::ControlFlow;
 
 use crate::{
@@ -199,29 +200,33 @@ impl Index {
     }
 
     fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
-        let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
-        let mut heights = chunk.iter().map(|h| h.height());
+        let hash_height: HashMap<_, _> = chunk.iter().map(|h| (h.hash(), h.height())).collect();
 
-        let mut batch = WriteBatch::default();
-        daemon.for_blocks(blockhashes, |blockhash, block| {
-            let height = heights.next().expect("unexpected block");
+        let batches = daemon.for_blocks(hash_height.keys().cloned(), |blockhash, block| {
+            let height = *hash_height.get(&blockhash).expect("some by construction");
+            let mut batch = WriteBatch::default();
+
             self.stats.observe_duration("block", || {
                 index_single_block(blockhash, block, height, &mut batch);
             });
             self.stats.height.set("tip", height as f64);
+            batch
         })?;
-        let heights: Vec<_> = heights.collect();
-        assert!(
-            heights.is_empty(),
-            "some blocks were not indexed: {:?}",
-            heights
+        assert_eq!(
+            hash_height.len(),
+            batches.len(),
+            "some blocks were not indexed",
         );
+        let mut batch = batches
+            .into_iter()
+            .fold(WriteBatch::default(), |a, b| a.merge(b));
 
         batch.sort();
         self.stats.observe_batch(&batch);
         self.stats
             .observe_duration("write", || self.store.write(&batch));
         self.stats.observe_db(&self.store);
 
         Ok(())
     }
 
@@ -287,4 +292,5 @@ fn index_single_block(
     let mut index_block = IndexBlockVisitor { batch, height };
     bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block");
     batch.tip_row = serialize(&block_hash).into_boxed_slice();
+    batch.height = height;
 }
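Why the switch from a heights iterator to a hash-to-height map: the old closure consumed heights.next() and therefore assumed blocks were delivered in request order, while looking the height up by hash makes each invocation self-contained, which is what a parallel map requires. A toy illustration (hypothetical names, not repository code):

use std::collections::HashMap;

fn main() {
    // Heights are keyed by hash up front...
    let hash_height: HashMap<&str, usize> =
        [("aaa", 100), ("bbb", 101), ("ccc", 102)].into_iter().collect();

    // ...so the per-block closure no longer cares about delivery order.
    for hash in ["ccc", "aaa", "bbb"] {
        let height = hash_height[hash]; // was: heights.next()
        println!("block {hash} is at height {height}");
    }
}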
24 changes: 17 additions & 7 deletions src/p2p.rs
@@ -19,6 +19,8 @@ use bitcoin::{
 };
 use bitcoin_slices::{bsl, Parse};
 use crossbeam_channel::{bounded, select, Receiver, Sender};
+use rayon::iter::ParallelIterator;
+use rayon::prelude::IntoParallelIterator;
 
 use std::io::{self, ErrorKind, Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
@@ -91,18 +93,20 @@ impl Connection {
             .collect())
     }
 
-    /// Request and process the specified blocks (in the specified order).
+    /// Request and process the specified blocks.
     /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
     /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
-    pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
+    pub(crate) fn for_blocks<B, F, R>(&mut self, blockhashes: B, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         self.blocks_duration.observe_duration("total", || {
+            let mut result = vec![];
             let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
             if blockhashes.is_empty() {
-                return Ok(());
+                return Ok(vec![]);
             }
             self.blocks_duration.observe_duration("request", || {
                 debug!("loading {} blocks", blockhashes.len());
@@ -124,10 +128,16 @@
                 );
                 Ok(block)
             })?;
-                self.blocks_duration
-                    .observe_duration("process", || func(hash, block));
+                result.push((hash, block));
             }
-            Ok(())
+
+            Ok(result
+                .into_par_iter()
+                .map(|(hash, block)| {
+                    self.blocks_duration
+                        .observe_duration("process", || func(hash, block))
+                })
+                .collect())
         })
     }

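The new trait bounds follow directly from what rayon needs: the closure is shared by reference across worker threads, so it must be Fn + Send + Sync, and each result crosses a thread boundary, so R must be Send. A generic sketch under those assumptions (not the repository's code):

use rayon::prelude::*;

// Generic helper mirroring the bounds used by for_blocks.
fn apply_parallel<T, R, F>(items: Vec<T>, func: F) -> Vec<R>
where
    T: Send,
    F: Fn(T) -> R + Send + Sync,
    R: Send,
{
    // into_par_iter consumes the Vec; map runs func on worker threads;
    // collect reassembles the results in the original order.
    items.into_par_iter().map(func).collect()
}

fn main() {
    let lengths = apply_parallel(vec!["a", "bb", "ccc"], |s| s.len());
    assert_eq!(lengths, vec![1, 2, 3]);
}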
72 changes: 45 additions & 27 deletions src/status.rs
@@ -308,10 +308,11 @@ impl ScriptHashStatus {
     }
 
     /// Apply func only on the new blocks (fetched from daemon).
-    fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
+    fn for_new_blocks<B, F, R>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         daemon.for_blocks(
             blockhashes
@@ -331,37 +332,54 @@
         outpoints: &mut HashSet<OutPoint>,
     ) -> Result<HashMap<BlockHash, Vec<TxEntry>>> {
         let scripthash = self.scripthash;
-        let mut result = HashMap::<BlockHash, HashMap<usize, TxEntry>>::new();
 
         let funding_blockhashes = index.limit_result(index.filter_by_funding(scripthash))?;
-        self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
-                cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
-                outpoints.extend(make_outpoints(
-                    filtered_outputs.txid,
-                    &filtered_outputs.result,
-                ));
-                block_entries
-                    .entry(filtered_outputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
-                    .outputs = filtered_outputs.result;
-            }
-        })?;
+        let outputs_filtering =
+            self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+                let mut outpoints = vec![];
+                for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
+                    cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
+                    outpoints.extend(make_outpoints(
+                        filtered_outputs.txid,
+                        &filtered_outputs.result,
+                    ));
+                    block_entries
+                        .entry(filtered_outputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
+                        .outputs = filtered_outputs.result;
+                }
+                (blockhash, outpoints, block_entries)
+            })?;
+
+        outpoints.extend(outputs_filtering.iter().flat_map(|(_, o, _)| o).cloned());
+
+        let mut result: HashMap<_, _> = outputs_filtering
+            .into_iter()
+            .map(|(a, _, b)| (a, b))
+            .collect();
 
         let spending_blockhashes: HashSet<BlockHash> = outpoints
             .par_iter()
             .flat_map_iter(|outpoint| index.filter_by_spending(*outpoint))
             .collect();
-        self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
-                cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
-                block_entries
-                    .entry(filtered_inputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
-                    .spent = filtered_inputs.result;
-            }
-        })?;
+        let inputs_filtering =
+            self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+
+                for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
+                    cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
+                    block_entries
+                        .entry(filtered_inputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
+                        .spent = filtered_inputs.result;
+                }
+                (blockhash, block_entries)
+            })?;
+        for (b, h) in inputs_filtering {
+            let e = result.entry(b).or_default();
+            e.extend(h);
+        }
 
         Ok(result
             .into_iter()
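The pattern in both hunks is the same: each closure builds its own local block_entries (and outpoints list) and returns them, and the caller merges everything afterwards, since a Fn closure shared across threads cannot mutate captured collections (without locking). A condensed sketch of that return-and-merge step (hypothetical types, not repository code):

use std::collections::HashMap;

fn main() {
    // Per-block results as returned by the closures: (block, entries).
    let per_block: Vec<(&str, HashMap<usize, &str>)> = vec![
        ("block_a", HashMap::from([(0, "tx0")])),
        ("block_b", HashMap::from([(1, "tx1"), (2, "tx2")])),
    ];

    // Caller-side merge, as in the `for (b, h) in inputs_filtering` loop.
    let mut result: HashMap<&str, HashMap<usize, &str>> = HashMap::new();
    for (block, entries) in per_block {
        result.entry(block).or_default().extend(entries);
    }
    assert_eq!(result["block_b"].len(), 2);
}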
23 changes: 12 additions & 11 deletions src/tracker.rs
@@ -105,17 +105,18 @@ impl Tracker {
     ) -> Result<Option<(BlockHash, Transaction)>> {
         // Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
         let blockhashes = self.index.filter_by_txid(txid);
-        let mut result = None;
-        daemon.for_blocks(blockhashes, |blockhash, block| {
-            let mut visitor = FindTransaction::new(txid);
-            match bsl::Block::visit(&block, &mut visitor) {
-                Ok(_) | Err(VisitBreak) => (),
-                Err(e) => panic!("core returned invalid block: {:?}", e),
-            }
-            if let Some(tx) = visitor.tx_found() {
-                result = Some((blockhash, tx));
-            }
-        })?;
+        let result = daemon
+            .for_blocks(blockhashes, |blockhash, block| {
+                let mut visitor = FindTransaction::new(txid);
+                match bsl::Block::visit(&block, &mut visitor) {
+                    Ok(_) | Err(VisitBreak) => (),
+                    Err(e) => panic!("core returned invalid block: {:?}", e),
+                }
+                visitor.tx_found().map(|tx| (blockhash, tx))
+            })?
+            .first()
+            .cloned()
+            .flatten();
         Ok(result)
     }
 }
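One detail worth noting in the tracker change: for_blocks now yields a Vec<Option<(BlockHash, Transaction)>>, one entry per scanned block, and .first().cloned().flatten() keeps only the first entry, whether or not it is Some. A tiny sketch of the difference versus taking the first Some:

fn main() {
    // One Option per scanned block; the match may be in a later block.
    let found: Vec<Option<u32>> = vec![None, Some(7)];

    // What the commit does: look only at the first element.
    assert_eq!(found.first().cloned().flatten(), None);

    // Taking the first Some, regardless of position:
    assert_eq!(found.into_iter().flatten().next(), Some(7));
}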
