Skip to content

Commit

Permalink
Pre-blocks forwarded to DA
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Zaikin committed Jan 5, 2024
1 parent f7e1786 commit da65c0d
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ run-operator:

run-sequencer:
$(MAKE) build-sequencer
RUST_LOG=debug ./target/debug/sequencer
RUST_LOG=info ./target/debug/sequencer

run-dsn:
./target/debug/launcher --id 1 --log-level 3 &
./target/debug/launcher --id 1 --log-level 2 &
./target/debug/launcher --id 2 --log-level 0 &
./target/debug/launcher --id 3 --log-level 0 &
./target/debug/launcher --id 4 --log-level 0 &
Expand Down
1 change: 1 addition & 0 deletions crates/pre-block/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ narwhal-config.workspace = true
indexmap.workspace = true
fastcrypto.workspace = true
rand.workspace = true
pretty_assertions.workspace = true

[features]
default = []
Expand Down
22 changes: 18 additions & 4 deletions crates/pre-block/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@
//
// SPDX-License-Identifier: MIT

use std::collections::BTreeSet;
use narwhal_types::{CertificateAPI, CertificateV2, HeaderV2, SystemMessage};

use narwhal_types::{CertificateAPI, CertificateV2, HeaderV2};
use crate::{Batch, Certificate, CertificateHeader};

use crate::{Batch, Certificate, CertificateHeader, PreBlock};
impl From<SystemMessage> for crate::SystemMessage {
fn from(message: SystemMessage) -> Self {
match message {
SystemMessage::DkgConfirmation(msg) => Self::DkgConfirmation(msg),
SystemMessage::DkgMessage(msg) => Self::DkgMessage(msg),
SystemMessage::RandomnessSignature(round, msg) => {
Self::RandomnessSignature(round.0, msg)
}
}
}
}

impl From<HeaderV2> for CertificateHeader {
fn from(narwhal_header: HeaderV2) -> Self {
Expand All @@ -20,7 +30,11 @@ impl From<HeaderV2> for CertificateHeader {
.into_iter()
.map(|x| (x.0 .0, x.1))
.collect(),
system_messages: vec![],
system_messages: narwhal_header
.system_messages
.into_iter()
.map(|x| x.into())
.collect(),
parents: narwhal_header.parents.into_iter().map(|x| x.0).collect(),
}
}
Expand Down
14 changes: 12 additions & 2 deletions crates/pre-block/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ mod tests {
use indexmap::IndexMap;
use narwhal_config::{AuthorityIdentifier, WorkerId};
use narwhal_test_utils::latest_protocol_version;
use narwhal_types::{BatchDigest, BatchV2, CertificateDigest, HeaderV2, TimestampMs};
use narwhal_types::{
BatchDigest, BatchV2, CertificateDigest, HeaderV2, RandomnessRound, SystemMessage,
TimestampMs,
};

use crate::CertificateHeader;

Expand Down Expand Up @@ -69,14 +72,21 @@ mod tests {
2u64,
3u64,
payload,
vec![],
vec![
SystemMessage::DkgConfirmation(vec![12u8]),
SystemMessage::DkgMessage(vec![13u8]),
SystemMessage::RandomnessSignature(RandomnessRound(123), vec![14u8]),
],
parents,
);
let expected_payload = bcs::to_bytes(&header_v2).unwrap();
let expected = header_v2.digest().0;

let header: CertificateHeader = header_v2.into();
let actual_payload = bcs::to_bytes(&header).unwrap();
let actual = header.digest();

pretty_assertions::assert_eq!(hex::encode(expected_payload), hex::encode(actual_payload));
assert_eq!(expected, actual);
}
}
4 changes: 2 additions & 2 deletions crates/pre-block/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::HashMap;
use std::{collections::BTreeSet, num::NonZeroUsize};

use narwhal_test_utils::{latest_protocol_version, CommitteeFixture};
use narwhal_types::{CertificateDigest, CertificateV2, Header, VoteAPI, HeaderV2Builder};
use narwhal_types::{CertificateDigest, CertificateV2, Header, HeaderV2Builder, VoteAPI};
use narwhal_utils::protocol_config::ProtocolConfig;

use crate::{Certificate, Digest, Batch, PreBlock, PreBlockStore, PublicKey};
use crate::{Batch, Certificate, Digest, PreBlock, PreBlockStore, PublicKey};

pub const COMMITTEE_SIZE: usize = 4;

Expand Down
9 changes: 8 additions & 1 deletion crates/pre-block/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ pub type AggregateSignature = Vec<u8>;
/// Blake2B 256 bit
pub type Digest = [u8; 32];

#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum SystemMessage {
DkgMessage(Vec<u8>),
DkgConfirmation(Vec<u8>),
RandomnessSignature(u64, Vec<u8>),
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CertificateHeader {
pub author: u16,
pub round: u64,
pub epoch: u64,
pub created_at: u64,
pub payload: Vec<(Digest, (u32, u64))>,
pub system_messages: Vec<()>, // not used
pub system_messages: Vec<SystemMessage>,
pub parents: BTreeSet<Digest>,
}

