Skip to content

Commit

Permalink
feat!: introduce usd liability allocation (#487)
Browse files Browse the repository at this point in the history
* wip

* refactor: better templates for allocation

* fix: run tests serially

* chore :add spawn_global_liability_listener

* refactor: move allocation logic to hedging app

* test: add exchange_allocation test

* chore: bump flake

* feat: add bitfinex allocation in ledger

* test: conditionally execute galoy-client test

* fix: clippy

* test: remove warnings in test

* refactor: adjust_exchange_allocation template

* fix: correct usd -> cents conversion when allocating

* chore: okex allocation initialisation on start

* refactor: push ledger init hack into fn

* test: add file_serial to ledger related tests

* refactor: remove redundant if in hack init

* refactor: extract adjust_exchange_allocation in hedging app

* chore: delete decrease_exchange_allocation template

* fix: missing await

---------

Co-authored-by: bodymindarts <justin@galoy.io>
  • Loading branch information
thevaibhav-dixit and bodymindarts authored Jan 4, 2024
1 parent 7af398f commit b5b9dff
Show file tree
Hide file tree
Showing 30 changed files with 593 additions and 134 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion bitfinex-price/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use shared::{payload::*, pubsub::*};
use tokio::join;

pub use config::*;
pub use error::*;
pub use price_feed::*;

pub async fn run(
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ hedging = { path = "../hedging" }
okex-price = { path = "../okex-price" }
bitfinex-price = { path = "../bitfinex-price" }
bria-client = { path = "../bria-client" }
ledger = { path = "../ledger", package = "stablesats-ledger" }

anyhow = { workspace = true }
clap = { workspace = true }
Expand Down
62 changes: 45 additions & 17 deletions cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ async fn run_cmd(
}));
}

let mut pool = None;
let pool = crate::db::init_pool(&db).await?;
let ledger = hack_init_ledger(&pool).await?;

