Skip to content

Commit

Permalink
engine-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Nov 27, 2024
1 parent 6343cf0 commit 83ab26d
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ alloy-network.workspace = true
alloy-transport.workspace = true
alloy-consensus.workspace = true
alloy-rpc-types-eth.workspace = true
alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] }
alloy-provider = { workspace = true, features = ["ipc", "ws", "reqwest"] }
alloy-rpc-types = { workspace = true, features = ["ssz"] }
alloy-primitives = { workspace = true, features = ["map"] }
Expand Down
21 changes: 21 additions & 0 deletions crates/driver/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Configuration for the Hilo Driver.

use alloy_rpc_types_engine::JwtSecret;
use kona_derive::traits::ChainProvider;
use kona_driver::PipelineCursor;
use op_alloy_genesis::RollupConfig;
Expand Down Expand Up @@ -46,10 +47,30 @@ pub struct Config {
pub rollup_config: RollupConfig,
/// The hilo-node RPC server
pub rpc_url: Option<Url>,
/// Engine API JWT Secret.
/// This is used to authenticate with the engine API
#[serde(deserialize_with = "deserialize_jwt_secret", serialize_with = "as_hex")]
pub jwt_secret: JwtSecret,
/// The cache size for in-memory providers.
pub cache_size: usize,
}

fn as_hex<S>(v: &JwtSecret, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let encoded = alloy_primitives::hex::encode(v.as_bytes());
serializer.serialize_str(&encoded)
}

fn deserialize_jwt_secret<'de, D>(deserializer: D) -> Result<JwtSecret, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s: &str = serde::de::Deserialize::deserialize(deserializer)?;
JwtSecret::from_hex(s).map_err(serde::de::Error::custom)
}

