From 1bc9594cf8a73ae33b4a889562955fca725815f8 Mon Sep 17 00:00:00 2001 From: Roland Bewick Date: Thu, 23 May 2024 14:55:01 +0700 Subject: [PATCH] feat: optional background sync, update fee estimates in sync_wallets --- src/config.rs | 3 + src/lib.rs | 319 +++++++++++++++++++++++++++++--------------------- 2 files changed, 186 insertions(+), 136 deletions(-) diff --git a/src/config.rs b/src/config.rs index 0d756e674..69aeb5f79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,6 +17,9 @@ const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Debug; const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; +// Enable background syncing of fee rates, onchain and lightning wallet +pub(crate) const ENABLE_BACKGROUND_SYNC: bool = false; + // The 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold // number of derivation indexes after which BDK stops looking for new scripts belonging to the wallet. pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20; diff --git a/src/lib.rs b/src/lib.rs index f31e5a97d..fc773612b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,8 +123,8 @@ pub use builder::BuildError; pub use builder::NodeBuilder as Builder; use config::{ - LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS, + ENABLE_BACKGROUND_SYNC, LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -250,162 +250,169 @@ impl Node { }) })?; - // Setup wallet sync - let wallet = Arc::clone(&self.wallet); - let sync_logger = Arc::clone(&self.logger); - let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp); - let mut stop_sync = self.stop_sender.subscribe(); - let onchain_wallet_sync_interval_secs = self - .config - .onchain_wallet_sync_interval_secs - .max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS); - std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( - async move { - let mut onchain_wallet_sync_interval = tokio::time::interval( - Duration::from_secs(onchain_wallet_sync_interval_secs), - ); - onchain_wallet_sync_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_sync.changed() => { - log_trace!( - sync_logger, - "Stopping background syncing on-chain wallet.", - ); - return; - } - _ = onchain_wallet_sync_interval.tick() => { - let now = Instant::now(); - match wallet.sync().await { - Ok(()) => { - log_trace!( + if ENABLE_BACKGROUND_SYNC { + // Setup wallet sync + let wallet = Arc::clone(&self.wallet); + let sync_logger = Arc::clone(&self.logger); + let sync_onchain_wallet_timestamp = + Arc::clone(&self.latest_onchain_wallet_sync_timestamp); + let mut stop_sync = self.stop_sender.subscribe(); + let onchain_wallet_sync_interval_secs = self + .config + .onchain_wallet_sync_interval_secs + .max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS); + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let mut onchain_wallet_sync_interval = tokio::time::interval( + Duration::from_secs(onchain_wallet_sync_interval_secs), + ); + onchain_wallet_sync_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_sync.changed() => { + log_trace!( sync_logger, - "Background sync of on-chain wallet finished in {}ms.", - now.elapsed().as_millis() + "Stopping background syncing on-chain wallet.", ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - } - Err(err) => { - log_error!( + return; + } + _ = onchain_wallet_sync_interval.tick() => { + let now = Instant::now(); + match wallet.sync().await { + Ok(()) => { + log_trace!( sync_logger, - "Background sync of on-chain wallet failed: {}", - err - ) + "Background sync of on-chain wallet finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } + Err(err) => { + log_error!( + sync_logger, + "Background sync of on-chain wallet failed: {}", + err + ) + } } } } } - } - }, - ); - }); + }); + }); - let mut stop_fee_updates = self.stop_sender.subscribe(); - let fee_update_logger = Arc::clone(&self.logger); - let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); - let fee_estimator = Arc::clone(&self.fee_estimator); - let fee_rate_cache_update_interval_secs = - self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); - runtime.spawn(async move { - let mut fee_rate_update_interval = - tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); - // We just blocked on updating, so skip the first tick. - fee_rate_update_interval.reset(); - fee_rate_update_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_fee_updates.changed() => { - log_trace!( - fee_update_logger, - "Stopping background updates of fee rate cache.", - ); - return; - } - _ = fee_rate_update_interval.tick() => { - let now = Instant::now(); - match fee_estimator.update_fee_estimates().await { - Ok(()) => { - log_trace!( + let mut stop_fee_updates = self.stop_sender.subscribe(); + let fee_update_logger = Arc::clone(&self.logger); + let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); + let fee_estimator = Arc::clone(&self.fee_estimator); + let fee_rate_cache_update_interval_secs = self + .config + .fee_rate_cache_update_interval_secs + .max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + runtime.spawn(async move { + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs)); + // We just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_fee_updates.changed() => { + log_trace!( fee_update_logger, - "Background update of fee rate cache finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; - } - Err(err) => { - log_error!( + "Stopping background updates of fee rate cache.", + ); + return; + } + _ = fee_rate_update_interval.tick() => { + let now = Instant::now(); + match fee_estimator.update_fee_estimates().await { + Ok(()) => { + log_trace!( fee_update_logger, - "Background update of fee rate cache failed: {}", - err - ) + "Background update of fee rate cache finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; + } + Err(err) => { + log_error!( + fee_update_logger, + "Background update of fee rate cache failed: {}", + err + ) + } } } } } - } - }); + }); - let tx_sync = Arc::clone(&self.tx_sync); - let sync_cman = Arc::clone(&self.channel_manager); - let sync_cmon = Arc::clone(&self.chain_monitor); - let sync_sweeper = Arc::clone(&self.output_sweeper); - let sync_logger = Arc::clone(&self.logger); - let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); - let mut stop_sync = self.stop_sender.subscribe(); - let wallet_sync_interval_secs = - self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); - runtime.spawn(async move { - let mut wallet_sync_interval = - tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs)); - wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_sync.changed() => { - log_trace!( - sync_logger, - "Stopping background syncing Lightning wallet.", - ); - return; - } - _ = wallet_sync_interval.tick() => { - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - &*sync_sweeper as &(dyn Confirm + Sync + Send), - ]; - let now = Instant::now(); - let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + let tx_sync = Arc::clone(&self.tx_sync); + let sync_cman = Arc::clone(&self.channel_manager); + let sync_cmon = Arc::clone(&self.chain_monitor); + let sync_sweeper = Arc::clone(&self.output_sweeper); + let sync_logger = Arc::clone(&self.logger); + let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); + let mut stop_sync = self.stop_sender.subscribe(); + let wallet_sync_interval_secs = + self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + runtime.spawn(async move { + let mut wallet_sync_interval = + tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs)); + wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing Lightning wallet.", + ); + return; + } + _ = wallet_sync_interval.tick() => { + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), + ]; + let now = Instant::now(); + let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( + sync_logger, + "Background sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } + Err(e) => { + log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + } } Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e) } } - Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e) - } } } } - } - }); + }); + } if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); @@ -1203,6 +1210,8 @@ impl Node { /// [`Config::onchain_wallet_sync_interval_secs`] and [`Config::wallet_sync_interval_secs`]. /// Therefore, using this blocking sync method is almost always redundant and should be avoided /// where possible. + /// **Note:** this is currently used by Alby (combined with disabled background syncs) to have + /// dynamic sync intervals. pub fn sync_wallets(&self) -> Result<(), Error> { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { @@ -1225,6 +1234,33 @@ impl Node { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { let now = Instant::now(); + let fee_estimator = Arc::clone(&self.fee_estimator); + let fee_update_timestamp = + Arc::clone(&self.latest_fee_rate_cache_update_timestamp); + + match fee_estimator.update_fee_estimates().await { + Ok(()) => { + log_trace!( + sync_logger, + "Update of fee rate cache finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; + }, + Err(err) => { + log_error!(sync_logger, "Update of fee rate cache failed: {}", err); + return Err(err); + }, + } + + let now = Instant::now(); + let sync_onchain_wallet_timestamp = + Arc::clone(&self.latest_onchain_wallet_sync_timestamp); + match wallet.sync().await { Ok(()) => { log_info!( @@ -1232,6 +1268,11 @@ impl Node { "Sync of on-chain wallet finished in {}ms.", now.elapsed().as_millis() ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; }, Err(e) => { log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e); @@ -1240,6 +1281,7 @@ impl Node { }; let now = Instant::now(); + let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); match tx_sync.sync(confirmables).await { Ok(()) => { log_info!( @@ -1247,6 +1289,11 @@ impl Node { "Sync of Lightning wallet finished in {}ms.", now.elapsed().as_millis() ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; Ok(()) }, Err(e) => {