Skip to content

Commit

Permalink
Merge pull request #86 from AIBlockOfficial/perf_improvements
Browse files Browse the repository at this point in the history
Semaphore rate limiting for API; Node worker threading
  • Loading branch information
BHouwens authored Aug 21, 2024
2 parents 6dfa383 + 161d583 commit f23defe
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/api/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,8 @@ impl std::fmt::Display for ApiErrorType {
}
}
}

#[derive(Debug)]
pub struct OverloadedError;

impl warp::reject::Reject for OverloadedError {}
48 changes: 40 additions & 8 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use crate::api::handlers::{self, DbgPaths};
use crate::api::utils::{
auth_request, create_new_cache, handle_rejection, map_api_res_and_cache, warp_path,
with_node_component, ReplyCache, CACHE_LIVE_TIME,
with_node_component, with_semaphore, ReplyCache, CACHE_LIVE_TIME,
};
use crate::comms_handler::Node;
use crate::constants::API_CONCURRENCY_LIMIT;
use crate::db_utils::SimpleDb;
use crate::interfaces::{MempoolApi, UserApi};
use crate::miner::CurrentBlockWithMutex;
use crate::threaded_call::ThreadedCallSender;
use crate::utils::{ApiKeys, RoutesPoWInfo};
use crate::wallet::WalletDb;
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;

use warp::{Filter, Rejection, Reply};

