Skip to content

Commit

Permalink
scope and spawn instead of par iter
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta committed Nov 21, 2023
1 parent 5829edd commit f7c8cca
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ use bitcoin::{
};
use bitcoin_slices::{bsl, Parse};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;

use std::io::{self, ErrorKind, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::types::SerBlock;
Expand Down Expand Up @@ -102,8 +100,8 @@ impl Connection {
R: Send + Sync,
{
self.blocks_duration.observe_duration("total", || {
let mut result = vec![];
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
let blockhashes_len = blockhashes.len();
if blockhashes.is_empty() {
return Ok(vec![]);
}
Expand All @@ -112,31 +110,58 @@ impl Connection {
self.req_send.send(Request::get_blocks(&blockhashes))
})?;

for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
let header = bsl::BlockHeader::parse(&block[..])
.expect("core returned invalid blockheader")
.parsed_owned();
ensure!(
&header.block_hash_sha2()[..] == hash.as_byte_array(),
"got unexpected block"
);
Ok(block)
})?;
result.push((hash, block));
let mut result = Vec::with_capacity(blockhashes_len);
for _ in 0..blockhashes_len {
// TODO use `OnceLock<R>` instead of `Mutex<Option<R>>` once MSRV 1.70
result.push(Mutex::new(None));
}

Ok(result
.into_par_iter()
.map(|(hash, block)| {
self.blocks_duration
.observe_duration("process", || func(hash, block))
rayon::scope(|s| {
for (i, hash) in blockhashes.iter().enumerate() {
let block_result = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
let header = bsl::BlockHeader::parse(&block[..])
.expect("core returned invalid blockheader")
.parsed_owned();
ensure!(
&header.block_hash_sha2()[..] == hash.as_byte_array(),
"got unexpected block"
);
Ok(block)
});
if let Ok(block) = block_result {
let func = &func;
let blocks_duration = &self.blocks_duration;
let hash = *hash;
let result = &result;

s.spawn(move |_| {
let r =
blocks_duration.observe_duration("process", || func(hash, block));
*result[i]
.lock()
.expect("I am the only user of this mutex until the scope ends") =
Some(r);
});
}
}
});

let result: Option<Vec<_>> = result
.into_iter()
.map(|e| {
e.into_inner()
.expect("spawn cannot panic and the scope ensure all the threads ended")
})
.collect())
.collect();

match result {
Some(r) => Ok(r),
None => bail!("One or more of the jobs in for_blocks failed"),
}
})
}

Expand Down

0 comments on commit f7c8cca

Please sign in to comment.