impl Config {
/// Construct an [OnlineBlobProviderWithFallback] from the [Config].
pub async fn blob_provider(
Expand Down
42 changes: 16 additions & 26 deletions crates/driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use kona_driver::{Driver, PipelineCursor, TipCursor};
use std::sync::Arc;
// use tokio::sync::watch::{channel, Receiver};

use hilo_engine::{EngineClient, EngineController};
use hilo_engine::EngineController;
use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider};

use crate::{
Expand All @@ -15,7 +15,7 @@ use crate::{
};

/// A driver from [kona_driver] that uses hilo-types.
pub type KonaDriver = Driver<EngineClient, EngineController, HiloPipeline, HiloDerivationPipeline>;
pub type KonaDriver = Driver<EngineController, HiloPipeline, HiloDerivationPipeline>;

/// An error that can happen when running the driver.
#[derive(Debug, thiserror::Error)]
Expand All @@ -42,17 +42,13 @@ pub struct HiloDriver<C: Context> {
pub ctx: C,
/// The driver config.
pub cfg: Config,
/// A constructor for execution.
pub exec: Option<EngineController>,
// Receiver to listen for SIGINT signals
// shutdown_recv: Receiver<bool>,
}

impl HiloDriver<StandaloneContext> {
/// Creates a new [HiloDriver] with a standalone context.
pub async fn standalone(cfg: Config, exec: EngineController) -> TransportResult<Self> {
pub async fn standalone(cfg: Config) -> TransportResult<Self> {
let ctx = StandaloneContext::new(cfg.l1_rpc_url.clone()).await?;
Ok(Self::new(cfg, ctx, exec))
Ok(Self::new(cfg, ctx))
}
}

Expand All @@ -61,15 +57,8 @@ where
C: Context,
{
/// Constructs a new [HiloDriver].
pub fn new(cfg: Config, ctx: C, exec: EngineController) -> Self {
// TODO: Receive shutdown signal
// let (_shutdown_sender, shutdown_recv) = channel(false);
// ctrlc::set_handler(move || {
// tracing::info!("sending shut down signal");
// shutdown_sender.send(true).expect("could not send shutdown signal");
// })
// .expect("could not register shutdown handler");
Self { cfg, ctx, exec: Some(exec) }
pub fn new(cfg: Config, ctx: C) -> Self {
Self { cfg, ctx }
}

/// Initializes the [HiloPipeline].
Expand All @@ -89,7 +78,13 @@ where
pub async fn init_driver(&mut self) -> Result<KonaDriver, ConfigError> {
let cursor = self.cfg.tip_cursor().await?;
let pipeline = self.init_pipeline(cursor.clone()).await?;
let exec = self.exec.take().expect("Executor not set");
let exec = EngineController::new(
self.cfg.l2_engine_url.clone(),
self.cfg.jwt_secret,
cursor.origin(),
cursor.l2_safe_head().block_info.into(),
&self.cfg.rollup_config,
);
Ok(Driver::new(cursor, exec, pipeline))
}

Expand Down Expand Up @@ -134,6 +129,9 @@ where
let mut driver = self.init_driver().await?;
info!("Driver initialized");

// Wait until the engine is ready
driver.wait_for_executor().await;

// Step 3: Start the processing loop
loop {
tokio::select! {
Expand All @@ -154,14 +152,6 @@ where
}
}

/// Loops until the engine client is online and receives a response from the engine.
async fn await_engine_ready(&self) {
while !self.engine_driver.engine_ready().await {
self.check_shutdown().await;
sleep(Duration::from_secs(1)).await;
}
}

// Exits if a SIGINT signal is received
// fn check_shutdown(&self) -> Result<(), DriverError> {
// if *self.shutdown_recv.borrow() {
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
//! Contains the engine api client.

use async_trait::async_trait;
use http_body_util::Full;
use tower::ServiceBuilder;
use url::Url;
use alloy_eips::eip1898::BlockNumberOrTag;
use alloy_network::AnyNetwork;
use alloy_primitives::{Bytes, B256};
Expand All @@ -19,9 +15,13 @@ use alloy_transport_http::{
},
AuthLayer, AuthService, Http, HyperClient,
};
use async_trait::async_trait;
use http_body_util::Full;
use op_alloy_protocol::L2BlockInfo;
use op_alloy_provider::ext::engine::OpEngineApi;
use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes};
use tower::ServiceBuilder;
use url::Url;

use crate::{Engine, EngineApiError};

Expand Down
25 changes: 14 additions & 11 deletions crates/engine/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
//!
//! See: <https://github.com/ethereum-optimism/optimism/blob/develop/op-node/rollup/engine/engine_controller.go#L46>

use url::Url;
use std::time::Duration;
use tokio::time::sleep;
use kona_driver::Executor;
use alloy_consensus::{Header, Sealed};
use alloy_primitives::B256;
use async_trait::async_trait;
use alloy_consensus::{Sealed, Header};
use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret};
use async_trait::async_trait;
use kona_driver::Executor;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use std::time::Duration;
use tokio::time::sleep;
use url::Url;

use crate::{EngineApiError, Engine, EngineClient};
use crate::{Engine, EngineApiError, EngineClient};

/// L1 epoch block
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
Expand All @@ -27,6 +27,12 @@ pub struct Epoch {
pub timestamp: u64,
}

impl From<BlockInfo> for Epoch {
fn from(block: BlockInfo) -> Self {
Self { number: block.number, hash: block.hash, timestamp: block.timestamp }
}
}

/// The engine controller.
#[derive(Debug, Clone)]
pub struct EngineController {
Expand Down Expand Up @@ -88,10 +94,7 @@ impl Executor for EngineController {
async fn wait_until_ready(&mut self) {
let forkchoice = self.create_forkchoice_state();
// Loop until the forkchoice is updated
while !self.client
.forkchoice_update(forkchoice, None)
.await
.is_ok() {
while !self.client.forkchoice_update(forkchoice, None).await.is_ok_and(|u| u.is_valid()) {
sleep(Duration::from_secs(1)).await;
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ rust-version.workspace = true

[dependencies]
# Local
hilo-engine.workspace = true
hilo-driver.workspace = true

# Alloy
Expand Down
9 changes: 1 addition & 8 deletions crates/node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::SyncMode;
use alloy_rpc_types_engine::JwtSecret;
use hilo_engine::EngineController;
use op_alloy_genesis::RollupConfig;
use serde::{Deserialize, Serialize};
use url::Url;
Expand Down Expand Up @@ -55,13 +54,6 @@ pub struct Config {
pub cache_size: usize,
}

impl Config {
/// Constructs a new [EngineController] from the config.
pub fn executor(&self) -> EngineController {
EngineController::new(self.l2_engine_url.clone(), self.jwt_secret)
}
}

impl From<Config> for hilo_driver::Config {
fn from(config: Config) -> Self {
hilo_driver::Config {
Expand All @@ -74,6 +66,7 @@ impl From<Config> for hilo_driver::Config {
rollup_config: config.rollup_config,
rpc_url: config.rpc_url,
cache_size: config.cache_size,
jwt_secret: config.jwt_secret,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ impl Node {
/// Creates and starts the [HiloDriver] which handles the derivation sync process.
async fn start_driver(&self) -> Result<(), NodeError> {
let cfg = self.config.clone().into();
let exec = self.config.executor();
let mut driver = HiloDriver::standalone(cfg, exec).await?;
let mut driver = HiloDriver::standalone(cfg).await?;
driver.start().await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/providers-alloy/src/beacon_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl BeaconClient for OnlineBeaconClient {
let mut sidecars = Vec::with_capacity(hashes.len());
hashes.iter().for_each(|hash| {
if let Some(sidecar) =
raw_response.data.iter().find(|sidecar| sidecar.index == hash.index as u64)
raw_response.data.iter().find(|sidecar| sidecar.index == hash.index)
{
sidecars.push(sidecar.clone());
}
Expand Down
18 changes: 7 additions & 11 deletions crates/providers-alloy/src/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ impl<B: BeaconClient> OnlineBlobProvider<B> {
let sidecars = self.fetch_sidecars(slot, blob_hashes).await?;

// Filter blob sidecars that match the indicies in the specified list.
let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::<Vec<usize>>();
let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::<Vec<u64>>();
let filtered = sidecars
.into_iter()
.filter(|s| blob_hash_indicies.contains(&(s.index as usize)))
.filter(|s| blob_hash_indicies.contains(&s.index))
.collect::<Vec<_>>();

// Validate the correct number of blob sidecars were retrieved.
Expand Down Expand Up @@ -165,10 +165,7 @@ where
let hash = blob_hashes
.get(i)
.ok_or(BlobProviderError::Backend("Missing blob hash".to_string()))?;
match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash {
hash: hash.hash,
index: hash.index as u64,
}) {
match sidecar.verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index }) {
Ok(_) => Ok(sidecar.blob),
Err(e) => Err(BlobProviderError::Backend(e.to_string())),
}
Expand Down Expand Up @@ -262,7 +259,7 @@ impl<B: BeaconClient, F: BlobSidecarProvider> OnlineBlobProviderWithFallback<B,
let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::<Vec<_>>();
let filtered = sidecars
.into_iter()
.filter(|s| blob_hash_indicies.contains(&(s.index as usize)))
.filter(|s| blob_hash_indicies.contains(&s.index))
.collect::<Vec<_>>();

// Validate the correct number of blob sidecars were retrieved.
Expand Down Expand Up @@ -324,10 +321,9 @@ where
let hash = blob_hashes.get(i).ok_or(BlobProviderError::Backend(
"fallback: failed to get blob hash".to_string(),
))?;
match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash {
hash: hash.hash,
index: hash.index as u64,
}) {
match sidecar
.verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index })
{
Ok(_) => Ok(sidecar.blob),
Err(e) => Err(BlobProviderError::Backend(e.to_string())),
}
Expand Down

0 comments on commit 83ab26d

Please sign in to comment.