Expand Down Expand Up @@ -148,19 +150,21 @@ pub fn debug_data(
aux_node: Option<Node>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "debug_data";
warp_path(&mut dp, route)
.and(warp::get())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow.clone(), api_keys))
.and(with_node_component(dp))
.and(with_node_component(node))
.and(with_node_component(aux_node))
.and(with_node_component(routes_pow))
.and(with_node_component(cache))
.and_then(
move |call_id: String, dp, node, aux, routes_pow: RoutesPoWInfo, cache| {
move |_, call_id: String, dp, node, aux, routes_pow: RoutesPoWInfo, cache| {
let routes = routes_pow.lock().unwrap().clone();
map_api_res_and_cache(
call_id.clone(),
Expand Down Expand Up @@ -201,14 +205,16 @@ pub fn total_supply(
dp: &mut DbgPaths,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "total_supply";
warp_path(dp, route)
.and(warp::get())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(cache))
.and_then(move |call_id: String, cache| {
.and_then(move |_, call_id: String, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand All @@ -224,15 +230,17 @@ pub fn issued_supply(
threaded_calls: ThreadedCallSender<dyn MempoolApi>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "issued_supply";
warp_path(dp, route)
.and(warp::get())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(threaded_calls))
.and(with_node_component(cache))
.and_then(move |call_id: String, tc, cache| {
.and_then(move |_, call_id: String, tc, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand Down Expand Up @@ -529,16 +537,18 @@ pub fn transaction_status(
threaded_calls: ThreadedCallSender<dyn MempoolApi>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "transaction_status";
warp_path(dp, route)
.and(warp::post())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(threaded_calls))
.and(warp::body::json())
.and(with_node_component(cache))
.and_then(move |call_id: String, tc, info, cache| {
.and_then(move |_, call_id: String, tc, info, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand Down Expand Up @@ -581,16 +591,18 @@ pub fn fetch_balance(
threaded_calls: ThreadedCallSender<dyn MempoolApi>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "fetch_balance";
warp_path(dp, route)
.and(warp::post())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(threaded_calls))
.and(warp::body::json())
.and(with_node_component(cache))
.and_then(move |call_id: String, tc, info, cache| {
.and_then(move |_, call_id: String, tc, info, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand Down Expand Up @@ -631,16 +643,18 @@ pub fn create_item_asset(
threaded_calls: ThreadedCallSender<dyn MempoolApi>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let route = "create_item_asset";
warp_path(dp, route)
.and(warp::post())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(threaded_calls))
.and(warp::body::json())
.and(with_node_component(cache))
.and_then(move |call_id: String, tc, info, cache| {
.and_then(move |_, call_id: String, tc, info, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand Down Expand Up @@ -706,16 +720,18 @@ pub fn create_transactions(
threaded_calls: ThreadedCallSender<dyn MempoolApi>,
routes_pow: RoutesPoWInfo,
api_keys: ApiKeys,
semaphore: Arc<Semaphore>,
cache: ReplyCache,
) -> impl Filter<Extract = (impl Reply,), Error = warp::Rejection> + Clone {
let route = "create_transactions";
warp_path(dp, route)
.and(warp::post())
.and(with_semaphore(semaphore))
.and(auth_request(routes_pow, api_keys))
.and(with_node_component(threaded_calls))
.and(warp::body::json())
.and(with_node_component(cache))
.and_then(move |call_id: String, tc, info, cache| {
.and_then(move |_, call_id: String, tc, info, cache| {
map_api_res_and_cache(
call_id.clone(),
cache,
Expand Down Expand Up @@ -907,6 +923,7 @@ pub fn user_node_routes(
let mut dp_vec = DbgPaths::new();
let dp = &mut dp_vec;
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(Semaphore::new(API_CONCURRENCY_LIMIT));

let routes = wallet_info(
dp,
Expand Down Expand Up @@ -1014,6 +1031,7 @@ pub fn user_node_routes(
None,
routes_pow_info,
api_keys,
semaphore,
cache,
));

Expand All @@ -1030,6 +1048,7 @@ pub fn storage_node_routes(
let mut dp_vec = DbgPaths::new();
let dp = &mut dp_vec;
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(Semaphore::new(API_CONCURRENCY_LIMIT));

let routes = block_by_num(
dp,
Expand Down Expand Up @@ -1078,6 +1097,7 @@ pub fn storage_node_routes(
None,
routes_pow_info,
api_keys,
semaphore,
cache,
));

Expand All @@ -1096,46 +1116,53 @@ pub fn mempool_node_routes(
let mut dp_vec = DbgPaths::new();
let dp = &mut dp_vec;
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(Semaphore::new(API_CONCURRENCY_LIMIT));

let routes = fetch_balance(
dp,
threaded_calls.clone(),
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
)
.or(create_item_asset(
dp,
threaded_calls.clone(),
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
))
.or(create_transactions(
dp,
threaded_calls.clone(),
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
))
.or(total_supply(
dp,
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
))
.or(issued_supply(
dp,
threaded_calls.clone(),
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
))
.or(transaction_status(
dp,
threaded_calls.clone(),
routes_pow_info.clone(),
api_keys.clone(),
semaphore.clone(),
cache.clone(),
))
// .or(utxo_addresses(
Expand Down Expand Up @@ -1185,6 +1212,7 @@ pub fn mempool_node_routes(
None,
routes_pow_info,
api_keys,
semaphore,
cache,
));

Expand All @@ -1202,6 +1230,7 @@ pub fn miner_node_routes(
let mut dp_vec = DbgPaths::new();
let dp = &mut dp_vec;
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(Semaphore::new(API_CONCURRENCY_LIMIT));

let routes = wallet_info(
dp,
Expand Down Expand Up @@ -1258,6 +1287,7 @@ pub fn miner_node_routes(
None,
routes_pow_info,
api_keys,
semaphore,
cache,
));

Expand All @@ -1277,6 +1307,7 @@ pub fn miner_node_with_user_routes(
let mut dp_vec = DbgPaths::new();
let dp = &mut dp_vec;
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(Semaphore::new(API_CONCURRENCY_LIMIT));

let routes = wallet_info(
dp,
Expand Down Expand Up @@ -1379,6 +1410,7 @@ pub fn miner_node_with_user_routes(
Some(user_node),
routes_pow_info,
api_keys,
semaphore,
cache,
));

Expand Down
8 changes: 8 additions & 0 deletions src/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1686,12 +1686,14 @@ async fn test_post_fetch_balance() {
//
let ks = to_api_keys(Default::default());
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));

let filter = routes::fetch_balance(
&mut dp(),
mempool.threaded_calls.tx.clone(),
Default::default(),
ks,
semaphore,
cache,
)
.recover(handle_rejection);
Expand Down Expand Up @@ -1877,11 +1879,13 @@ async fn test_post_create_transactions_common(address_version: Option<u64>) {
let ks = to_api_keys(Default::default());
let cache = create_new_cache(CACHE_LIVE_TIME);

let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
let filter = routes::create_transactions(
&mut dp(),
mempool.threaded_calls.tx.clone(),
Default::default(),
ks,
semaphore,
cache,
)
.recover(handle_rejection);
Expand Down Expand Up @@ -2131,12 +2135,14 @@ async fn test_post_create_item_asset_tx_mempool() {
//
let ks = to_api_keys(Default::default());
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));

let filter = routes::create_item_asset(
&mut dp(),
mempool.threaded_calls.tx.clone(),
Default::default(),
ks,
semaphore,
cache,
)
.recover(handle_rejection);
Expand Down Expand Up @@ -2228,11 +2234,13 @@ async fn test_post_create_item_asset_tx_mempool_failure() {
//
let ks = to_api_keys(Default::default());
let cache = create_new_cache(CACHE_LIVE_TIME);
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
let filter = routes::create_item_asset(
&mut dp(),
mempool.threaded_calls.tx.clone(),
Default::default(),
ks,
semaphore,
cache,
)
.recover(handle_rejection);
Expand Down
23 changes: 22 additions & 1 deletion src/api/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use super::{
errors::{ApiError, ApiErrorType},
errors::{ApiError, ApiErrorType, OverloadedError},
handlers::DbgPaths,
responses::{common_error_reply, json_serialize_embed, CallResponse, JsonReply},
};
use crate::utils::{ApiKeys, RoutesPoWInfo, StringError};
use futures::Future;
use moka::future::{Cache, CacheBuilder};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tracing::{log::error, warn};
use warp::{
hyper::{HeaderMap, StatusCode},
Expand Down Expand Up @@ -211,3 +213,22 @@ pub async fn get_or_insert_cache_value(

insert_cache_value(&call_id, r.await, &cache).await
}

/// Filter to limit the number of concurrent requests.
///
/// # Arguments
///
/// * `semaphore` - A reference to a semaphore that limits the number of concurrent requests.
pub fn with_semaphore(
semaphore: Arc<Semaphore>,
) -> impl Filter<Extract = ((),), Error = warp::Rejection> + Clone {
warp::any().and_then(move || {
let semaphore = semaphore.clone();
async move {
let _permit = semaphore
.try_acquire()
.map_err(|_| warp::reject::custom(OverloadedError))?;
Ok::<(), warp::Rejection>(())
}
})
}
Loading

0 comments on commit f23defe

Please sign in to comment.