Concurrent block processing #931

Open · wants to merge 2 commits into master
5 changes: 3 additions & 2 deletions src/daemon.rs
@@ -229,10 +229,11 @@ impl Daemon {
         self.p2p.lock().get_new_headers(chain)
     }
 
-    pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
+    pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         self.p2p.lock().for_blocks(blockhashes, func)
     }
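Because the per-block callback can now run on several threads at once, the bound tightens from `FnMut` to `Fn + Send + Sync`, and results come back as a `Vec` in the order of the input hashes. A hypothetical caller sketch (not from this PR; it assumes `SerBlock` is a plain byte buffer, as in electrs' types module):

```rust
use anyhow::Result;
use bitcoin::BlockHash;

// Collect each block's serialized size concurrently. The closure captures
// nothing mutable, so it satisfies `Fn(BlockHash, SerBlock) -> R + Send + Sync`.
fn block_sizes(daemon: &Daemon, blockhashes: Vec<BlockHash>) -> Result<Vec<usize>> {
    daemon.for_blocks(blockhashes, |_hash, block| block.len())
}
```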
11 changes: 11 additions & 0 deletions src/db.rs
@@ -8,6 +8,7 @@ pub(crate) type Row = Box<[u8]>;
 
 #[derive(Default)]
 pub(crate) struct WriteBatch {
+    pub(crate) height: usize,
     pub(crate) tip_row: Row,
     pub(crate) header_rows: Vec<Row>,
     pub(crate) funding_rows: Vec<Row>,
@@ -22,6 +23,16 @@ impl WriteBatch {
         self.spending_rows.sort_unstable();
         self.txid_rows.sort_unstable();
     }
+    pub(crate) fn merge(mut self, other: WriteBatch) -> Self {
+        self.header_rows.extend(other.header_rows.into_iter());
+        self.funding_rows.extend(other.funding_rows.into_iter());
+        self.spending_rows.extend(other.spending_rows.into_iter());
+        self.txid_rows.extend(other.txid_rows.into_iter());
+        if self.height < other.height {
+            self.tip_row = other.tip_row
romanz (Owner): Should we also update self.height?

RCasatta (Contributor, Author) on Oct 13, 2023: I don't think so -> #931 (comment)

Which is not ideal at all, but I am not sure whether there are better options, or if I should keep this and add a comment.

+        }

Comment on lines +31 to +33:
RCasatta (Contributor, Author): This logic could lead to intermediate invalid states (with middle heights still missing). However, since all batches are always merged, it should be okay at the end.

+        self
+    }
 }

 /// RocksDB wrapper for index storage
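Note that `merge` takes the other batch's `tip_row` without adopting its `height`, and `WriteBatch::default()` starts at height 0, so the accumulator's height never rises during a fold. A hypothetical test (not part of the PR, reusing the definitions above) showing how the final tip then depends on merge order, which is what the review exchange above is probing:

```rust
#[test]
fn tip_row_depends_on_merge_order() {
    // Helper: a batch carrying only a height and a one-byte tip marker.
    fn batch(height: usize, tip: u8) -> WriteBatch {
        let mut b = WriteBatch::default();
        b.height = height;
        b.tip_row = vec![tip].into_boxed_slice();
        b
    }

    // Ascending heights (the order sync_blocks folds in): the highest tip wins.
    let asc = WriteBatch::default().merge(batch(100, 1)).merge(batch(200, 2));
    assert_eq!(&asc.tip_row[..], &[2u8][..]);

    // Descending heights: the accumulator is still at height 0 when the
    // lower batch arrives, so the *lower* tip wins -- the hazard behind
    // "Should we also update self.height?".
    let desc = WriteBatch::default().merge(batch(200, 2)).merge(batch(100, 1));
    assert_eq!(&desc.tip_row[..], &[1u8][..]);
}
```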
26 changes: 16 additions & 10 deletions src/index.rs
@@ -2,6 +2,7 @@
 use anyhow::{Context, Result};
 use bitcoin::consensus::{deserialize, serialize, Decodable};
 use bitcoin::{BlockHash, OutPoint, Txid};
 use bitcoin_slices::{bsl, Visit, Visitor};
+use std::collections::HashMap;
 use std::ops::ControlFlow;
 
 use crate::{
@@ -199,29 +200,33 @@ impl Index {
     }
 
     fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
-        let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
-        let mut heights = chunk.iter().map(|h| h.height());
+        let hash_height: HashMap<_, _> = chunk.iter().map(|h| (h.hash(), h.height())).collect();
 
-        let mut batch = WriteBatch::default();
+        let batches = daemon.for_blocks(hash_height.keys().cloned(), |blockhash, block| {
+            let height = *hash_height.get(&blockhash).expect("some by construction");
+            let mut batch = WriteBatch::default();
 
-        daemon.for_blocks(blockhashes, |blockhash, block| {
-            let height = heights.next().expect("unexpected block");
             self.stats.observe_duration("block", || {
                 index_single_block(blockhash, block, height, &mut batch);
             });
             self.stats.height.set("tip", height as f64);
+            batch
         })?;
-        let heights: Vec<_> = heights.collect();
-        assert!(
-            heights.is_empty(),
-            "some blocks were not indexed: {:?}",
-            heights
+        assert_eq!(
+            hash_height.len(),
+            batches.len(),
+            "some blocks were not indexed",
         );
+        let mut batch = batches
+            .into_iter()
+            .fold(WriteBatch::default(), |a, b| a.merge(b));
 
         batch.sort();
         self.stats.observe_batch(&batch);
         self.stats
             .observe_duration("write", || self.store.write(&batch));
         self.stats.observe_db(&self.store);
 
         Ok(())
     }

@@ -287,4 +292,5 @@ fn index_single_block(
     let mut index_block = IndexBlockVisitor { batch, height };
     bsl::Block::visit(&block, &mut index_block).expect("core returned invalid block");
     batch.tip_row = serialize(&block_hash).into_boxed_slice();
+    batch.height = height;
 }
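The switch from a shared `heights.next()` iterator to a `hash_height` lookup is what lets the callback be `Fn` rather than `FnMut`: advancing an iterator mutates captured state and forces sequential calls, while a read-only map lookup is order-independent. A small standalone sketch of the distinction (hypothetical, with `u64` standing in for `BlockHash`):

```rust
use std::collections::HashMap;

fn main() {
    // Old style: the closure advances a captured iterator, so it is FnMut;
    // it only yields correct heights if called in exactly the input order.
    let mut heights = vec![100usize, 101].into_iter();
    let mut sequential = |_hash: u64| heights.next().expect("unexpected block");
    assert_eq!(sequential(0xaa), 100);

    // New style: a read-only lookup keeps the closure Fn + Send + Sync, so
    // concurrent, out-of-order invocations still pair each hash correctly.
    let hash_height: HashMap<u64, usize> = HashMap::from([(0xaa, 100), (0xbb, 101)]);
    let concurrent = |hash: u64| *hash_height.get(&hash).expect("known hash");
    assert_eq!(concurrent(0xbb), 101);
}
```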
81 changes: 58 additions & 23 deletions src/p2p.rs
@@ -21,7 +21,7 @@
 use crossbeam_channel::{bounded, select, Receiver, Sender};
 
 use std::io::{self, ErrorKind, Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use crate::types::SerBlock;
@@ -90,43 +90,78 @@ impl Connection {
             .collect())
     }
 
-    /// Request and process the specified blocks (in the specified order).
+    /// Request and process the specified blocks.
     /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
     /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
-    pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
+    pub(crate) fn for_blocks<B, F, R>(&mut self, blockhashes: B, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         self.blocks_duration.observe_duration("total", || {
             let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
+            let blockhashes_len = blockhashes.len();
             if blockhashes.is_empty() {
-                return Ok(());
+                return Ok(vec![]);
             }
             self.blocks_duration.observe_duration("request", || {
                 debug!("loading {} blocks", blockhashes.len());
                 self.req_send.send(Request::get_blocks(&blockhashes))
             })?;
 
-            for hash in blockhashes {
-                let block = self.blocks_duration.observe_duration("response", || {
-                    let block = self
-                        .blocks_recv
-                        .recv()
-                        .with_context(|| format!("failed to get block {}", hash))?;
-                    let header = bsl::BlockHeader::parse(&block[..])
-                        .expect("core returned invalid blockheader")
-                        .parsed_owned();
-                    ensure!(
-                        &header.block_hash_sha2()[..] == hash.as_byte_array(),
-                        "got unexpected block"
-                    );
-                    Ok(block)
-                })?;
-                self.blocks_duration
-                    .observe_duration("process", || func(hash, block));
-            }
+            let mut result = Vec::with_capacity(blockhashes_len);
romanz (Owner): If possible, can we use 5aba2a1 to simplify this method?

RCasatta (Contributor, Author): Sure! It is much better; I didn't think about that.

+            for _ in 0..blockhashes_len {
+                // TODO use `OnceLock<R>` instead of `Mutex<Option<R>>` once MSRV 1.70
+                result.push(Mutex::new(None));
+            }
+
+            rayon::scope(|s| {
+                for (i, hash) in blockhashes.iter().enumerate() {
+                    let block_result = self.blocks_duration.observe_duration("response", || {
+                        let block = self
+                            .blocks_recv
+                            .recv()
+                            .with_context(|| format!("failed to get block {}", hash))?;
+                        let header = bsl::BlockHeader::parse(&block[..])
+                            .expect("core returned invalid blockheader")
+                            .parsed_owned();
+                        ensure!(
+                            &header.block_hash_sha2()[..] == hash.as_byte_array(),
+                            "got unexpected block"
+                        );
+                        Ok(block)
+                    });
+                    if let Ok(block) = block_result {
+                        let func = &func;
+                        let blocks_duration = &self.blocks_duration;
+                        let hash = *hash;
+                        let result = &result;
+
+                        s.spawn(move |_| {
+                            let r =
+                                blocks_duration.observe_duration("process", || func(hash, block));
+                            *result[i]
+                                .lock()
+                                .expect("I am the only user of this mutex until the scope ends") =
+                                Some(r);
+                        });
+                    }
+                }
+            });
+
+            let result: Option<Vec<_>> = result
+                .into_iter()
+                .map(|e| {
+                    e.into_inner()
+                        .expect("spawn cannot panic and the scope ensures all the threads ended")
+                })
+                .collect();
+
+            match result {
+                Some(r) => Ok(r),
+                None => bail!("One or more of the jobs in for_blocks failed"),
+            }
-            Ok(())
         })
     }

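Per the TODO above, each `Mutex<Option<R>>` slot is written exactly once by one spawned task and read only after the scope ends, which is what `std::sync::OnceLock` (stable since Rust 1.70) models directly. A hypothetical sketch of the slot handling under that MSRV, reduced to a standalone function:

```rust
use std::sync::OnceLock;

// Write-once result slots without locking. Each worker fills slots[i] once;
// the scope guarantees all workers finish before the slots are drained.
fn process_all<T, R>(items: Vec<T>, func: impl Fn(T) -> R + Send + Sync) -> Vec<R>
where
    T: Copy + Send + Sync,
    R: Send + Sync,
{
    let slots: Vec<OnceLock<R>> = (0..items.len()).map(|_| OnceLock::new()).collect();
    rayon::scope(|s| {
        for (i, item) in items.iter().copied().enumerate() {
            let (func, slots) = (&func, &slots);
            s.spawn(move |_| {
                // `set` can only fail on a second write, which never happens here.
                let _ = slots[i].set(func(item));
            });
        }
    });
    slots
        .into_iter()
        .map(|slot| slot.into_inner().expect("scope ended, so every slot is filled"))
        .collect()
}
```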
72 changes: 45 additions & 27 deletions src/status.rs
@@ -308,10 +308,11 @@ impl ScriptHashStatus {
     }
 
     /// Apply func only on the new blocks (fetched from daemon).
-    fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
+    fn for_new_blocks<B, F, R>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<Vec<R>>
     where
         B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, SerBlock),
+        F: Fn(BlockHash, SerBlock) -> R + Send + Sync,
+        R: Send + Sync,
     {
         daemon.for_blocks(
             blockhashes
@@ -331,37 +332,54 @@
         outpoints: &mut HashSet<OutPoint>,
     ) -> Result<HashMap<BlockHash, Vec<TxEntry>>> {
         let scripthash = self.scripthash;
-        let mut result = HashMap::<BlockHash, HashMap<usize, TxEntry>>::new();
 
         let funding_blockhashes = index.limit_result(index.filter_by_funding(scripthash))?;
-        self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
-                cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
-                outpoints.extend(make_outpoints(
-                    filtered_outputs.txid,
-                    &filtered_outputs.result,
-                ));
-                block_entries
-                    .entry(filtered_outputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
-                    .outputs = filtered_outputs.result;
-            }
-        })?;
+        let outputs_filtering =
+            self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+                let mut outpoints = vec![];
+                for filtered_outputs in filter_block_txs_outputs(block, scripthash) {
+                    cache.add_tx(filtered_outputs.txid, move || filtered_outputs.tx);
+                    outpoints.extend(make_outpoints(
+                        filtered_outputs.txid,
+                        &filtered_outputs.result,
+                    ));
+                    block_entries
+                        .entry(filtered_outputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_outputs.txid))
+                        .outputs = filtered_outputs.result;
+                }
+                (blockhash, outpoints, block_entries)
+            })?;
+
+        outpoints.extend(outputs_filtering.iter().flat_map(|(_, o, _)| o).cloned());
+
+        let mut result: HashMap<_, _> = outputs_filtering
+            .into_iter()
+            .map(|(a, _, b)| (a, b))
+            .collect();
 
         let spending_blockhashes: HashSet<BlockHash> = outpoints
             .par_iter()
             .flat_map_iter(|outpoint| index.filter_by_spending(*outpoint))
             .collect();
-        self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
-                cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
-                block_entries
-                    .entry(filtered_inputs.pos)
-                    .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
-                    .spent = filtered_inputs.result;
-            }
-        })?;
+        let inputs_filtering =
+            self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
+                let mut block_entries: HashMap<usize, TxEntry> = HashMap::new();
+
+                for filtered_inputs in filter_block_txs_inputs(&block, outpoints) {
+                    cache.add_tx(filtered_inputs.txid, move || filtered_inputs.tx);
+                    block_entries
+                        .entry(filtered_inputs.pos)
+                        .or_insert_with(|| TxEntry::new(filtered_inputs.txid))
+                        .spent = filtered_inputs.result;
+                }
+                (blockhash, block_entries)
+            })?;
+        for (b, h) in inputs_filtering {
+            let e = result.entry(b).or_default();
+            e.extend(h);
romanz (Owner): It may happen that the same transaction both funds and spends the same scripthash - so we probably need to "merge" both funding and spending TxEntry instances -> f43de8f

RCasatta (Contributor, Author): LGTM. Sorry for introducing so many bugs in the proposed PR, but it's pretty hard to remember all these cases; ideally some tests should enforce them...

romanz (Owner): No problem, I definitely should add more tests :)

+        }

         Ok(result
             .into_iter()
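As the review points out, one transaction can both fund and spend the same scripthash; in that case the `e.extend(h)` above would let a spending entry replace the funding entry at the same position instead of combining them, which f43de8f fixes by merging the two. A hypothetical sketch of such a merge, assuming `TxEntry` carries a `txid` plus the `outputs` and `spent` vectors used in the diff:

```rust
// Hypothetical merge (not the actual code from f43de8f): keep both the
// funding outputs and the spending inputs of the same transaction.
fn merge_entries(funding: &mut TxEntry, spending: TxEntry) {
    debug_assert_eq!(funding.txid, spending.txid);
    funding.outputs.extend(spending.outputs);
    funding.spent.extend(spending.spent);
}
```

The loop over `inputs_filtering` would then merge on a position collision instead of calling `extend` unconditionally.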
23 changes: 12 additions & 11 deletions src/tracker.rs
@@ -105,17 +105,18 @@
     ) -> Result<Option<(BlockHash, Transaction)>> {
         // Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
         let blockhashes = self.index.filter_by_txid(txid);
-        let mut result = None;
-        daemon.for_blocks(blockhashes, |blockhash, block| {
-            if result.is_some() {
-                return; // keep first matching transaction
-            }
-            let mut visitor = FindTransaction::new(txid);
-            result = match bsl::Block::visit(&block, &mut visitor) {
-                Ok(_) | Err(VisitBreak) => visitor.tx_found().map(|tx| (blockhash, tx)),
-                Err(e) => panic!("core returned invalid block: {:?}", e),
-            };
-        })?;
+        let result = daemon
+            .for_blocks(blockhashes, |blockhash, block| {
+                let mut visitor = FindTransaction::new(txid);
+                match bsl::Block::visit(&block, &mut visitor) {
+                    Ok(_) | Err(VisitBreak) => (),
+                    Err(e) => panic!("core returned invalid block: {:?}", e),
+                }
+                visitor.tx_found().map(|tx| (blockhash, tx))
+            })?
+            .first()
+            .cloned()
+            .flatten();
         Ok(result)
     }
 }
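Each callback now returns an `Option<(BlockHash, Transaction)>` instead of mutating shared state (an early `return` guarded by a shared flag is not possible once calls run concurrently), and the caller reduces the returned `Vec` afterwards. A toy illustration of that reduction, with `u32` standing in for the found transaction: `first().cloned().flatten()` inspects only the first block's result, whereas flattening first would yield the first match in any block.

```rust
fn main() {
    // Per-block results, as returned by the new for_blocks.
    let per_block: Vec<Option<u32>> = vec![None, Some(9)];

    // What the code above computes: the first *block's* result.
    assert_eq!(per_block.first().cloned().flatten(), None);

    // The first *match* across all blocks, for comparison.
    assert_eq!(per_block.iter().flatten().next().copied(), Some(9));
}
```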