Skip to content

Commit

Permalink
daemon: cache user balances in mempool to prevent double spendings in…
Browse files Browse the repository at this point in the history
… mempool too
  • Loading branch information
Slixe committed Jan 16, 2024
1 parent 6e57349 commit 0b3146c
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 10 deletions.
33 changes: 29 additions & 4 deletions xelis_daemon/src/core/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,8 @@ impl<S: Storage> Blockchain<S> {
let current_topoheight = self.get_topo_height();
// get the highest nonce for this owner
let owner = tx.get_owner();
// final balances of a user after the new TX being applied
let mut final_balances: HashMap<Hash, VersionedBalance> = HashMap::new();
// get the highest nonce available
// if presents, it means we have at least one tx from this owner in mempool
if let Some(cache) = mempool.get_cache_for(owner) {
Expand All @@ -1050,21 +1052,44 @@ impl<S: Storage> Blockchain<S> {
debug!("TX {} nonce is not in the range of the pending TXs for this owner, received: {}, expected between {} and {}", hash, tx.get_nonce(), cache.get_min(), cache.get_max());
return Err(BlockchainError::InvalidTxNonceMempoolCache(tx.get_nonce(), cache.get_min(), cache.get_max()))
}

// we need to do it in two times because of the constraint of lifetime on &tx
let mut balances = HashMap::new();
let mut nonces = HashMap::new();
// because we already verified the range of nonce
nonces.insert(tx.get_owner(), tx.get_nonce());
// Insert expected balance from mempool TXs

// Now introduce all expected balances for this user from mempool cache
let user_balances = balances.entry(tx.get_owner()).or_insert_with(HashMap::new);
let cached_balances = cache.get_balances();
// We are forced to clone the balance because we need to manipulate it in the next step
for (asset, balance) in cached_balances {
user_balances.insert(asset, balance.clone());
}

// Verify original TX
// We may have double spending in balances, but it is ok because miner check that all txs included are valid
self.verify_transaction_with_hash(&storage, &tx, &hash, &mut balances, Some(&mut nonces), false, current_topoheight).await?;

// Store all balances changes
if let Some(user_balances) = balances.remove(tx.get_owner()) {
for (asset, balance) in user_balances {
final_balances.insert(asset.clone(), balance);
}
}
} else {
let mut balances = HashMap::new();
self.verify_transaction_with_hash(&storage, &tx, &hash, &mut balances, None, false, current_topoheight).await?;
// Store balances changes
if let Some(user_balances) = balances.remove(tx.get_owner()) {
for (asset, balance) in user_balances {
final_balances.insert(asset.clone(), balance);
}
}
}

mempool.add_tx(hash.clone(), tx.clone(), tx_size)?;
mempool.add_tx(hash.clone(), tx.clone(), tx_size, final_balances)?;
}

