diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 14468f765..40a935936 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -12,14 +12,16 @@ mod shared; mod spawner; use std::{ + net::IpAddr, path::{Path, PathBuf}, time::Duration, }; use configuration::{NetworkConfig, RegistrationStrategy}; use errors::OrchestratorError; -use network::{parachain::Parachain, relaychain::Relaychain, Network}; -use network_spec::{parachain::ParachainSpec, NetworkSpec}; +use generators::errors::GeneratorError; +use network::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain, Network}; +use network_spec::{node::NodeSpec, parachain::ParachainSpec, NetworkSpec}; use provider::{ types::{ProviderCapabilities, TransferedFile}, DynProvider, @@ -202,20 +204,7 @@ where .build_raw(&ns, &scoped_fs) .await?; - // get the bootnodes to spawn first and calculate the bootnode string for use later - let mut bootnodes = vec![]; - let mut relaynodes = vec![]; - network_spec.relaychain.nodes.iter().for_each(|node| { - if node.is_bootnode { - bootnodes.push(node) - } else { - relaynodes.push(node) - } - }); - - if bootnodes.is_empty() { - bootnodes.push(relaynodes.remove(0)) - } + let (bootnodes, relaynodes) = split_nodes_by_bootnodes(&network_spec.relaychain.nodes); // TODO: we want to still supporting spawn a dedicated bootnode?? let mut ctx = SpawnNodeCtx { @@ -249,7 +238,7 @@ where Network::new_with_relay(r, ns.clone(), self.filesystem.clone(), network_spec.clone()); let spawning_tasks = bootnodes - .iter_mut() + .iter() .map(|node| spawner::spawn_node(node, global_files_to_inject.clone(), &ctx)); // Initiate the node_ws_uel which will be later used in the Parachain_with_extrinsic config @@ -259,20 +248,13 @@ where let mut bootnodes_addr: Vec = vec![]; for node in futures::future::try_join_all(spawning_tasks).await? { let ip = node.inner.ip().await?; - bootnodes_addr.push( - // TODO: we just use localhost for now - generators::generate_node_bootnode_addr( - &node.spec.peer_id, - &ip, - if ctx.ns.capabilities().use_default_ports_in_cmd { - P2P_PORT - } else { - node.spec.p2p_port.0 - }, - node.inner.args().as_ref(), - &node.spec.p2p_cert_hash, - )?, - ); + let port = if ctx.ns.capabilities().use_default_ports_in_cmd { + P2P_PORT + } else { + node.spec.p2p_port.0 + }; + let bootnode_multiaddr = generate_bootnode_addr(&node, &ip, port)?; + bootnodes_addr.push(bootnode_multiaddr); // Is used in the register_para_options (We need to get this from the relay and not the collators) if node_ws_url.is_empty() { @@ -283,6 +265,13 @@ where network.add_running_node(node, None); } + // Add the bootnodes to the relaychain spec file and ctx + network_spec + .relaychain + .chain_spec + .add_bootnodes(&scoped_fs, &bootnodes_addr) + .await?; + ctx.bootnodes_addr = &bootnodes_addr; // spawn the rest of the nodes (TODO: in batches) @@ -295,21 +284,16 @@ where network.add_running_node(node, None); } - // Add the bootnodes to the relaychain spec file - network_spec - .relaychain - .chain_spec - .add_bootnodes(&scoped_fs, &bootnodes_addr) - .await?; - // spawn paras for para in network_spec.parachains.iter() { // Create parachain (in the context of the running network) let parachain = Parachain::from_spec(para, &global_files_to_inject, &scoped_fs).await?; let parachain_id = parachain.chain_id.clone(); - // Create `ctx` for spawn the nodes - let ctx_para = SpawnNodeCtx { + let (bootnodes, collators) = split_nodes_by_bootnodes(¶.collators); + + // Create `ctx` for spawn parachain nodes + let mut ctx_para = SpawnNodeCtx { parachain: Some(para), parachain_id: parachain_id.as_deref(), role: if para.is_cumulus_based { @@ -333,12 +317,58 @@ where )); } - // Spawn the nodes - let spawning_tasks = para.collators.iter().map(|node| { + let spawning_tasks = bootnodes + .iter() + .map(|node| spawner::spawn_node(node, para_files_to_inject.clone(), &ctx_para)); + + // Calculate the bootnodes addr from the running nodes + let mut bootnodes_addr: Vec = vec![]; + let mut running_nodes: Vec = vec![]; + for node in futures::future::try_join_all(spawning_tasks).await? { + let ip = node.inner.ip().await?; + let port = if ctx.ns.capabilities().use_default_ports_in_cmd { + P2P_PORT + } else { + node.spec.p2p_port.0 + }; + let bootnode_multiaddr = generate_bootnode_addr(&node, &ip, port)?; + bootnodes_addr.push(bootnode_multiaddr); + + let mut para_files_to_inject = global_files_to_inject.clone(); + if para.is_cumulus_based { + para_files_to_inject.push(TransferedFile::new( + PathBuf::from(format!( + "{}/{}.json", + ns.base_dir().to_string_lossy(), + para.id + )), + PathBuf::from(format!("/cfg/{}.json", para.id)), + )); + } + + running_nodes.push(node); + } + + if let Some(para_chain_spec) = para.chain_spec.as_ref() { + para_chain_spec + .add_bootnodes(&scoped_fs, &bootnodes_addr) + .await?; + } + + ctx_para.bootnodes_addr = &bootnodes_addr; + + // Spawn the rest of the nodes + let spawning_tasks = collators.iter().map(|node| { spawner::spawn_node(node, parachain.files_to_inject.clone(), &ctx_para) }); - let running_nodes = futures::future::try_join_all(spawning_tasks).await?; + // join all the running nodes + running_nodes.extend_from_slice( + futures::future::try_join_all(spawning_tasks) + .await? + .as_slice(), + ); + let running_para_id = parachain.para_id; network.add_para(parachain); for node in running_nodes { @@ -386,6 +416,42 @@ where } } +// Helpers + +// Split the node list depending if it's bootnode or not +// NOTE: if there isn't a bootnode declared we use the first one +fn split_nodes_by_bootnodes(nodes: &[NodeSpec]) -> (Vec<&NodeSpec>, Vec<&NodeSpec>) { + // get the bootnodes to spawn first and calculate the bootnode string for use later + let mut bootnodes = vec![]; + let mut other_nodes = vec![]; + nodes.iter().for_each(|node| { + if node.is_bootnode { + bootnodes.push(node) + } else { + other_nodes.push(node) + } + }); + + if bootnodes.is_empty() { + bootnodes.push(other_nodes.remove(0)) + } + (bootnodes, other_nodes) +} + +// Generate a bootnode multiaddress and return as string +fn generate_bootnode_addr( + node: &NetworkNode, + ip: &IpAddr, + port: u16, +) -> Result { + generators::generate_node_bootnode_addr( + &node.spec.peer_id, + ip, + port, + node.inner.args().as_ref(), + &node.spec.p2p_cert_hash, + ) +} // Validate that the config fulfill all the requirements of the provider fn validate_spec_with_provider_capabilities( network_spec: &NetworkSpec,