Skip to content

Commit

Permalink
IBD: fix some syncer-syncee miscommunications which are more apparent…
Browse files Browse the repository at this point in the history
… with 10 bps (kaspanet#221)

* comments

* extend resolution range to 8 max

* do not sync missing bodies in the past of `syncer_header_selected_tip`

* typos

* no need to lock the pruning store throughout locator building

* the two conditions can be done in one

* rollback previous change (will be fixed more correctly by the coming switch HSC-> VSC)

* change selected chain store from *headers* selected chain to *virtual* selected chain (wip: test fix; renaming of various variables)

* fix selected-chain test by adding a way to (test-)build utxo valid blocks with specific parents

* make pruning point getter non-Option

* rename `headers_selected_chain` -> `virtual_selected_parent`

* add temp logic for upgrading from prev DB version

* get tip if high is none through the selected chain store itself

* add virtual chain assertion to relevant tests

* added selected_chain_store_iterator which is more idiomatic

* wrap with TestBlockBuilder to avoid direct access through virtual processor

* keep the pruning point read guard throughout building the locator

* break if parent is missing

* extend comment
  • Loading branch information
michaelsutton authored Jul 20, 2023
1 parent 27d61e1 commit caedaa2
Show file tree
Hide file tree
Showing 19 changed files with 300 additions and 124 deletions.
6 changes: 3 additions & 3 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.get_pruning_point_proof()).await
}

pub async fn async_create_headers_selected_chain_block_locator(
pub async fn async_create_virtual_selected_chain_block_locator(
&self,
low: Option<Hash>,
high: Option<Hash>,
) -> ConsensusResult<Vec<Hash>> {
self.clone().spawn_blocking(move |c| c.create_headers_selected_chain_block_locator(low, high)).await
self.clone().spawn_blocking(move |c| c.create_virtual_selected_chain_block_locator(low, high)).await
}

pub async fn async_create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
Expand Down Expand Up @@ -331,7 +331,7 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(move |c| c.get_missing_block_body_hashes(high)).await
}