if broadcast {
Expand Down Expand Up @@ -1227,7 +1252,7 @@ impl<S: Storage> Blockchain<S> {
trace!("Checking TX {} with nonce {}, {}", hash, tx.get_nonce(), tx.get_owner());
let owner = tx.get_owner();
if let Err(e) = self.verify_transaction_with_hash(&storage, tx, hash, &mut balances, Some(&mut nonces), false, topoheight).await {
debug!("TX {} ({}) is not valid for mining: {}", hash, owner, e);
warn!("TX {} ({}) is not valid for mining: {}", hash, owner, e);
} else {
trace!("Selected {} (nonce: {}, fees: {}) for mining", hash, tx.get_nonce(), format_xelis(tx.get_fee()));
// TODO no clone
Expand Down Expand Up @@ -1767,7 +1792,7 @@ impl<S: Storage> Blockchain<S> {
debug!("Locking mempool write mode");
let mut mempool = self.mempool.write().await;
debug!("mempool write mode ok");
mempool.clean_up(&*storage).await;
mempool.clean_up(&*storage, &self, highest_topo).await;
}

// Now we can try to add back all transactions
Expand Down Expand Up @@ -2001,7 +2026,7 @@ impl<S: Storage> Blockchain<S> {
// verify the transaction and returns fees available
// nonces allow us to support multiples tx from same owner in the same block
// txs must be sorted in ascending order based on account nonce
async fn verify_transaction_with_hash<'a>(&self, storage: &S, tx: &'a Transaction, hash: &Hash, balances: &mut HashMap<&'a PublicKey, HashMap<&'a Hash, VersionedBalance>>, nonces: Option<&mut HashMap<&'a PublicKey, u64>>, skip_nonces: bool, topoheight: u64) -> Result<(), BlockchainError> {
pub async fn verify_transaction_with_hash<'a>(&self, storage: &S, tx: &'a Transaction, hash: &Hash, balances: &mut HashMap<&'a PublicKey, HashMap<&'a Hash, VersionedBalance>>, nonces: Option<&mut HashMap<&'a PublicKey, u64>>, skip_nonces: bool, topoheight: u64) -> Result<(), BlockchainError> {
trace!("Verify transaction with hash {}", hash);

// Verify that the TX has a valid signature
Expand Down
70 changes: 64 additions & 6 deletions xelis_daemon/src/core/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use super::error::BlockchainError;
use super::storage::Storage;
use super::{
blockchain::Blockchain,
error::BlockchainError,
storage::Storage
};
use std::collections::HashMap;
use std::sync::Arc;
use indexmap::IndexSet;
use log::{trace, debug, warn};
use xelis_common::utils::get_current_time_in_seconds;
use xelis_common::{
account::VersionedBalance,
utils::get_current_time_in_seconds,
crypto::{
hash::Hash,
key::PublicKey
Expand Down Expand Up @@ -35,6 +39,9 @@ pub struct AccountCache {
max: u64,
// all txs for this user ordered by nonce
txs: IndexSet<Arc<Hash>>,
// Expected balances after all txs in this cache
// This is also used to verify the validity of the TX spendings
balances: HashMap<Hash, VersionedBalance>
}

#[derive(serde::Serialize)]
Expand All @@ -55,14 +62,18 @@ impl Mempool {
}

// All checks are made in Blockchain before calling this function
pub fn add_tx(&mut self, hash: Hash, tx: Arc<Transaction>, size: usize) -> Result<(), BlockchainError> {
pub fn add_tx(&mut self, hash: Hash, tx: Arc<Transaction>, size: usize, balances: HashMap<Hash, VersionedBalance>) -> Result<(), BlockchainError> {
let hash = Arc::new(hash);
let nonce = tx.get_nonce();
// update the cache for this owner
let mut must_update = true;
if let Some(cache) = self.caches.get_mut(tx.get_owner()) {
// delete the TX if its in the range of already tracked nonces
trace!("Cache found for owner {} with nonce range {}-{}, nonce = {}", tx.get_owner(), cache.get_min(), cache.get_max(), nonce);

// Support the case where the nonce is already used in cache
// If a user want to cancel its TX, he can just resend a TX with same nonce and higher fee
// NOTE: This is not possible anymore, disabled in blockchain function
if nonce >= cache.get_min() && nonce <= cache.get_max() {
trace!("nonce {} is in range {}-{}", nonce, cache.get_min(), cache.get_max());
// because it's based on order and we may have the same order
Expand All @@ -84,6 +95,8 @@ impl Mempool {
if must_update {
cache.update(nonce, hash.clone());
}
// Update re-computed balances
cache.set_balances(balances);
} else {
let mut txs = IndexSet::new();
txs.insert(hash.clone());
Expand All @@ -92,7 +105,8 @@ impl Mempool {
let cache = AccountCache {
max: nonce,
min: nonce,
txs
txs,
balances
};
self.caches.insert(tx.get_owner().clone(), cache);
}
Expand Down Expand Up @@ -238,7 +252,7 @@ impl Mempool {
// Because of DAG reorg, we can't only check updated keys from new block,
// as a block could be orphaned and the nonce order would change
// So we need to check all keys from mempool and compare it from storage
pub async fn clean_up<S: Storage>(&mut self, storage: &S) {
pub async fn clean_up<S: Storage>(&mut self, storage: &S, blockchain: &Blockchain<S>, topoheight: u64) {
trace!("Cleaning up mempool...");

let mut cache = HashMap::new();
Expand Down Expand Up @@ -326,6 +340,40 @@ impl Mempool {

// delete the nonce cache if no txs are left
delete_cache = cache.txs.is_empty();
// Cache is not empty yet, but we deleted some TXs from it, balances may be out-dated, verify TXs left
// TODO: there may be a way to optimize this even more, by checking if deleted TXs are those who got mined
// Which mean, expected balances are still up to date with chain state
if !delete_cache && !hashes.is_empty() {
let mut balances = HashMap::new();
let mut invalid_txs = Vec::new();
for tx_hash in &cache.txs {
if let Some(sorted_tx) = self.txs.get(tx_hash) {
// Verify if the TX is still valid
// If not, delete it
let tx = sorted_tx.get_tx();

// Skip nonces verification as we already did it
if let Err(e) = blockchain.verify_transaction_with_hash(storage, tx, &tx_hash, &mut balances, None, true, topoheight).await {
warn!("TX {} is not valid anymore, deleting it: {}", tx_hash, e);
invalid_txs.push(tx_hash.clone());
}
} else {
// Shouldn't happen
warn!("TX {} not found in mempool while verifying", tx_hash);
}
}

// Update balances cache
if let Some(balances) = balances.remove(&key) {
cache.set_balances(balances.into_iter().map(|(k, v)| (k.clone(), v)).collect());
}

// Delete all invalid txs from cache
for hash in invalid_txs {
cache.txs.remove(&hash);
hashes.push(hash);
}
}

// now delete all necessary txs
for hash in hashes {
Expand Down Expand Up @@ -381,6 +429,16 @@ impl AccountCache {
&self.txs
}

// Update balances cache
fn set_balances(&mut self, balances: HashMap<Hash, VersionedBalance>) {
self.balances = balances;
}

// Returns the expected balances cache after the execution of all TXs
pub fn get_balances(&self) -> &HashMap<Hash, VersionedBalance> {
&self.balances
}

// Update the cache with a new TX
fn update(&mut self, nonce: u64, hash: Arc<Hash>) {
self.update_nonce_range(nonce);
Expand Down

0 comments on commit 0b3146c

Please sign in to comment.