diff --git a/src/bitcoin.rs b/src/bitcoin.rs index 2a2fb57..e93aa76 100644 --- a/src/bitcoin.rs +++ b/src/bitcoin.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, fs::File, + future::Future, process::Stdio, sync::Arc, time::{Duration, Instant}, @@ -308,7 +309,7 @@ impl BitcoinNodeCluster { pub async fn wait_for_sync(&self, timeout: Option) -> Result<()> { let start = Instant::now(); - let timeout = timeout.unwrap_or(Duration::from_secs(30)); + let timeout = timeout.unwrap_or(Duration::from_secs(60)); while start.elapsed() < timeout { let mut heights = HashSet::new(); for node in &self.inner { @@ -334,9 +335,53 @@ impl BitcoinNodeCluster { SpawnOutput::Container(container) => container.ip.clone(), SpawnOutput::Child(_) => "127.0.0.1".to_string(), }; + let ip_port = format!("{}:{}", ip, to_node.config.p2p_port); + + from_node.onetry_node(&ip_port).await?; + + let from_subver = from_node.get_network_info().await?.subversion; + let to_subver = to_node.get_network_info().await?.subversion; + + // Check and wait for both inbound and outbound connections + wait_until(|| async { + let out_connected = from_node + .get_peer_info() + .await? + .iter() + .any(|peer| peer.subver == to_subver && !peer.inbound); + + let in_connected = to_node + .get_peer_info() + .await? + .iter() + .any(|peer| peer.subver == from_subver && peer.inbound); + + Ok(out_connected && in_connected) + }) + .await?; - let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port); - from_node.onetry_node(&target_node_addr).await?; + // Handshake check. Wait for pong messages + wait_until(|| async { + let out_peer = from_node + .get_peer_info() + .await? + .into_iter() + .find(|peer| peer.subver == to_subver && !peer.inbound); + + let in_peer = to_node + .get_peer_info() + .await? + .into_iter() + .find(|peer| peer.subver == from_subver && peer.inbound); + + if let (Some(out_p), Some(in_p)) = (out_peer, in_peer) { + Ok(out_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64 + && in_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64) + } else { + Ok(false) + } + }) + .await?; } } } @@ -347,20 +392,51 @@ impl BitcoinNodeCluster { for (i, from_node) in self.iter().enumerate() { for (j, to_node) in self.iter().enumerate() { if i != j { - let ip = match &to_node.spawn_output { - SpawnOutput::Container(container) => container.ip.clone(), - SpawnOutput::Child(_) => "127.0.0.1".to_string(), - }; - - let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port); - // from_node.remove_node(&target_node_addr).await?; - from_node.disconnect_node(&target_node_addr).await?; + let to_subver = to_node.get_network_info().await?.subversion; + + let peers = from_node.get_peer_info().await?; + let peer_ids: Vec<_> = peers + .iter() + .filter(|peer| peer.subver == to_subver) + .map(|peer| peer.id) + .collect(); + + if peer_ids.is_empty() { + return Ok(()); + } + + for peer_id in peer_ids { + match from_node.disconnect_node_by_id(peer_id as u32).await { + Ok(_) => (), + Err(e) => { + if !e.to_string().contains("Node not found") { + bail!("{e}") + } + } + } + } + + wait_until(|| self.test_connection(from_node, to_node, false)).await?; + wait_until(|| self.test_connection(to_node, from_node, false)).await?; } } } Ok(()) } + async fn test_connection( + &self, + from_node: &BitcoinNode, + to_node: &BitcoinNode, + expect_connected: bool, + ) -> Result { + let to_subver = to_node.get_network_info().await?.subversion; + let peers = from_node.get_peer_info().await?; + let is_connected = peers.iter().any(|peer| peer.subver == to_subver); + + Ok(is_connected == expect_connected) + } + pub fn get(&self, index: usize) -> Option<&BitcoinNode> { self.inner.get(index) } @@ -389,3 +465,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option) -> Resul } bail!("Timeout waiting for RPC to be ready") } + +pub async fn wait_until(mut f: F) -> Result<()> +where + F: FnMut() -> Fut, + Fut: Future>, +{ + let timeout = Duration::from_secs(60); + let start = Instant::now(); + + while start.elapsed() < timeout { + match f().await { + Ok(true) => return Ok(()), + Ok(false) => { + sleep(Duration::from_secs(1)).await; + continue; + } + Err(e) => { + if e.to_string().contains("node not connected") { + sleep(Duration::from_secs(1)).await; + continue; + } + return Err(e); + } + } + } + + bail!("wait_until timed out after {} seconds", timeout.as_secs()) +}