From 80b4cfe3d8d3ca9b6f217c674b4bf3ee76ce4dfb Mon Sep 17 00:00:00 2001 From: BHouwens Date: Mon, 10 Jun 2024 15:05:13 +0200 Subject: [PATCH] Adding sub-peer limit and list for miners on mempool nodes --- src/api/tests.rs | 2 +- src/bin/node/mempool.rs | 4 ++++ src/bin/node_settings_local_raft_1.toml | 3 ++- src/bin/node_settings_local_raft_2.toml | 11 ++++------- src/comms_handler/node.rs | 26 +++++++++++++++++++++++++ src/comms_handler/tests.rs | 14 ++++++++++--- src/configurations.rs | 2 ++ src/mempool.rs | 7 +++++++ src/mempool_raft.rs | 1 + src/miner.rs | 1 + src/pre_launch.rs | 1 + src/storage.rs | 1 + src/test_utils.rs | 1 + src/user.rs | 1 + 14 files changed, 63 insertions(+), 12 deletions(-) diff --git a/src/api/tests.rs b/src/api/tests.rs index db0120b0..c3004204 100644 --- a/src/api/tests.rs +++ b/src/api/tests.rs @@ -378,7 +378,7 @@ async fn new_self_node_with_port(node_type: NodeType, port: u16) -> (Node, Socke bind_address.set_port(port); let tcp_tls_config = TcpTlsConfig::new_no_tls(bind_address); - let self_node = Node::new(&tcp_tls_config, 20, node_type, false, false) + let self_node = Node::new(&tcp_tls_config, 20, 20, node_type, false, false) .await .unwrap(); socket_address.set_port(self_node.local_address().port()); diff --git a/src/bin/node/mempool.rs b/src/bin/node/mempool.rs index 93aebd9d..751686c4 100644 --- a/src/bin/node/mempool.rs +++ b/src/bin/node/mempool.rs @@ -289,6 +289,10 @@ fn load_settings(matches: &clap::ArgMatches) -> config::Config { settings.set("peer_limit", 1000).unwrap(); } + if let Err(ConfigError::NotFound(_)) = settings.get_int("sub_peer_limit") { + settings.set("sub_peer_limit", 1000).unwrap(); + } + if let Some(port) = matches.value_of("api_port") { settings.set("mempool_api_port", port).unwrap(); } diff --git a/src/bin/node_settings_local_raft_1.toml b/src/bin/node_settings_local_raft_1.toml index 4f95a249..16b658cf 100644 --- a/src/bin/node_settings_local_raft_1.toml +++ b/src/bin/node_settings_local_raft_1.toml @@ -12,7 +12,8 @@ mempool_partition_full_size = 1 mempool_minimum_miner_pool_len = 1 jurisdiction = "US" backup_block_modulo = 4 -peer_limit = 1000 +peer_limit = 5 +sub_peer_limit = 1 mempool_mining_event_timeout= 3000 storage_block_timeout = 30000 #backup_restore = true diff --git a/src/bin/node_settings_local_raft_2.toml b/src/bin/node_settings_local_raft_2.toml index 002e1058..2d506a1d 100644 --- a/src/bin/node_settings_local_raft_2.toml +++ b/src/bin/node_settings_local_raft_2.toml @@ -1,7 +1,7 @@ -mempool_db_mode = {Test = 0} -storage_db_mode = {Test = 0} -miner_db_mode = {Test = 0} -user_db_mode = {Test = 1000} +mempool_db_mode = {Test = 1} +storage_db_mode = {Test = 1} +miner_db_mode = {Live = 1} +user_db_mode = {Test = 1001} mempool_raft = 1 storage_raft = 1 mempool_partition_full_size = 2 @@ -31,9 +31,6 @@ address = "127.0.0.1:12330" [[storage_nodes]] address = "127.0.0.1:12331" -[[miner_nodes]] -address = "127.0.0.1:12340" - [[miner_nodes]] address = "127.0.0.1:12341" diff --git a/src/comms_handler/node.rs b/src/comms_handler/node.rs index a07dddcc..fefabfbb 100644 --- a/src/comms_handler/node.rs +++ b/src/comms_handler/node.rs @@ -147,10 +147,14 @@ pub struct Node { tcp_tls_connector: Arc>, /// List of all connected peers. pub(crate) peers: Arc>, + /// List of sub peers (currently only relevant to Mempool). + sub_peers: Arc>>, /// Node type. node_type: NodeType, /// The max number of peers this node should handle. peer_limit: usize, + /// The max number of sub peers this node should handle (currently only relevant to Mempool). + sub_peer_limit: usize, /// Tracing context. span: Span, /// Channel to transmit incoming frames and events from peers. @@ -217,6 +221,7 @@ impl Node { pub async fn new( config: &TcpTlsConfig, peer_limit: usize, + sub_peer_limit: usize, node_type: NodeType, disable_listening: bool, send_heartbeat_messages: bool, @@ -224,6 +229,7 @@ impl Node { Self::new_with_version( config, peer_limit, + sub_peer_limit, node_type, NETWORK_VERSION, disable_listening, @@ -243,6 +249,7 @@ impl Node { pub async fn new_with_version( config: &TcpTlsConfig, peer_limit: usize, + sub_peer_limit: usize, node_type: NodeType, network_version: u32, disable_listening: bool, @@ -265,7 +272,9 @@ impl Node { tcp_tls_connector: Arc::new(RwLock::new(tcp_tls_connector)), node_type, peers: Arc::new(RwLock::new(HashMap::with_capacity(peer_limit))), + sub_peers: Arc::new(RwLock::new(HashSet::with_capacity(sub_peer_limit))), peer_limit, + sub_peer_limit, span, event_tx, event_rx: Arc::new(Mutex::new(event_rx)), @@ -983,6 +992,7 @@ impl Node { })); } + // Check for duplicate peers let mut all_peers = self.peers.write().await; if all_peers.contains_key(&peer_in_addr) { return Err(CommsError::PeerDuplicate(PeerInfo { @@ -991,6 +1001,7 @@ impl Node { })); } + // Check whether peer is already connected to us let peer = all_peers .get_mut(&peer_out_addr) .ok_or(CommsError::PeerNotFound(PeerInfo { @@ -998,6 +1009,20 @@ impl Node { address: Some(peer_out_addr), }))?; + // Check if the peer is a miner and we are a mempool + let mut sub_peers = self.sub_peers.write().await; + if self.node_type == NodeType::Mempool && peer_type == NodeType::Miner { + debug!("Current sub-peers: {:?}", sub_peers); + debug!("Sub-peer limit: {:?}", self.sub_peer_limit); + + if sub_peers.len() + 1 > self.sub_peer_limit { + all_peers.remove_entry(&peer_in_addr); + return Err(CommsError::PeerListFull); + } else { + sub_peers.insert(peer_in_addr); + } + } + // We only do DNS validation on mempool and storage nodes if self.node_type == NodeType::Mempool || self.node_type == NodeType::Storage { if let Some(peer_cert) = peer_cert { @@ -1383,6 +1408,7 @@ mod test { Node::new_with_version( &tcp_tls_config, peer_limit, + peer_limit, NodeType::Mempool, network_version, false, diff --git a/src/comms_handler/tests.rs b/src/comms_handler/tests.rs index 1fc149a1..4c9abfed 100644 --- a/src/comms_handler/tests.rs +++ b/src/comms_handler/tests.rs @@ -586,9 +586,16 @@ async fn create_config_mempool_nodes(configs: Vec, peer_limit: usi let mut nodes = Vec::new(); for tcp_tls_config in configs { nodes.push( - Node::new(&tcp_tls_config, peer_limit, NodeType::Mempool, false, false) - .await - .unwrap(), + Node::new( + &tcp_tls_config, + peer_limit, + peer_limit, + NodeType::Mempool, + false, + false, + ) + .await + .unwrap(), ); } nodes @@ -607,6 +614,7 @@ async fn create_node_type_version( Node::new_with_version( &tcp_tls_config, peer_limit, + peer_limit, node_type, network_version, false, diff --git a/src/configurations.rs b/src/configurations.rs index b5dee1f7..d3f6ca69 100644 --- a/src/configurations.rs +++ b/src/configurations.rs @@ -156,6 +156,8 @@ pub struct MempoolNodeConfig { pub mempool_miner_whitelist: MinerWhitelist, /// Limit for the number of peers this node can have pub peer_limit: usize, + /// Limit for the number of sub-peers (miners) this node can have + pub sub_peer_limit: usize, /// Initial issuances pub initial_issuances: Vec, /// Lifetime of a transaction's status in seconds diff --git a/src/mempool.rs b/src/mempool.rs index 1330ed82..1eec7382 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -210,6 +210,7 @@ impl MempoolNode { let node = Node::new( &tcp_tls_config, config.peer_limit, + config.sub_peer_limit, NodeType::Mempool, false, true, @@ -240,6 +241,12 @@ impl MempoolNode { mempool_miner_whitelist: config.mempool_miner_whitelist, }; + if config.sub_peer_limit > config.peer_limit { + return Err(MempoolError::ConfigError( + "Sub peer limit cannot be greater than peer limit", + )); + } + MempoolNode { node, node_raft, diff --git a/src/mempool_raft.rs b/src/mempool_raft.rs index c859feda..c5c3342a 100644 --- a/src/mempool_raft.rs +++ b/src/mempool_raft.rs @@ -2118,6 +2118,7 @@ mod test { enable_trigger_messages_pipeline_reset: Default::default(), mempool_miner_whitelist: Default::default(), peer_limit: 1000, + sub_peer_limit: 1000, initial_issuances: Default::default(), tx_status_lifetime: 600000, }; diff --git a/src/miner.rs b/src/miner.rs index 23041047..92a63f7a 100644 --- a/src/miner.rs +++ b/src/miner.rs @@ -208,6 +208,7 @@ impl MinerNode { let node = Node::new( &tcp_tls_config, config.peer_limit, + config.peer_limit, NodeType::Miner, disable_tcp_listener, false, diff --git a/src/pre_launch.rs b/src/pre_launch.rs index c039f5a0..4dd59612 100644 --- a/src/pre_launch.rs +++ b/src/pre_launch.rs @@ -148,6 +148,7 @@ impl PreLaunchNode { let node = Node::new( &tcp_tls_config, config.peer_limit, + config.peer_limit, NodeType::PreLaunch, false, false, diff --git a/src/storage.rs b/src/storage.rs index 146bd01d..54d5513c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -178,6 +178,7 @@ impl StorageNode { let node = Node::new( &tcp_tls_config, config.peer_limit, + config.peer_limit, NodeType::Storage, false, false, diff --git a/src/test_utils.rs b/src/test_utils.rs index 847068b9..269ee8df 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -1189,6 +1189,7 @@ async fn init_mempool( enable_trigger_messages_pipeline_reset: config.enable_pipeline_reset, mempool_miner_whitelist: config.mempool_miner_whitelist.clone(), peer_limit: config.peer_limit, + sub_peer_limit: config.peer_limit, initial_issuances: config.initial_issuances.clone(), tx_status_lifetime: 600000, }; diff --git a/src/user.rs b/src/user.rs index c5965fca..35fadec0 100644 --- a/src/user.rs +++ b/src/user.rs @@ -170,6 +170,7 @@ impl UserNode { let node = Node::new( &tcp_tls_config, config.peer_limit, + config.peer_limit, NodeType::User, disable_tcp_listener, false,