Skip to content

Commit

Permalink
fix: bootnodes logic (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepoviola authored Jun 4, 2024
1 parent 4d9c3a3 commit 4bca044
Showing 1 changed file with 109 additions and 43 deletions.
152 changes: 109 additions & 43 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ mod shared;
mod spawner;

use std::{
net::IpAddr,
path::{Path, PathBuf},
time::Duration,
};

use configuration::{NetworkConfig, RegistrationStrategy};
use errors::OrchestratorError;
use network::{parachain::Parachain, relaychain::Relaychain, Network};
use network_spec::{parachain::ParachainSpec, NetworkSpec};
use generators::errors::GeneratorError;
use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network};
use network_spec::{node::NodeSpec, parachain::ParachainSpec, NetworkSpec};
use provider::{
types::{ProviderCapabilities, TransferedFile},
DynProvider,
Expand Down Expand Up @@ -202,20 +204,7 @@ where
.build_raw(&ns, &scoped_fs)
.await?;

// get the bootnodes to spawn first and calculate the bootnode string for use later
let mut bootnodes = vec![];
let mut relaynodes = vec![];
network_spec.relaychain.nodes.iter().for_each(|node| {
if node.is_bootnode {
bootnodes.push(node)
} else {
relaynodes.push(node)
}
});

if bootnodes.is_empty() {
bootnodes.push(relaynodes.remove(0))
}
let (bootnodes, relaynodes) = split_nodes_by_bootnodes(&network_spec.relaychain.nodes);

// TODO: we want to still supporting spawn a dedicated bootnode??
let mut ctx = SpawnNodeCtx {
Expand Down Expand Up @@ -249,7 +238,7 @@ where
Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone());

let spawning_tasks = bootnodes
.iter_mut()
.iter()
.map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx));

// Initiate the node_ws_uel which will be later used in the Parachain_with_extrinsic config
Expand All @@ -259,20 +248,13 @@ where
let mut bootnodes_addr: Vec<String> = vec![];
for node in futures::future::try_join_all(spawning_tasks).await? {
let ip = node.inner.ip().await?;
bootnodes_addr.push(
// TODO: we just use localhost for now
generators::generate_node_bootnode_addr(
&node.spec.peer_id,
&ip,
if ctx.ns.capabilities().use_default_ports_in_cmd {
P2P_PORT
} else {
node.spec.p2p_port.0
},
node.inner.args().as_ref(),
&node.spec.p2p_cert_hash,
)?,
);
let port = if ctx.ns.capabilities().use_default_ports_in_cmd {
P2P_PORT
} else {
node.spec.p2p_port.0
};
let bootnode_multiaddr = generate_bootnode_addr(&node, &ip, port)?;
bootnodes_addr.push(bootnode_multiaddr);

// Is used in the register_para_options (We need to get this from the relay and not the collators)
if node_ws_url.is_empty() {
Expand All @@ -283,6 +265,13 @@ where
network.add_running_node(node, None);
}

// Add the bootnodes to the relaychain spec file and ctx
network_spec
.relaychain
.chain_spec
.add_bootnodes(&scoped_fs, &bootnodes_addr)
.await?;

ctx.bootnodes_addr = &bootnodes_addr;

// spawn the rest of the nodes (TODO: in batches)
Expand All @@ -295,21 +284,16 @@ where
network.add_running_node(node, None);
}

// Add the bootnodes to the relaychain spec file
network_spec
.relaychain
.chain_spec
.add_bootnodes(&scoped_fs, &bootnodes_addr)
.await?;

// spawn paras
for para in network_spec.parachains.iter() {
// Create parachain (in the context of the running network)
let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?;
let parachain_id = parachain.chain_id.clone();

// Create `ctx` for spawn the nodes
let ctx_para = SpawnNodeCtx {
let (bootnodes, collators) = split_nodes_by_bootnodes(&para.collators);

// Create `ctx` for spawn parachain nodes
let mut ctx_para = SpawnNodeCtx {
parachain: Some(para),
parachain_id: parachain_id.as_deref(),
role: if para.is_cumulus_based {
Expand All @@ -333,12 +317,58 @@ where
));
}

// Spawn the nodes
let spawning_tasks = para.collators.iter().map(|node| {
let spawning_tasks = bootnodes
.iter()
.map(|node| spawner::spawn_node(node, para_files_to_inject.clone(), &ctx_para));

// Calculate the bootnodes addr from the running nodes
let mut bootnodes_addr: Vec<String> = vec![];
let mut running_nodes: Vec<NetworkNode> = vec![];
for node in futures::future::try_join_all(spawning_tasks).await? {
let ip = node.inner.ip().await?;
let port = if ctx.ns.capabilities().use_default_ports_in_cmd {
P2P_PORT
} else {
node.spec.p2p_port.0
};
let bootnode_multiaddr = generate_bootnode_addr(&node, &ip, port)?;
bootnodes_addr.push(bootnode_multiaddr);

let mut para_files_to_inject = global_files_to_inject.clone();
if para.is_cumulus_based {
para_files_to_inject.push(TransferedFile::new(
PathBuf::from(format!(
"{}/{}.json",
ns.base_dir().to_string_lossy(),
para.id
)),
PathBuf::from(format!("/cfg/{}.json", para.id)),
));
}

running_nodes.push(node);
}

if let Some(para_chain_spec) = para.chain_spec.as_ref() {
para_chain_spec
.add_bootnodes(&scoped_fs, &bootnodes_addr)
.await?;
}

ctx_para.bootnodes_addr = &bootnodes_addr;

// Spawn the rest of the nodes
let spawning_tasks = collators.iter().map(|node| {
spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para)
});

let running_nodes = futures::future::try_join_all(spawning_tasks).await?;
// join all the running nodes
running_nodes.extend_from_slice(
futures::future::try_join_all(spawning_tasks)
.await?
.as_slice(),
);

let running_para_id = parachain.para_id;
network.add_para(parachain);
for node in running_nodes {
Expand Down Expand Up @@ -386,6 +416,42 @@ where
}
}

// Helpers

// Split the node list depending if it's bootnode or not
// NOTE: if there isn't a bootnode declared we use the first one
fn split_nodes_by_bootnodes(nodes: &[NodeSpec]) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) {
// get the bootnodes to spawn first and calculate the bootnode string for use later
let mut bootnodes = vec![];
let mut other_nodes = vec![];
nodes.iter().for_each(|node| {
if node.is_bootnode {
bootnodes.push(node)
} else {
other_nodes.push(node)
}
});

if bootnodes.is_empty() {
bootnodes.push(other_nodes.remove(0))
}
(bootnodes, other_nodes)
}

// Generate a bootnode multiaddress and return as string
fn generate_bootnode_addr(
node: &NetworkNode,
ip: &IpAddr,
port: u16,
) -> Result<String, GeneratorError> {
generators::generate_node_bootnode_addr(
&node.spec.peer_id,
ip,
port,
node.inner.args().as_ref(),
&node.spec.p2p_cert_hash,
)
}
// Validate that the config fulfill all the requirements of the provider
fn validate_spec_with_provider_capabilities(
network_spec: &NetworkSpec,
Expand Down

0 comments on commit 4bca044

Please sign in to comment.