if hedging.enabled {
println!("Starting hedging process");
Expand All @@ -251,13 +252,22 @@ async fn run_cmd(

if let Some(okex_cfg) = exchanges.okex.as_ref() {
let okex_config = okex_cfg.config.clone();
pool = Some(crate::db::init_pool(&db).await?);
let pool = pool.as_ref().unwrap().clone();
let pool = pool.clone();
let ledger = ledger.clone();
handles.push(tokio::spawn(async move {
let _ = hedging_send.try_send(
hedging::run(pool, recv, hedging.config, okex_config, galoy, bria, price)
.await
.context("Hedging error"),
hedging::run(
pool,
recv,
hedging.config,
okex_config,
galoy,
bria,
price,
ledger,
)
.await
.context("Hedging error"),
);
}));
}
Expand All @@ -267,15 +277,12 @@ async fn run_cmd(
println!("Starting quotes_server");

let quotes_send = send.clone();
let pool = if let Some(pool) = pool.as_ref() {
pool.clone()
} else {
crate::db::init_pool(&db).await?
};
let (snd, recv) = futures::channel::mpsc::unbounded();
checkers.insert("quotes", snd);
let price = price_recv.resubscribe();
let weights = extract_weights_for_quotes_server(&exchanges);
let ledger = ledger.clone();
let pool = pool.clone();
handles.push(tokio::spawn(async move {
let _ = quotes_send.try_send(
quotes_server::run(
Expand All @@ -288,6 +295,7 @@ async fn run_cmd(
quotes_server.price_cache,
weights,
quotes_server.config,
ledger,
)
.await
.context("Quote Server error"),
Expand All @@ -299,14 +307,9 @@ async fn run_cmd(
println!("Starting user trades process");

let user_trades_send = send.clone();
let pool = if let Some(pool) = pool {
pool
} else {
crate::db::init_pool(&db).await?
};
handles.push(tokio::spawn(async move {
let _ = user_trades_send.try_send(
user_trades::run(pool, user_trades.config, galoy)
user_trades::run(pool, user_trades.config, galoy, ledger)
.await
.context("User Trades error"),
);
Expand Down Expand Up @@ -360,3 +363,28 @@ fn extract_weights_for_quotes_server(
bitfinex: None,
}
}

/// Needs to execute one time on upgrade to allocation based accounting
async fn hack_init_ledger(pool: &sqlx::PgPool) -> anyhow::Result<ledger::Ledger> {
let ledger = ledger::Ledger::init(&pool).await?;
let liability_balances = ledger.balances().usd_liability_balances().await?;

if liability_balances.unallocated_usd > Decimal::ZERO
&& liability_balances.okex_allocation == Decimal::ZERO
{
let tx = pool.begin().await?;
let unallocated_usd = liability_balances.unallocated_usd;
let adjustment_params = ledger::AdjustExchangeAllocationParams {
okex_allocation_adjustment_usd_cents_amount: unallocated_usd
* ledger::constants::CENTS_PER_USD,
bitfinex_allocation_adjustment_usd_cents_amount: Decimal::ZERO,
meta: ledger::AdjustExchangeAllocationMeta {
timestamp: chrono::Utc::now(),
},
};
ledger
.adjust_exchange_allocation(tx, adjustment_params)
.await?;
}
Ok(ledger)
}
15 changes: 1 addition & 14 deletions cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Default for PriceServerWrapper {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct QuotesServerWrapper {
#[serde(default)]
pub enabled: bool,
Expand All @@ -123,19 +123,6 @@ pub struct QuotesServerWrapper {
pub config: QuotesConfig,
}

impl Default for QuotesServerWrapper {
fn default() -> Self {
Self {
enabled: false,
server: QuotesServerConfig::default(),
health: QuotesServerHealthCheckConfig::default(),
fees: QuotesFeeCalculatorConfig::default(),
price_cache: QuotesExchangePriceCacheConfig::default(),
config: QuotesConfig::default(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BitfinexPriceFeedConfigWrapper {
#[serde(default)]
Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions galoy-client/tests/galoy_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@ use std::env;

use galoy_client::*;

fn client_configuration() -> GaloyClientConfig {
let api = env::var("GALOY_GRAPHQL_URI").expect("GALOY_GRAPHQL_URI not set");
let phone_number = env::var("GALOY_PHONE_NUMBER").expect("GALOY_PHONE_NUMBER not set");
let code = env::var("GALOY_PHONE_CODE").expect("GALOY_PHONE_CODE not set");
async fn configured_client() -> anyhow::Result<GaloyClient> {
let api = env::var("GALOY_GRAPHQL_URI")?;
let phone_number = env::var("GALOY_PHONE_NUMBER")?;
let code = env::var("GALOY_PHONE_CODE")?;

let config = GaloyClientConfig {
let client = GaloyClient::connect(GaloyClientConfig {
api,
phone_number,
auth_code: code,
};
})
.await?;

config
Ok(client)
}

/// Test to get transactions list of the default wallet
#[tokio::test]
async fn transactions_list() -> anyhow::Result<()> {
let config = client_configuration();
let client = GaloyClient::connect(config).await?;

let transactions = client.transactions_list(None).await?;
assert!(transactions.list.len() > 0);

if let Ok(client) = configured_client().await {
let transactions = client.transactions_list(None).await?;
assert!(transactions.list.len() > 0);
}
Ok(())
}
78 changes: 75 additions & 3 deletions hedging/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bria_client::{BriaClient, BriaClientConfig};
use futures::stream::StreamExt;
use rust_decimal::Decimal;
use sqlxmq::JobRunnerHandle;
use tracing::instrument;

Expand All @@ -25,13 +26,12 @@ impl HedgingApp {
galoy_client_cfg: GaloyClientConfig,
bria_client_cfg: BriaClientConfig,
price_receiver: memory::Subscriber<PriceStreamPayload>,
ledger: ledger::Ledger,
) -> Result<Self, HedgingError> {
let (mut jobs, mut channels) = (Vec::new(), Vec::new());
OkexEngine::register_jobs(&mut jobs, &mut channels);

let mut job_registry = sqlxmq::JobRegistry::new(&jobs);

let ledger = ledger::Ledger::init(&pool).await?;
job_registry.set_context(ledger.clone());
job_registry.set_context(
shared::tracing::record_error(tracing::Level::ERROR, || async move {
Expand All @@ -49,7 +49,7 @@ impl HedgingApp {
let okex_engine = OkexEngine::run(
pool.clone(),
okex_config,
ledger,
ledger.clone(),
price_receiver.resubscribe(),
)
.await?;
Expand All @@ -62,6 +62,7 @@ impl HedgingApp {
.run()
.await?;

let _ = Self::spawn_global_liability_listener(pool.clone(), ledger).await;
Self::spawn_health_checker(health_check_trigger, health_cfg, price_receiver).await;
let app = HedgingApp {
_job_runner_handle: job_runner_handle,
Expand All @@ -88,4 +89,75 @@ impl HedgingApp {
}
}
}

async fn spawn_global_liability_listener(
pool: sqlx::PgPool,
ledger: ledger::Ledger,
) -> Result<(), HedgingError> {
let mut events = ledger.usd_omnibus_balance_events().await?;
tokio::spawn(async move {
loop {
match events.recv().await {
Ok(received) => {
if let ledger::LedgerEventData::BalanceUpdated(_data) = received.data {
let _ = adjust_exchange_allocation(&pool, &ledger).await;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => (),
_ => {
break;
}
}
}
});
Ok(())
}
}

#[instrument(
name = "hedging.adjust_exchange_allocation",
skip_all,
fields(execute_adjustment, unallocated, okex, bitfinex, omnibus),
err
)]
async fn adjust_exchange_allocation(
pool: &sqlx::PgPool,
ledger: &ledger::Ledger,
) -> Result<(), ledger::LedgerError> {
let liability_balances = ledger.balances().usd_liability_balances().await?;
let span = tracing::Span::current();
span.record(
"unallocated_usd",
&tracing::field::display(liability_balances.unallocated_usd),
);
span.record(
"okex",
&tracing::field::display(liability_balances.okex_allocation),
);
span.record(
"bitfinex",
&tracing::field::display(liability_balances.bitfinex_allocation),
);
span.record(
"omnibus",
&tracing::field::display(liability_balances.total_liability),
);
span.record("execute_adjustment", false);
let tx = pool.begin().await?;
let unallocated_usd = liability_balances.unallocated_usd;
if unallocated_usd != Decimal::ZERO {
span.record("execute_adjustment", true);
let adjustment_params = ledger::AdjustExchangeAllocationParams {
okex_allocation_adjustment_usd_cents_amount: unallocated_usd
* ledger::constants::CENTS_PER_USD,
bitfinex_allocation_adjustment_usd_cents_amount: Decimal::ZERO,
meta: ledger::AdjustExchangeAllocationMeta {
timestamp: chrono::Utc::now(),
},
};
ledger
.adjust_exchange_allocation(tx, adjustment_params)
.await?;
}
Ok(())
}
2 changes: 2 additions & 0 deletions hedging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn run(
galoy_config: GaloyClientConfig,
bria_config: BriaClientConfig,
tick_receiver: memory::Subscriber<PriceStreamPayload>,
ledger: ledger::Ledger,
) -> Result<(), HedgingError> {
HedgingApp::run(
pool,
Expand All @@ -34,6 +35,7 @@ pub async fn run(
galoy_config,
bria_config,
tick_receiver,
ledger,
)
.await?;
Ok(())
Expand Down
Loading

0 comments on commit b5b9dff

Please sign in to comment.