Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move settlement queue to the driver #3129

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 5 additions & 47 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
database::order_events::OrderEventLabel,
ethcontract::U256,
ethrpc::block_stream::BlockInfo,
futures::{future::BoxFuture, FutureExt, TryFutureExt},
futures::{FutureExt, TryFutureExt},
itertools::Itertools,
model::solver_competition::{
CompetitionAuction,
Expand All @@ -38,7 +38,7 @@ use {
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::{mpsc, Mutex},
tokio::sync::Mutex,
tracing::Instrument,
};

Expand Down Expand Up @@ -66,9 +66,6 @@ pub struct RunLoop {
/// Maintenance tasks that should run before every runloop to have
/// the most recent data available.
maintenance: Arc<Maintenance>,
/// Queues by solver for executing settle futures one by one guaranteeing
/// FIFO execution order.
settlement_queues: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<BoxFuture<'static, ()>>>>>,
}

impl RunLoop {
Expand All @@ -93,7 +90,6 @@ impl RunLoop {
in_flight_orders: Default::default(),
liveness,
maintenance,
settlement_queues: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -239,7 +235,7 @@ impl RunLoop {
}

let competition_simulation_block = self.eth.current_block().borrow().number;
let block_deadline = auction.block + self.config.submission_deadline;
let block_deadline = competition_simulation_block + self.config.submission_deadline;

// Post-processing should not be executed asynchronously since it includes steps
// of storing all the competition/auction-related data to the DB.
Expand Down Expand Up @@ -346,45 +342,7 @@ impl RunLoop {
}
.instrument(tracing::Span::current());

let sender = self.get_settlement_queue_sender(&driver.name).await;
if let Err(err) = sender.send(Box::pin(settle_fut)) {
tracing::warn!(driver = %driver.name, ?err, "failed to send settle future to queue");
}
}

/// Retrieves or creates the settlement queue sender for a given driver.
///
/// This function ensures that there is a settlement execution queue
/// associated with the specified `driver_name`. If a queue already
/// exists, it returns the existing sender. If not, it creates a new
/// queue, starts a background task to process settlement futures,
/// guaranteeing FIFO execution order for settlements per driver, and
/// returns the new sender.
async fn get_settlement_queue_sender(
self: &Arc<Self>,
driver_name: &str,
) -> mpsc::UnboundedSender<BoxFuture<'static, ()>> {
let mut settlement_queues = self.settlement_queues.lock().await;
match settlement_queues.get(driver_name) {
Some(sender) => sender.clone(),
None => {
let (tx, mut rx) = mpsc::unbounded_channel::<BoxFuture<'static, ()>>();
let driver_name = driver_name.to_string();
let self_ = self.clone();

settlement_queues.insert(driver_name.clone(), tx.clone());

tokio::spawn(async move {
while let Some(fut) = rx.recv().await {
fut.await;
}

tracing::info!(driver = %driver_name, "settlement execution queue stopped");
self_.settlement_queues.lock().await.remove(&driver_name);
});
tx
}
}
tokio::spawn(settle_fut);
}

async fn post_processing(
Expand Down Expand Up @@ -801,7 +759,7 @@ impl RunLoop {
let current_block = self.eth.current_block().borrow().number;
anyhow::ensure!(
current_block < submission_deadline_latest_block,
"submission deadline was missed while waiting for the settlement queue"
"submission deadline was missed"
);

let request = settle::Request {
Expand Down
112 changes: 110 additions & 2 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
Mempools,
},
crate::{
domain::{competition::solution::Settlement, eth},
domain::{competition::solution::Settlement, eth, time::DeadlineExceeded},
infra::{
self,
blockchain::Ethereum,
Expand All @@ -22,9 +22,11 @@ use {
std::{
cmp::Reverse,
collections::{HashMap, HashSet, VecDeque},
sync::Mutex,
sync::{Arc, Mutex},
},
tap::TapFallible,
tokio::sync::{mpsc, oneshot},
tracing::Instrument,
};

pub mod auction;
Expand Down Expand Up @@ -52,11 +54,44 @@ pub struct Competition {
pub mempools: Mempools,
/// Cached solutions with the most recent solutions at the front.
pub settlements: Mutex<VecDeque<Settlement>>,
settle_queue: mpsc::Sender<SettleRequest>,
}

impl Competition {
pub fn new(
solver: Solver,
eth: Ethereum,
liquidity: infra::liquidity::Fetcher,
simulator: Simulator,
mempools: Mempools,
) -> Arc<Self> {
let (settle_tx, settle_rx) = mpsc::channel(solver.settle_queue_size());

let competition = Arc::new(Self {
solver,
eth,
liquidity,
simulator,
mempools,
settlements: Default::default(),
settle_queue: settle_tx,
});

let competition_clone = Arc::clone(&competition);
tokio::spawn(async move {
competition_clone.process_settle_requests(settle_rx).await;
});

competition
}

/// Solve an auction as part of this competition.
pub async fn solve(&self, auction: &Auction) -> Result<Option<Solved>, Error> {
if self.settle_queue.capacity() == 0 {
tracing::warn!("settlement queue is full; auction is rejected");
return Err(Error::SettlementQueueIsFull);
}

let liquidity = match self.solver.liquidity() {
solver::Liquidity::Fetch => {
self.liquidity
Expand Down Expand Up @@ -302,6 +337,70 @@ impl Competition {
auction_id: Option<i64>,
solution_id: u64,
submission_deadline: u64,
) -> Result<Settled, Error> {
let (response_tx, response_rx) = oneshot::channel();

let request = SettleRequest {
auction_id,
solution_id,
submission_deadline,
response_sender: response_tx,
};

self.settle_queue.try_send(request).map_err(|err| {
tracing::error!(?err, "Failed to enqueue /settle request");
Error::SubmissionError
})?;

response_rx.await.map_err(|err| {
tracing::error!(?err, "Failed to dequeue /settle response");
Error::SubmissionError
})?
}

/// Worker loop of the settlement queue.
///
/// Receives `/settle` requests from `settle_rx` and handles them strictly
/// one after another, in the order they were received: the next request is
/// only read once the previous one has been fully processed. The loop ends
/// when the channel reports that no more requests can arrive.
///
/// For every request the outcome is sent back through the request's
/// `response_sender`. A request whose `submission_deadline` block has
/// already been reached is answered with a `DeadlineExceeded` error without
/// attempting to settle.
async fn process_settle_requests(
    self: Arc<Self>,
    mut settle_rx: mpsc::Receiver<SettleRequest>,
) {
    while let Some(SettleRequest {
        auction_id,
        solution_id,
        submission_deadline,
        response_sender,
    }) = settle_rx.recv().await
    {
        // `solver` and `auction_id` double as the field names of the span below.
        let solver = self.solver.name().as_str();
        let handle_request = async {
            let deadline_reached =
                self.eth.current_block().borrow().number >= submission_deadline;

            if !deadline_reached {
                observe::settling();
                let result = self
                    .process_settle_request(auction_id, solution_id, submission_deadline)
                    .await;
                observe::settled(self.solver.name(), &result);

                // Sending only fails when the requester stopped waiting.
                if let Err(err) = response_sender.send(result) {
                    tracing::error!(?err, "Failed to send /settle response");
                }
            } else if let Err(err) = response_sender.send(Err(DeadlineExceeded.into())) {
                tracing::warn!(
                    ?err,
                    "settle deadline exceeded. unable to return a response"
                );
            }
        };

        handle_request
            .instrument(tracing::info_span!("/settle", solver, auction_id))
            .await;
    }
}

async fn process_settle_request(
&self,
auction_id: Option<i64>,
solution_id: u64,
submission_deadline: u64,
) -> Result<Settled, Error> {
let settlement = {
let mut lock = self.settlements.lock().unwrap();
Expand Down Expand Up @@ -413,6 +512,13 @@ fn merge(solutions: impl Iterator<Item = Solution>, auction: &Auction) -> Vec<So
merged
}

struct SettleRequest {
auction_id: Option<i64>,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
solution_id: u64,
submission_deadline: u64,
response_sender: oneshot::Sender<Result<Settled, Error>>,
}

/// Solution information sent to the protocol by the driver before the solution
/// ranking happens.
#[derive(Debug)]
Expand Down Expand Up @@ -480,4 +586,6 @@ pub enum Error {
Solver(#[from] solver::Error),
#[error("failed to submit the solution")]
SubmissionError,
#[error("too many pending settlements for the same solver")]
SettlementQueueIsFull,
}
1 change: 1 addition & 0 deletions crates/driver/src/infra/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl From<competition::Error> for (hyper::StatusCode, axum::Json<Error>) {
competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded,
competition::Error::Solver(_) => Kind::SolverFailed,
competition::Error::SubmissionError => Kind::FailedToSubmit,
competition::Error::SettlementQueueIsFull => Kind::SolverFailed,
};
error.into()
}
Expand Down
15 changes: 7 additions & 8 deletions crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ impl Api {
let router = router.with_state(State(Arc::new(Inner {
eth: self.eth.clone(),
solver: solver.clone(),
competition: domain::Competition {
competition: domain::Competition::new(
solver,
eth: self.eth.clone(),
liquidity: self.liquidity.clone(),
simulator: self.simulator.clone(),
mempools: self.mempools.clone(),
settlements: Default::default(),
},
self.eth.clone(),
self.liquidity.clone(),
self.simulator.clone(),
self.mempools.clone(),
),
liquidity: self.liquidity.clone(),
tokens: tokens.clone(),
pre_processor: pre_processor.clone(),
Expand Down Expand Up @@ -140,7 +139,7 @@ impl State {
struct Inner {
eth: Ethereum,
solver: Solver,
competition: domain::Competition,
competition: Arc<domain::Competition>,
liquidity: liquidity::Fetcher,
tokens: tokens::Fetcher,
pre_processor: domain::competition::AuctionProcessor,
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn load(chain: chain::Id, path: &Path) -> infra::Config {
solver_native_token: config.manage_native_token.to_domain(),
quote_tx_origin: config.quote_tx_origin.map(eth::Address),
response_size_limit_max_bytes: config.response_size_limit_max_bytes,
settle_queue_size: config.settle_queue_size,
}
}))
.await,
Expand Down
9 changes: 9 additions & 0 deletions crates/driver/src/infra/config/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ struct SolverConfig {
/// Maximum HTTP response size the driver will accept in bytes.
#[serde(default = "default_response_size_limit_max_bytes")]
response_size_limit_max_bytes: usize,

/// The maximum number of `/settle` requests that can be queued up
/// before the driver starts dropping new `/solve` requests.
#[serde(default = "default_settle_queue_size")]
settle_queue_size: usize,
}

#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -654,3 +659,7 @@ fn default_order_priority_strategies() -> Vec<OrderPriorityStrategy> {
fn default_max_order_age() -> Option<Duration> {
Some(Duration::from_secs(300))
}

/// Serde default for `settle_queue_size`: the number of `/settle` requests
/// that may be queued when the configuration file does not set a value.
fn default_settle_queue_size() -> usize {
    const DEFAULT_SETTLE_QUEUE_SIZE: usize = 3;
    DEFAULT_SETTLE_QUEUE_SIZE
}
1 change: 1 addition & 0 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ fn competition_error(err: &competition::Error) -> &'static str {
competition::Error::Solver(solver::Error::Deserialize(_)) => "SolverDeserializeError",
competition::Error::Solver(solver::Error::Dto(_)) => "SolverDtoError",
competition::Error::SubmissionError => "SubmissionError",
competition::Error::SettlementQueueIsFull => "SettlementQueueIsFull",
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ pub struct Config {
/// Which `tx.origin` is required to make quote verification pass.
pub quote_tx_origin: Option<eth::Address>,
pub response_size_limit_max_bytes: usize,
/// Max size of the pending settlements queue.
pub settle_queue_size: usize,
}

impl Solver {
Expand Down Expand Up @@ -201,6 +203,10 @@ impl Solver {
&self.config.quote_tx_origin
}

/// Maximum size of the pending settlements queue, as set in this solver's
/// configuration.
pub fn settle_queue_size(&self) -> usize {
self.config.settle_queue_size
}

/// Make a POST request instructing the solver to solve an auction.
/// Allocates at most `timeout` time for the solving.
pub async fn solve(
Expand Down
Loading