Skip to content

Commit

Permalink
Merge branch 'main' of github.com:chainwayxyz/citrea-e2e into da-moni…
Browse files Browse the repository at this point in the history
…toring
  • Loading branch information
jfldde committed Nov 5, 2024
2 parents 5c1a0d5 + 3a9bf6f commit 2a6ad0e
Showing 1 changed file with 115 additions and 11 deletions.
126 changes: 115 additions & 11 deletions src/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashSet,
fs::File,
future::Future,
process::Stdio,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -308,7 +309,7 @@ impl BitcoinNodeCluster {

pub async fn wait_for_sync(&self, timeout: Option<Duration>) -> Result<()> {
let start = Instant::now();
let timeout = timeout.unwrap_or(Duration::from_secs(30));
let timeout = timeout.unwrap_or(Duration::from_secs(60));
while start.elapsed() < timeout {
let mut heights = HashSet::new();
for node in &self.inner {
Expand All @@ -334,9 +335,53 @@ impl BitcoinNodeCluster {
SpawnOutput::Container(container) => container.ip.clone(),
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
};
let ip_port = format!("{}:{}", ip, to_node.config.p2p_port);

from_node.onetry_node(&ip_port).await?;

let from_subver = from_node.get_network_info().await?.subversion;
let to_subver = to_node.get_network_info().await?.subversion;

// Check and wait for both inbound and outbound connections
wait_until(|| async {
let out_connected = from_node
.get_peer_info()
.await?
.iter()
.any(|peer| peer.subver == to_subver && !peer.inbound);

let in_connected = to_node
.get_peer_info()
.await?
.iter()
.any(|peer| peer.subver == from_subver && peer.inbound);

Ok(out_connected && in_connected)
})
.await?;

let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
from_node.onetry_node(&target_node_addr).await?;
// Handshake check. Wait for pong messages
wait_until(|| async {
let out_peer = from_node
.get_peer_info()
.await?
.into_iter()
.find(|peer| peer.subver == to_subver && !peer.inbound);

let in_peer = to_node
.get_peer_info()
.await?
.into_iter()
.find(|peer| peer.subver == from_subver && peer.inbound);

if let (Some(out_p), Some(in_p)) = (out_peer, in_peer) {
Ok(out_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64
&& in_p.bytesrecv_per_msg.get("pong").unwrap_or(&0) >= &29u64)
} else {
Ok(false)
}
})
.await?;
}
}
}
Expand All @@ -347,20 +392,51 @@ impl BitcoinNodeCluster {
for (i, from_node) in self.iter().enumerate() {
for (j, to_node) in self.iter().enumerate() {
if i != j {
let ip = match &to_node.spawn_output {
SpawnOutput::Container(container) => container.ip.clone(),
SpawnOutput::Child(_) => "127.0.0.1".to_string(),
};

let target_node_addr = format!("{}:{}", ip, to_node.config.p2p_port);
// from_node.remove_node(&target_node_addr).await?;
from_node.disconnect_node(&target_node_addr).await?;
let to_subver = to_node.get_network_info().await?.subversion;

let peers = from_node.get_peer_info().await?;
let peer_ids: Vec<_> = peers
.iter()
.filter(|peer| peer.subver == to_subver)
.map(|peer| peer.id)
.collect();

if peer_ids.is_empty() {
return Ok(());
}

for peer_id in peer_ids {
match from_node.disconnect_node_by_id(peer_id as u32).await {
Ok(_) => (),
Err(e) => {
if !e.to_string().contains("Node not found") {
bail!("{e}")
}
}
}
}

wait_until(|| self.test_connection(from_node, to_node, false)).await?;
wait_until(|| self.test_connection(to_node, from_node, false)).await?;
}
}
}
Ok(())
}

async fn test_connection(
&self,
from_node: &BitcoinNode,
to_node: &BitcoinNode,
expect_connected: bool,
) -> Result<bool> {
let to_subver = to_node.get_network_info().await?.subversion;
let peers = from_node.get_peer_info().await?;
let is_connected = peers.iter().any(|peer| peer.subver == to_subver);

Ok(is_connected == expect_connected)
}

pub fn get(&self, index: usize) -> Option<&BitcoinNode> {
self.inner.get(index)
}
Expand Down Expand Up @@ -389,3 +465,31 @@ async fn wait_for_rpc_ready(client: &Client, timeout: Option<Duration>) -> Resul
}
bail!("Timeout waiting for RPC to be ready")
}

pub async fn wait_until<F, Fut>(mut f: F) -> Result<()>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<bool>>,
{
let timeout = Duration::from_secs(60);
let start = Instant::now();

while start.elapsed() < timeout {
match f().await {
Ok(true) => return Ok(()),
Ok(false) => {
sleep(Duration::from_secs(1)).await;
continue;
}
Err(e) => {
if e.to_string().contains("node not connected") {
sleep(Duration::from_secs(1)).await;
continue;
}
return Err(e);
}
}
}

bail!("wait_until timed out after {} seconds", timeout.as_secs())
}

0 comments on commit 2a6ad0e

Please sign in to comment.