pub async fn async_pruning_point(&self) -> Option<Hash> {
pub async fn async_pruning_point(&self) -> Hash {
self.clone().spawn_blocking(|c| c.pruning_point()).await
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn create_headers_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> ConsensusResult<Vec<Hash>> {
fn create_virtual_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> ConsensusResult<Vec<Hash>> {
unimplemented!()
}

Expand Down Expand Up @@ -238,7 +238,7 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn pruning_point(&self) -> Option<Hash> {
fn pruning_point(&self) -> Hash {
unimplemented!()
}

Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl HashMapCustomHasher for BlockHashSet {
}
}

#[derive(Default, Debug)]
pub struct ChainPath {
pub added: Vec<Hash>,
pub removed: Vec<Hash>,
Expand Down
27 changes: 15 additions & 12 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ impl ConsensusApi for Consensus {
self.services.pruning_proof_manager.get_pruning_point_proof()
}

fn create_headers_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> ConsensusResult<Vec<Hash>> {
fn create_virtual_selected_chain_block_locator(&self, low: Option<Hash>, high: Option<Hash>) -> ConsensusResult<Vec<Hash>> {
if let Some(low) = low {
self.validate_block_exists(low)?;
}
Expand All @@ -566,7 +566,7 @@ impl ConsensusApi for Consensus {
self.validate_block_exists(high)?;
}

Ok(self.services.sync_manager.create_headers_selected_chain_block_locator(low, high)?)
Ok(self.services.sync_manager.create_virtual_selected_chain_block_locator(low, high)?)
}

fn pruning_point_headers(&self) -> Vec<Arc<Header>> {
Expand Down Expand Up @@ -652,8 +652,8 @@ impl ConsensusApi for Consensus {
Ok(self.services.sync_manager.get_missing_block_body_hashes(high)?)
}

fn pruning_point(&self) -> Option<Hash> {
self.pruning_point_store.read().pruning_point().unwrap_option()
fn pruning_point(&self) -> Hash {
self.pruning_point_store.read().pruning_point().unwrap()
}

fn get_daa_window(&self, hash: Hash) -> ConsensusResult<Vec<Hash>> {
Expand All @@ -673,27 +673,30 @@ impl ConsensusApi for Consensus {
self.validate_block_exists(hash)?;

// In order to guarantee the chain height is at least k, we check that the pruning point is not genesis.
if self.pruning_point().unwrap() == self.config.genesis.hash {
if self.pruning_point() == self.config.genesis.hash {
return Err(ConsensusError::UnexpectedPruningPoint);
}

let mut hashes = Vec::with_capacity(self.config.params.ghostdag_k as usize);
let mut current = hash;
// TODO: This will crash if we don't have the data for k blocks in the past of
// current. The syncee should validate it got all of the associated data.
for _ in 0..=self.config.params.ghostdag_k {
hashes.push(current);
current = self.ghostdag_primary_store.get_selected_parent(current).unwrap();
// TODO: ideally the syncee should validate it got all of the associated data up
// to k blocks back and then we would be able to safely unwrap here. For now we
// just break the loop, since if the data was truly missing we wouldn't accept
// the staging consensus in the first place
let Some(parent) = self.ghostdag_primary_store.get_selected_parent(current).unwrap_option() else { break; };
current = parent;
}
Ok(hashes)
}

fn create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
self.validate_block_exists(high)?;

let pp_read_guard = self.pruning_point_store.read();
let pp = pp_read_guard.pruning_point().unwrap();
Ok(self.services.sync_manager.create_block_locator_from_pruning_point(high, pp, Some(limit))?)
// Keep the pruning point read guard throughout building the locator
let pruning_point_read = self.pruning_point_store.read();
let pruning_point = pruning_point_read.pruning_point().unwrap();
Ok(self.services.sync_manager.create_block_locator_from_pruning_point(high, pruning_point, Some(limit))?)
}

fn estimate_network_hashes_per_second(&self, start_hash: Option<Hash>, window_size: usize) -> ConsensusResult<u64> {
Expand Down
55 changes: 39 additions & 16 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use async_channel::Sender;
use kaspa_consensus_core::coinbase::MinerData;
use kaspa_consensus_core::tx::ScriptPublicKey;
use kaspa_consensus_core::{
api::ConsensusApi, block::MutableBlock, blockstatus::BlockStatus, header::Header, merkle::calc_hash_merkle_root,
subnets::SUBNETWORK_ID_COINBASE, tx::Transaction,
Expand All @@ -13,6 +15,7 @@ use parking_lot::RwLock;
use std::future::Future;
use std::{sync::Arc, thread::JoinHandle};

use crate::pipeline::virtual_processor::test_block_builder::TestBlockBuilder;
use crate::processes::window::WindowManager;
use crate::{
config::Config,
Expand All @@ -34,8 +37,9 @@ use super::services::{DbDagTraversalManager, DbGhostdagManager, DbWindowManager}
use super::Consensus;

pub struct TestConsensus {
consensus: Arc<Consensus>,
params: Params,
consensus: Arc<Consensus>,
block_builder: TestBlockBuilder,
db_lifetime: DbLifetime,
}

Expand All @@ -44,23 +48,21 @@ impl TestConsensus {
pub fn with_db(db: Arc<DB>, config: &Config, notification_sender: Sender<Notification>) -> Self {
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
Self {
consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)),
params: config.params.clone(),
db_lifetime: Default::default(),
}
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() }
}

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db();
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
Self {
consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)),
params: config.params.clone(),
db_lifetime,
}
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
}

/// Creates a test consensus instance based on `config` with a temp DB and no notifier
Expand All @@ -69,11 +71,10 @@ impl TestConsensus {
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
Self {
consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters)),
params: config.params.clone(),
db_lifetime,
}
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
}

/// Clone the inner consensus Arc. For general usage of the underlying consensus simply deref
Expand Down Expand Up @@ -107,6 +108,28 @@ impl TestConsensus {
self.validate_and_insert_block(self.build_block_with_parents(hash, parents).to_immutable())
}

pub fn add_utxo_valid_block_with_parents(
&self,
hash: Hash,
parents: Vec<Hash>,
txs: Vec<Transaction>,
) -> impl Future<Output = BlockProcessResult<BlockStatus>> {
let miner_data = MinerData::new(ScriptPublicKey::from_vec(0, vec![]), vec![]);
self.validate_and_insert_block(self.build_utxo_valid_block_with_parents(hash, parents, miner_data, txs).to_immutable())
}

pub fn build_utxo_valid_block_with_parents(
&self,
hash: Hash,
parents: Vec<Hash>,
miner_data: MinerData,
txs: Vec<Transaction>,
) -> MutableBlock {
let mut template = self.block_builder.build_block_template_with_parents(parents, miner_data, txs).unwrap();
template.block.header.hash = hash;
template.block
}

pub fn build_block_with_parents_and_transactions(
&self,
hash: Hash,
Expand Down
15 changes: 11 additions & 4 deletions consensus/src/model/stores/selected_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ use super::U64Key;
pub trait SelectedChainStoreReader {
fn get_by_hash(&self, hash: Hash) -> StoreResult<u64>;
fn get_by_index(&self, index: u64) -> StoreResult<Hash>;
fn get_tip(&self) -> StoreResult<(u64, Hash)>;
}

/// Write API for `SelectedChainStore`. The set function is deliberately `mut`
/// since chain index is not append-only and thus needs to be guarded.
pub trait SelectedChainStore: SelectedChainStoreReader {
fn apply_changes(&mut self, batch: &mut WriteBatch, changes: ChainPath) -> StoreResult<()>;
fn apply_changes(&mut self, batch: &mut WriteBatch, changes: &ChainPath) -> StoreResult<()>;
fn prune_below_pruning_point(&mut self, writer: impl DbWriter, pruning_point: Hash) -> StoreResult<()>;
fn init_with_pruning_point(&mut self, batch: &mut WriteBatch, block: Hash) -> StoreResult<()>;
}
Expand Down Expand Up @@ -68,22 +69,28 @@ impl SelectedChainStoreReader for DbSelectedChainStore {
fn get_by_index(&self, index: u64) -> StoreResult<Hash> {
self.access_hash_by_index.read(index.into())
}

fn get_tip(&self) -> StoreResult<(u64, Hash)> {
let idx = self.access_highest_index.read()?;
let hash = self.access_hash_by_index.read(idx.into())?;
Ok((idx, hash))
}
}

impl SelectedChainStore for DbSelectedChainStore {
fn apply_changes(&mut self, batch: &mut WriteBatch, changes: ChainPath) -> StoreResult<()> {
fn apply_changes(&mut self, batch: &mut WriteBatch, changes: &ChainPath) -> StoreResult<()> {
let added_len = changes.added.len() as u64;
let current_highest_index = self.access_highest_index.read().unwrap();
let split_index = current_highest_index - changes.removed.len() as u64;
let new_highest_index = added_len + split_index;

for to_remove in changes.removed {
for to_remove in changes.removed.iter().copied() {
let index = self.access_index_by_hash.read(to_remove).unwrap();
self.access_index_by_hash.delete(BatchDbWriter::new(batch), to_remove).unwrap();
self.access_hash_by_index.delete(BatchDbWriter::new(batch), index.into()).unwrap();
}

for (i, to_add) in changes.added.into_iter().enumerate() {
for (i, to_add) in changes.added.iter().copied().enumerate() {
self.access_index_by_hash.write(BatchDbWriter::new(batch), to_add, i as u64 + split_index + 1).unwrap();
self.access_hash_by_index.write(BatchDbWriter::new(batch), (i as u64 + split_index + 1).into(), to_add).unwrap();
}
Expand Down
12 changes: 0 additions & 12 deletions consensus/src/pipeline/header_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
pruning::{DbPruningStore, PruningPointInfo, PruningStoreReader},
reachability::{DbReachabilityStore, StagingReachabilityStore},
relations::{DbRelationsStore, RelationsStoreReader},
selected_chain::{DbSelectedChainStore, SelectedChainStore},
statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
DB,
},
Expand Down Expand Up @@ -136,7 +135,6 @@ pub struct HeaderProcessor {
pub(super) daa_excluded_store: Arc<DbDaaStore>,
pub(super) headers_store: Arc<DbHeadersStore>,
pub(super) headers_selected_tip_store: Arc<RwLock<DbHeadersSelectedTipStore>>,
pub(super) selected_chain_store: Arc<RwLock<DbSelectedChainStore>>,
pub(super) depth_store: Arc<DbDepthStore>,

// Managers and services
Expand Down Expand Up @@ -187,7 +185,6 @@ impl HeaderProcessor {
headers_store: storage.headers_store.clone(),
depth_store: storage.depth_store.clone(),
headers_selected_tip_store: storage.headers_selected_tip_store.clone(),
selected_chain_store: storage.selected_chain_store.clone(),
block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

Expand Down Expand Up @@ -392,18 +389,13 @@ impl HeaderProcessor {
// Non-append only stores need to use write locks.
// Note we need to keep the lock write guards until the batch is written.
let mut hst_write = self.headers_selected_tip_store.write();
let mut sc_write = self.selected_chain_store.write();
let prev_hst = hst_write.get().unwrap();
// We can't calculate chain path for blocks that do not have the pruning point in their chain, so we just skip them.
if SortableBlock::new(ctx.hash, header.blue_work) > prev_hst
&& reachability::is_chain_ancestor_of(&staging, pp, ctx.hash).unwrap()
{
// Hint reachability about the new tip.
reachability::hint_virtual_selected_parent(&mut staging, ctx.hash).unwrap();
hst_write.set_batch(&mut batch, SortableBlock::new(ctx.hash, header.blue_work)).unwrap();
let mut chain_path = self.dag_traversal_manager.calculate_chain_path(prev_hst.hash, ghostdag_data[0].selected_parent);
chain_path.added.push(ctx.hash);
sc_write.apply_changes(&mut batch, chain_path).unwrap();
}

//
Expand Down Expand Up @@ -437,7 +429,6 @@ impl HeaderProcessor {
drop(reachability_relations_write);
drop(relations_write);
drop(hst_write);
drop(sc_write);
}

fn commit_trusted_header(&self, ctx: HeaderProcessingContext, _header: &Header) {
Expand All @@ -463,13 +454,10 @@ impl HeaderProcessor {
pub fn process_genesis(&self) {
// Init headers selected tip and selected chain stores
let mut batch = WriteBatch::default();
let mut sc_write = self.selected_chain_store.write();
sc_write.init_with_pruning_point(&mut batch, self.genesis.hash).unwrap();
let mut hst_write = self.headers_selected_tip_store.write();
hst_write.set_batch(&mut batch, SortableBlock::new(self.genesis.hash, 0.into())).unwrap();
self.db.write(batch).unwrap();
drop(hst_write);
drop(sc_write);

// Write the genesis header
let mut genesis_header: Header = (&self.genesis).into();
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/virtual_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod errors;
mod processor;
mod utxo_validation;
pub use processor::*;
pub mod test_block_builder;
#[cfg(test)]
mod tests;
Loading

0 comments on commit caedaa2

Please sign in to comment.