Skip to content

Commit

Permalink
Merge pull request #74 from AIBlockOfficial/separate_peer_lists
Browse files Browse the repository at this point in the history
Adding sub-peer limit and list for miners on mempool nodes
  • Loading branch information
BHouwens authored Jun 11, 2024
2 parents 02606d0 + 80b4cfe commit be196dd
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async fn new_self_node_with_port(node_type: NodeType, port: u16) -> (Node, Socke
bind_address.set_port(port);

let tcp_tls_config = TcpTlsConfig::new_no_tls(bind_address);
let self_node = Node::new(&tcp_tls_config, 20, node_type, false, false)
let self_node = Node::new(&tcp_tls_config, 20, 20, node_type, false, false)
.await
.unwrap();
socket_address.set_port(self_node.local_address().port());
Expand Down
4 changes: 4 additions & 0 deletions src/bin/node/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ fn load_settings(matches: &clap::ArgMatches) -> config::Config {
settings.set("peer_limit", 1000).unwrap();
}

if let Err(ConfigError::NotFound(_)) = settings.get_int("sub_peer_limit") {
settings.set("sub_peer_limit", 1000).unwrap();
}

if let Some(port) = matches.value_of("api_port") {
settings.set("mempool_api_port", port).unwrap();
}
Expand Down
3 changes: 2 additions & 1 deletion src/bin/node_settings_local_raft_1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ mempool_partition_full_size = 1
mempool_minimum_miner_pool_len = 1
jurisdiction = "US"
backup_block_modulo = 4
peer_limit = 1000
peer_limit = 5
sub_peer_limit = 1
mempool_mining_event_timeout= 3000
storage_block_timeout = 30000
#backup_restore = true
Expand Down
11 changes: 4 additions & 7 deletions src/bin/node_settings_local_raft_2.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mempool_db_mode = {Test = 0}
storage_db_mode = {Test = 0}
miner_db_mode = {Test = 0}
user_db_mode = {Test = 1000}
mempool_db_mode = {Test = 1}
storage_db_mode = {Test = 1}
miner_db_mode = {Live = 1}
user_db_mode = {Test = 1001}
mempool_raft = 1
storage_raft = 1
mempool_partition_full_size = 2
Expand Down Expand Up @@ -31,9 +31,6 @@ address = "127.0.0.1:12330"
[[storage_nodes]]
address = "127.0.0.1:12331"

[[miner_nodes]]
address = "127.0.0.1:12340"

[[miner_nodes]]
address = "127.0.0.1:12341"

Expand Down
26 changes: 26 additions & 0 deletions src/comms_handler/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,14 @@ pub struct Node {
tcp_tls_connector: Arc<RwLock<TcpTlsConnector>>,
/// List of all connected peers.
pub(crate) peers: Arc<RwLock<PeerList>>,
/// List of sub peers (currently only relevant to Mempool).
sub_peers: Arc<RwLock<HashSet<SocketAddr>>>,
/// Node type.
node_type: NodeType,
/// The max number of peers this node should handle.
peer_limit: usize,
/// The max number of sub peers this node should handle (currently only relevant to Mempool).
sub_peer_limit: usize,
/// Tracing context.
span: Span,
/// Channel to transmit incoming frames and events from peers.
Expand Down Expand Up @@ -217,13 +221,15 @@ impl Node {
pub async fn new(
config: &TcpTlsConfig,
peer_limit: usize,
sub_peer_limit: usize,
node_type: NodeType,
disable_listening: bool,
send_heartbeat_messages: bool,
) -> Result<Self> {
Self::new_with_version(
config,
peer_limit,
sub_peer_limit,
node_type,
NETWORK_VERSION,
disable_listening,
Expand All @@ -243,6 +249,7 @@ impl Node {
pub async fn new_with_version(
config: &TcpTlsConfig,
peer_limit: usize,
sub_peer_limit: usize,
node_type: NodeType,
network_version: u32,
disable_listening: bool,
Expand All @@ -265,7 +272,9 @@ impl Node {
tcp_tls_connector: Arc::new(RwLock::new(tcp_tls_connector)),
node_type,
peers: Arc::new(RwLock::new(HashMap::with_capacity(peer_limit))),
sub_peers: Arc::new(RwLock::new(HashSet::with_capacity(sub_peer_limit))),
peer_limit,
sub_peer_limit,
span,
event_tx,
event_rx: Arc::new(Mutex::new(event_rx)),
Expand Down Expand Up @@ -983,6 +992,7 @@ impl Node {
}));
}

// Check for duplicate peers
let mut all_peers = self.peers.write().await;
if all_peers.contains_key(&peer_in_addr) {
return Err(CommsError::PeerDuplicate(PeerInfo {
Expand All @@ -991,13 +1001,28 @@ impl Node {
}));
}

// Check whether peer is already connected to us
let peer = all_peers
.get_mut(&peer_out_addr)
.ok_or(CommsError::PeerNotFound(PeerInfo {
node_type: None,
address: Some(peer_out_addr),
}))?;

// Check if the peer is a miner and we are a mempool
let mut sub_peers = self.sub_peers.write().await;
if self.node_type == NodeType::Mempool && peer_type == NodeType::Miner {
debug!("Current sub-peers: {:?}", sub_peers);
debug!("Sub-peer limit: {:?}", self.sub_peer_limit);

if sub_peers.len() + 1 > self.sub_peer_limit {
all_peers.remove_entry(&peer_in_addr);
return Err(CommsError::PeerListFull);
} else {
sub_peers.insert(peer_in_addr);
}
}

// We only do DNS validation on mempool and storage nodes
if self.node_type == NodeType::Mempool || self.node_type == NodeType::Storage {
if let Some(peer_cert) = peer_cert {
Expand Down Expand Up @@ -1383,6 +1408,7 @@ mod test {
Node::new_with_version(
&tcp_tls_config,
peer_limit,
peer_limit,
NodeType::Mempool,
network_version,
false,
Expand Down
14 changes: 11 additions & 3 deletions src/comms_handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,16 @@ async fn create_config_mempool_nodes(configs: Vec<TcpTlsConfig>, peer_limit: usi
let mut nodes = Vec::new();
for tcp_tls_config in configs {
nodes.push(
Node::new(&tcp_tls_config, peer_limit, NodeType::Mempool, false, false)
.await
.unwrap(),
Node::new(
&tcp_tls_config,
peer_limit,
peer_limit,
NodeType::Mempool,
false,
false,
)
.await
.unwrap(),
);
}
nodes
Expand All @@ -607,6 +614,7 @@ async fn create_node_type_version(
Node::new_with_version(
&tcp_tls_config,
peer_limit,
peer_limit,
node_type,
network_version,
false,
Expand Down
2 changes: 2 additions & 0 deletions src/configurations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct MempoolNodeConfig {
pub mempool_miner_whitelist: MinerWhitelist,
/// Limit for the number of peers this node can have
pub peer_limit: usize,
/// Limit for the number of sub-peers (miners) this node can have
pub sub_peer_limit: usize,
/// Initial issuances
pub initial_issuances: Vec<InitialIssuance>,
/// Lifetime of a transaction's status in seconds
Expand Down
7 changes: 7 additions & 0 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ impl MempoolNode {
let node = Node::new(
&tcp_tls_config,
config.peer_limit,
config.sub_peer_limit,
NodeType::Mempool,
false,
true,
Expand Down Expand Up @@ -240,6 +241,12 @@ impl MempoolNode {
mempool_miner_whitelist: config.mempool_miner_whitelist,
};

if config.sub_peer_limit > config.peer_limit {
return Err(MempoolError::ConfigError(
"Sub peer limit cannot be greater than peer limit",
));
}

MempoolNode {
node,
node_raft,
Expand Down
1 change: 1 addition & 0 deletions src/mempool_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2118,6 +2118,7 @@ mod test {
enable_trigger_messages_pipeline_reset: Default::default(),
mempool_miner_whitelist: Default::default(),
peer_limit: 1000,
sub_peer_limit: 1000,
initial_issuances: Default::default(),
tx_status_lifetime: 600000,
};
Expand Down
1 change: 1 addition & 0 deletions src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl MinerNode {
let node = Node::new(
&tcp_tls_config,
config.peer_limit,
config.peer_limit,
NodeType::Miner,
disable_tcp_listener,
false,
Expand Down
1 change: 1 addition & 0 deletions src/pre_launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl PreLaunchNode {
let node = Node::new(
&tcp_tls_config,
config.peer_limit,
config.peer_limit,
NodeType::PreLaunch,
false,
false,
Expand Down
1 change: 1 addition & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl StorageNode {
let node = Node::new(
&tcp_tls_config,
config.peer_limit,
config.peer_limit,
NodeType::Storage,
false,
false,
Expand Down
1 change: 1 addition & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@ async fn init_mempool(
enable_trigger_messages_pipeline_reset: config.enable_pipeline_reset,
mempool_miner_whitelist: config.mempool_miner_whitelist.clone(),
peer_limit: config.peer_limit,
sub_peer_limit: config.peer_limit,
initial_issuances: config.initial_issuances.clone(),
tx_status_lifetime: 600000,
};
Expand Down
1 change: 1 addition & 0 deletions src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl UserNode {
let node = Node::new(
&tcp_tls_config,
config.peer_limit,
config.peer_limit,
NodeType::User,
disable_tcp_listener,
false,
Expand Down

0 comments on commit be196dd

Please sign in to comment.