Expand Down
10 changes: 8 additions & 2 deletions crates/pre-block/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ pub fn validate_certificate_chain(

match store.get_certificate_index(parent) {
Some(prev_index) if prev_index + 1 != index => {
anyhow::bail!("Parent certificate is not from a preceding sub dag {}", hex::encode(parent))
anyhow::bail!(
"Parent certificate is not from a preceding sub dag {}",
hex::encode(parent)
)
}
None => {
anyhow::bail!("Parent certificate cannot be not found {}", hex::encode(parent));
anyhow::bail!(
"Parent certificate cannot be not found {}",
hex::encode(parent)
);
}
_ => (),
}
Expand Down
37 changes: 25 additions & 12 deletions sequencer/src/consensus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
// SPDX-License-Identifier: MIT

use bytes::Bytes;
use log::{info, debug};
use std::{sync::mpsc, collections::BTreeSet};
use log::{debug, info};
use narwhal_types::{TransactionProto, TransactionsClient};
use pre_block::{PreBlock, Certificate, CertificateHeader};
use pre_block::{Certificate, CertificateHeader, PreBlock, SystemMessage};
use std::{collections::BTreeSet, sync::mpsc};
use tonic::transport::Channel;

mod exporter {
Expand Down Expand Up @@ -43,10 +43,7 @@ impl WorkerClient {
let tx = TransactionProto {
transaction: Bytes::from(payload),
};
let res = self.client()
.await?
.submit_transaction(tx)
.await?;
let res = self.client().await?.submit_transaction(tx).await?;
debug!("[Worker client] Response {:#?}", res.metadata());
Ok(())
}
Expand Down Expand Up @@ -80,9 +77,10 @@ impl PrimaryClient {
pub async fn subscribe_pre_blocks(
&mut self,
from_id: u64,
pre_blocks_tx: mpsc::Sender<PreBlock>
pre_blocks_tx: mpsc::Sender<PreBlock>,
) -> anyhow::Result<()> {
let mut stream = self.client()
let mut stream = self
.client()
.await?
.export(exporter::ExportRequest { from_id })
.await?
Expand All @@ -94,14 +92,25 @@ impl PrimaryClient {

pre_blocks_tx.send(pre_block)?;
}

Ok(())
}
}

impl From<exporter::SystemMessage> for SystemMessage {
fn from(msg: exporter::SystemMessage) -> Self {
match msg.message.unwrap() {
exporter::system_message::Message::DkgConfirmation(msg) => Self::DkgConfirmation(msg),
exporter::system_message::Message::DkgMessage(msg) => Self::DkgMessage(msg),
exporter::system_message::Message::RandomnessSignature(sig) => {
Self::RandomnessSignature(sig.randomness_round, sig.bytes)
}
}
}
}

impl From<exporter::Header> for CertificateHeader {
fn from(header: exporter::Header) -> Self {
assert!(header.system_messages.is_empty());
Self {
author: header.author as u16,
round: header.round,
Expand All @@ -117,7 +126,11 @@ impl From<exporter::Header> for CertificateHeader {
)
})
.collect(),
system_messages: vec![],
system_messages: header
.system_messages
.into_iter()
.map(|msg| msg.into())
.collect(),
parents: BTreeSet::from_iter(
header
.parents
Expand Down
36 changes: 18 additions & 18 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
//
// SPDX-License-Identifier: MIT

use axum::{
extract::{Path, State},
http::StatusCode,
routing::{get, post},
Json, Router,
};
use clap::Parser;
use consensus_client::WorkerClient;
use log::{error, info, warn};
use rollup_client::RollupClient;
use serde::{Deserialize, Serialize};
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::task::JoinHandle;
use serde::{Serialize, Deserialize};
use axum::{
extract::{Path, State},
http::StatusCode,
routing::{get, post},
Json, Router,
};

use crate::consensus_client::PrimaryClient;
use crate::da_batcher::publish_pre_blocks;
Expand All @@ -43,18 +43,18 @@ impl AppState {

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Transaction {
pub data: String
pub data: String,
}

async fn broadcast_transaction(
State(state): State<AppState>,
Json(tx): Json<Transaction>,
) -> Result<Json<String>, StatusCode> {
info!("Broadcasting tx `{}`", tx.data);
let tx_payload = hex::decode(tx.data)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let tx_payload = hex::decode(tx.data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

state.worker_client
state
.worker_client
.as_ref()
.clone() // cloning channel is cheap and encouraged
.send_transaction(tx_payload)
Expand All @@ -68,7 +68,8 @@ async fn get_block_by_level(
State(state): State<AppState>,
Path(level): Path<u32>,
) -> Result<Json<Vec<String>>, StatusCode> {
let block = state.rollup_client
let block = state
.rollup_client
.get_block_by_level(level)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Expand All @@ -82,7 +83,8 @@ async fn get_block_by_level(
}

async fn get_head(State(state): State<AppState>) -> Result<Json<u32>, StatusCode> {
let head = state.rollup_client
let head = state
.rollup_client
.get_head()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Expand All @@ -93,15 +95,13 @@ async fn get_authorities(
State(state): State<AppState>,
Path(epoch): Path<u64>,
) -> Result<Json<Vec<String>>, StatusCode> {
let authorities = state.rollup_client
let authorities = state
.rollup_client
.get_authorities(epoch)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let res: Vec<String> = authorities
.into_iter()
.map(|a| hex::encode(a))
.collect();
let res: Vec<String> = authorities.into_iter().map(|a| hex::encode(a)).collect();
Ok(Json(res))
}

Expand Down
8 changes: 5 additions & 3 deletions sequencer/src/rollup_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ impl RollupClient {
pub async fn get_head(&self) -> anyhow::Result<u32> {
let res = self.store_get("/head".into()).await?;
if let Some(bytes) = res {
let index = u32::from_be_bytes(bytes.try_into().map_err(|b| {
anyhow::anyhow!("Failed to parse head: {}", hex::encode(b))
})?);
let index = u32::from_be_bytes(
bytes
.try_into()
.map_err(|b| anyhow::anyhow!("Failed to parse head: {}", hex::encode(b)))?,
);
Ok(index + 1)
} else {
Ok(0)
Expand Down

0 comments on commit da65c0d

Please sign in to comment.