Skip to content

Commit

Permalink
Merge pull request #24 from getAlby/feat/skip-background-sync
Browse files Browse the repository at this point in the history
feat: optional background sync, update fee estimates in sync_wallets
  • Loading branch information
rolznz authored May 23, 2024
2 parents d674826 + 1bc9594 commit f34d371
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 136 deletions.
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Debug;
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;

// Enable background syncing of fee rates, onchain and lightning wallet
pub(crate) const ENABLE_BACKGROUND_SYNC: bool = false;

// The 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
// number of derivation indexes after which BDK stops looking for new scripts belonging to the wallet.
pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20;
Expand Down
319 changes: 183 additions & 136 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ pub use builder::BuildError;
pub use builder::NodeBuilder as Builder;

use config::{
LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
ENABLE_BACKGROUND_SYNC, LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
Expand Down Expand Up @@ -250,162 +250,169 @@ impl Node {
})
})?;

// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
.onchain_wallet_sync_interval_secs
.max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS);
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
async move {
let mut onchain_wallet_sync_interval = tokio::time::interval(
Duration::from_secs(onchain_wallet_sync_interval_secs),
);
onchain_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
sync_logger,
"Stopping background syncing on-chain wallet.",
);
return;
}
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_trace!(
if ENABLE_BACKGROUND_SYNC {
// Setup wallet sync
let wallet = Arc::clone(&self.wallet);
let sync_logger = Arc::clone(&self.logger);
let sync_onchain_wallet_timestamp =
Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let onchain_wallet_sync_interval_secs = self
.config
.onchain_wallet_sync_interval_secs
.max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS);
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
let mut onchain_wallet_sync_interval = tokio::time::interval(
Duration::from_secs(onchain_wallet_sync_interval_secs),
);
onchain_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
sync_logger,
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
"Stopping background syncing on-chain wallet.",
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
return;
}
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of on-chain wallet failed: {}",
err
)
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
sync_logger,
"Background sync of on-chain wallet failed: {}",
err
)
}
}
}
}
}
}
},
);
});
});
});

let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs =
self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
let mut fee_rate_update_interval =
tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs));
// We just blocked on updating, so skip the first tick.
fee_rate_update_interval.reset();
fee_rate_update_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_fee_updates.changed() => {
log_trace!(
fee_update_logger,
"Stopping background updates of fee rate cache.",
);
return;
}
_ = fee_rate_update_interval.tick() => {
let now = Instant::now();
match fee_estimator.update_fee_estimates().await {
Ok(()) => {
log_trace!(
let mut stop_fee_updates = self.stop_sender.subscribe();
let fee_update_logger = Arc::clone(&self.logger);
let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_rate_cache_update_interval_secs = self
.config
.fee_rate_cache_update_interval_secs
.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
let mut fee_rate_update_interval =
tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs));
// We just blocked on updating, so skip the first tick.
fee_rate_update_interval.reset();
fee_rate_update_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_fee_updates.changed() => {
log_trace!(
fee_update_logger,
"Background update of fee rate cache finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
"Stopping background updates of fee rate cache.",
);
return;
}
_ = fee_rate_update_interval.tick() => {
let now = Instant::now();
match fee_estimator.update_fee_estimates().await {
Ok(()) => {
log_trace!(
fee_update_logger,
"Background update of fee rate cache failed: {}",
err
)
"Background update of fee rate cache finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
fee_update_logger,
"Background update of fee rate cache failed: {}",
err
)
}
}
}
}
}
}
});
});

let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
let mut wallet_sync_interval =
tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs));
wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
sync_logger,
"Stopping background syncing Lightning wallet.",
);
return;
}
_ = wallet_sync_interval.tick() => {
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];
let now = Instant::now();
let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables));
match timeout_fut.await {
Ok(res) => match res {
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
let mut stop_sync = self.stop_sender.subscribe();
let wallet_sync_interval_secs =
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
runtime.spawn(async move {
let mut wallet_sync_interval =
tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs));
wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
sync_logger,
"Stopping background syncing Lightning wallet.",
);
return;
}
_ = wallet_sync_interval.tick() => {
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];
let now = Instant::now();
let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables));
match timeout_fut.await {
Ok(res) => match res {
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(e) => {
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
}
}
Err(e) => {
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e)
}
}
Err(e) => {
log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e)
}
}
}
}
}
});
});
}

if self.gossip_source.is_rgs() {
let gossip_source = Arc::clone(&self.gossip_source);
Expand Down Expand Up @@ -1203,6 +1210,8 @@ impl Node {
/// [`Config::onchain_wallet_sync_interval_secs`] and [`Config::wallet_sync_interval_secs`].
/// Therefore, using this blocking sync method is almost always redundant and should be avoided
/// where possible.
/// **Note:** this is currently used by Alby (combined with disabled background syncs) to have
/// dynamic sync intervals.
pub fn sync_wallets(&self) -> Result<(), Error> {
let rt_lock = self.runtime.read().unwrap();
if rt_lock.is_none() {
Expand All @@ -1225,13 +1234,45 @@ impl Node {
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(
async move {
let now = Instant::now();
let fee_estimator = Arc::clone(&self.fee_estimator);
let fee_update_timestamp =
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);

match fee_estimator.update_fee_estimates().await {
Ok(()) => {
log_trace!(
sync_logger,
"Update of fee rate cache finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
},
Err(err) => {
log_error!(sync_logger, "Update of fee rate cache failed: {}", err);
return Err(err);
},
}

let now = Instant::now();
let sync_onchain_wallet_timestamp =
Arc::clone(&self.latest_onchain_wallet_sync_timestamp);

match wallet.sync().await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
},
Err(e) => {
log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e);
Expand All @@ -1240,13 +1281,19 @@ impl Node {
};

let now = Instant::now();
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
match tx_sync.sync(confirmables).await {
Ok(()) => {
log_info!(
sync_logger,
"Sync of Lightning wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|d| d.as_secs());
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
Ok(())
},
Err(e) => {
Expand Down

0 comments on commit f34d371

Please sign in to comment.