Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move settlement queue to the driver #3129

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 5 additions & 47 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
database::order_events::OrderEventLabel,
ethcontract::U256,
ethrpc::block_stream::BlockInfo,
futures::{future::BoxFuture, FutureExt, TryFutureExt},
futures::{FutureExt, TryFutureExt},
itertools::Itertools,
model::solver_competition::{
CompetitionAuction,
Expand All @@ -38,7 +38,7 @@ use {
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::{mpsc, Mutex},
tokio::sync::Mutex,
tracing::Instrument,
};

Expand Down Expand Up @@ -66,9 +66,6 @@ pub struct RunLoop {
/// Maintenance tasks that should run before every runloop to have
/// the most recent data available.
maintenance: Arc<Maintenance>,
/// Queues by solver for executing settle futures one by one guaranteeing
/// FIFO execution order.
settlement_queues: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<BoxFuture<'static, ()>>>>>,
}

impl RunLoop {
Expand All @@ -93,7 +90,6 @@ impl RunLoop {
in_flight_orders: Default::default(),
liveness,
maintenance,
settlement_queues: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -239,7 +235,7 @@ impl RunLoop {
}

let competition_simulation_block = self.eth.current_block().borrow().number;
let block_deadline = auction.block + self.config.submission_deadline;
let block_deadline = competition_simulation_block + self.config.submission_deadline;

// Post-processing should not be executed asynchronously since it includes steps
// of storing all the competition/auction-related data to the DB.
Expand Down Expand Up @@ -346,45 +342,7 @@ impl RunLoop {
}
.instrument(tracing::Span::current());

let sender = self.get_settlement_queue_sender(&driver.name).await;
if let Err(err) = sender.send(Box::pin(settle_fut)) {
tracing::warn!(driver = %driver.name, ?err, "failed to send settle future to queue");
}
}

/// Retrieves or creates the settlement queue sender for a given driver.
///
/// This function ensures that there is a settlement execution queue
/// associated with the specified `driver_name`. If a queue already
/// exists, it returns the existing sender. If not, it creates a new
/// queue, starts a background task to process settlement futures,
/// guaranteeing FIFO execution order for settlements per driver, and
/// returns the new sender.
async fn get_settlement_queue_sender(
self: &Arc<Self>,
driver_name: &str,
) -> mpsc::UnboundedSender<BoxFuture<'static, ()>> {
let mut settlement_queues = self.settlement_queues.lock().await;
match settlement_queues.get(driver_name) {
Some(sender) => sender.clone(),
None => {
let (tx, mut rx) = mpsc::unbounded_channel::<BoxFuture<'static, ()>>();
let driver_name = driver_name.to_string();
let self_ = self.clone();

settlement_queues.insert(driver_name.clone(), tx.clone());

tokio::spawn(async move {
while let Some(fut) = rx.recv().await {
fut.await;
}

tracing::info!(driver = %driver_name, "settlement execution queue stopped");
self_.settlement_queues.lock().await.remove(&driver_name);
});
tx
}
}
tokio::spawn(settle_fut);
}

async fn post_processing(
Expand Down Expand Up @@ -801,7 +759,7 @@ impl RunLoop {
let current_block = self.eth.current_block().borrow().number;
anyhow::ensure!(
current_block < submission_deadline_latest_block,
"submission deadline was missed while waiting for the settlement queue"
"submission deadline was missed"
);

let request = settle::Request {
Expand Down
6 changes: 6 additions & 0 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,14 @@ pub enum Error {
SolutionNotAvailable,
#[error("{0:?}")]
DeadlineExceeded(#[from] time::DeadlineExceeded),
#[error("deadline exceeded while waiting for a queue")]
QueueAwaitingDeadlineExceeded,
#[error("solver error: {0:?}")]
Solver(#[from] solver::Error),
#[error("failed to submit the solution")]
SubmissionError,
#[error("unable to enqueue the request")]
UnableToEnqueue,
#[error("unable to dequeue the result")]
UnableToDequeue,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 13 additions & 0 deletions crates/driver/src/infra/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ enum Kind {
SolverFailed,
SolutionNotAvailable,
DeadlineExceeded,
QueueAwaitingDeadlineExceeded,
Unknown,
InvalidAuctionId,
MissingSurplusFee,
InvalidTokens,
InvalidAmounts,
QuoteSameTokens,
FailedToSubmit,
UnableToEnqueueRequest,
UnableToDequeueResult,
}

#[derive(Debug, Serialize)]
Expand All @@ -39,6 +42,9 @@ impl From<Kind> for (hyper::StatusCode, axum::Json<Error>) {
/solve returned"
}
Kind::DeadlineExceeded => "Exceeded solution deadline",
Kind::QueueAwaitingDeadlineExceeded => {
"Exceeded solution deadline while waiting in the queue"
}
Kind::Unknown => "An unknown error occurred",
Kind::InvalidAuctionId => "Invalid ID specified in the auction",
Kind::MissingSurplusFee => "Auction contains a limit order with no surplus fee",
Expand All @@ -51,6 +57,8 @@ impl From<Kind> for (hyper::StatusCode, axum::Json<Error>) {
or sell amount"
}
Kind::FailedToSubmit => "Could not submit the solution to the blockchain",
Kind::UnableToEnqueueRequest => "Could not enqueue the request",
Kind::UnableToDequeueResult => "Could not dequeue the result",
};
(
hyper::StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -81,8 +89,13 @@ impl From<competition::Error> for (hyper::StatusCode, axum::Json<Error>) {
let error = match value {
competition::Error::SolutionNotAvailable => Kind::SolutionNotAvailable,
competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded,
competition::Error::QueueAwaitingDeadlineExceeded => {
Kind::QueueAwaitingDeadlineExceeded
}
competition::Error::Solver(_) => Kind::SolverFailed,
competition::Error::SubmissionError => Kind::FailedToSubmit,
competition::Error::UnableToEnqueue => Kind::UnableToEnqueueRequest,
competition::Error::UnableToDequeue => Kind::UnableToDequeueResult,
};
error.into()
}
Expand Down
9 changes: 8 additions & 1 deletion crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use {
error::Error,
futures::Future,
std::{net::SocketAddr, sync::Arc},
tokio::sync::oneshot,
tokio::sync::{mpsc, oneshot},
};

mod error;
Expand All @@ -39,6 +39,7 @@ impl Api {
self,
shutdown: impl Future<Output = ()> + Send + 'static,
order_priority_strategies: Vec<OrderPriorityStrategy>,
settle_queue_size: usize,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), hyper::Error> {
// Add middleware.
let mut app = axum::Router::new().layer(
Expand Down Expand Up @@ -83,6 +84,7 @@ impl Api {
liquidity: self.liquidity.clone(),
tokens: tokens.clone(),
pre_processor: pre_processor.clone(),
settle_queue_sender: routes::create_settle_queue_sender(settle_queue_size),
})));
let path = format!("/{name}");
infra::observe::mounting_solver(&name, &path);
Expand Down Expand Up @@ -135,6 +137,10 @@ impl State {
/// Returns the timeouts reported by this state's solver.
fn timeouts(&self) -> Timeouts {
    let solver = &self.0.solver;
    solver.timeouts()
}

/// Borrows the sender half of the `/settle` request queue held by this
/// state.
fn settle_queue_sender(&self) -> &mpsc::Sender<routes::QueuedSettleRequest> {
    let inner = &self.0;
    &inner.settle_queue_sender
}
}

struct Inner {
Expand All @@ -144,4 +150,5 @@ struct Inner {
liquidity: liquidity::Fetcher,
tokens: tokens::Fetcher,
pre_processor: domain::competition::AuctionProcessor,
settle_queue_sender: mpsc::Sender<routes::QueuedSettleRequest>,
}
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ pub(super) use {
metrics::metrics,
quote::{quote, OrderError},
reveal::reveal,
settle::settle,
settle::{create_settle_queue_sender, settle, QueuedSettleRequest},
solve::{solve, AuctionError},
};
104 changes: 78 additions & 26 deletions crates/driver/src/infra/api/routes/settle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,93 @@ use {
observe,
},
},
tokio::sync::{mpsc, oneshot},
tracing::Instrument,
};

/// Registers the `POST /settle` endpoint on `router` and returns the
/// extended router.
pub(in crate::infra::api) fn settle(router: axum::Router<State>) -> axum::Router<State> {
    let handler = axum::routing::post(route);
    router.route("/settle", handler)
}

/// A single `/settle` request waiting in the settle queue, bundled with
/// everything the queue worker needs to process it and report back.
pub(in crate::infra::api) struct QueuedSettleRequest {
/// API state the worker uses to reach the solver, the current block and
/// the competition for this request.
state: State,
/// The request payload; the worker reads its `auction_id`, `solution_id`
/// and `submission_deadline_latest_block`.
req: dto::Solution,
/// One-shot channel on which the worker sends the outcome back to the
/// `/settle` handler awaiting the receiving half.
response_sender: oneshot::Sender<Result<(), competition::Error>>,
}

pub(in crate::infra::api) fn create_settle_queue_sender(
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
queue_size: usize,
) -> mpsc::Sender<QueuedSettleRequest> {
let (sender, mut receiver) = mpsc::channel::<QueuedSettleRequest>(queue_size);

// Spawn the background task to process the queue
tokio::spawn(async move {
while let Some(queued_request) = receiver.recv().await {
let QueuedSettleRequest {
state,
req,
response_sender,
} = queued_request;

let auction_id = req.auction_id;
let solver = state.solver().name().to_string();
if state.eth().current_block().borrow().number >= req.submission_deadline_latest_block {
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
if let Err(err) =
response_sender.send(Err(competition::Error::QueueAwaitingDeadlineExceeded))
{
tracing::error!(
?err,
"settle deadline exceeded. unable to return a response"
);
}
return;
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}

let result = async move {
observe::settling();
let result = state
.competition()
.settle(
req.auction_id,
req.solution_id,
req.submission_deadline_latest_block,
)
.await;
observe::settled(state.solver().name(), &result);
result.map(|_| ()).map_err(Into::into)
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}
.instrument(tracing::info_span!("/settle", solver, auction_id))
.await;

if let Err(err) = response_sender.send(result) {
tracing::error!(?err, "Failed to send /settle response");
}
}
});

sender
}

async fn route(
state: axum::extract::State<State>,
req: axum::Json<dto::Solution>,
) -> Result<(), (hyper::StatusCode, axum::Json<Error>)> {
let auction_id = req.auction_id;
let solver = state.solver().name().to_string();

let handle_request = async move {
observe::settling();
let result = state
.competition()
.settle(
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
req.auction_id,
req.solution_id,
req.submission_deadline_latest_block,
)
.await;
observe::settled(state.solver().name(), &result);
result.map(|_| ()).map_err(Into::into)
}
.instrument(tracing::info_span!("/settle", solver, auction_id));

// Handle `/settle` call in a background task to ensure that we correctly
// submit the settlement (or cancellation) on-chain even if the server
// aborts the endpoint handler code.
// This can happen due to connection issues or when the autopilot aborts
// the `/settle` call when we reach the submission deadline.
Ok(tokio::task::spawn(handle_request)
.await
.unwrap_or_else(|_| Err(competition::Error::SubmissionError))?)
let sender = state.settle_queue_sender();
let (response_tx, response_rx) = oneshot::channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be created within settle_queue_sender? As it stands, we obtain the sender and then create response_tx only to pass it to the sender via send(). The queue sender should create its corresponding resources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume, this is addressed now.


let queued_request = QueuedSettleRequest {
state: state.0.clone(),
req: req.0,
response_sender: response_tx,
};

sender.send(queued_request).await.map_err(|err| {
tracing::error!(?err, "Failed to enqueue /settle request");
competition::Error::UnableToEnqueue
})?;

Ok(response_rx.await.map_err(|err| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we have a timeout here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since the mempool internally checks for the submission deadline and the task will be dropped once it is reached:

if block.number >= submission_deadline {
tracing::info!(
?hash,
deadline = submission_deadline,
current_block = block.number,
"tx not confirmed in time, cancelling",
);
self.cancel(mempool, settlement.gas.price, solver).await?;
return Err(Error::Expired);
}

tracing::error!(?err, "Failed to dequeue /settle response");
competition::Error::UnableToDequeue
})??)
}
1 change: 1 addition & 0 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,5 +340,6 @@ pub async fn load(chain: chain::Id, path: &Path) -> infra::Config {
gas_estimator: config.gas_estimator,
order_priority_strategies: config.order_priority_strategies,
archive_node_url: config.archive_node_url,
settle_queue_size: config.settle_queue_size,
}
}
9 changes: 9 additions & 0 deletions crates/driver/src/infra/config/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,19 @@ struct Config {
)]
order_priority_strategies: Vec<OrderPriorityStrategy>,

/// The maximum number of `/settle` requests that can be queued up at once.
/// A request that arrives while the queue is full waits for a free slot.
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default = "default_settle_queue_size")]
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
settle_queue_size: usize,

/// Archive node URL used to index CoW AMM
archive_node_url: Option<Url>,
}

fn default_settle_queue_size() -> usize {
3
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it makes sense to keep more than 3 pending settlements, at least for mainnet.

}

#[serde_as]
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ pub struct Config {
pub contracts: blockchain::contracts::Addresses,
pub order_priority_strategies: Vec<OrderPriorityStrategy>,
pub archive_node_url: Option<Url>,
pub settle_queue_size: usize,
}
3 changes: 3 additions & 0 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,13 @@ fn competition_error(err: &competition::Error) -> &'static str {
match err {
competition::Error::SolutionNotAvailable => "SolutionNotAvailable",
competition::Error::DeadlineExceeded(_) => "DeadlineExceeded",
competition::Error::QueueAwaitingDeadlineExceeded => "QueueAwaitingDeadlineExceeded",
competition::Error::Solver(solver::Error::Http(_)) => "SolverHttpError",
competition::Error::Solver(solver::Error::Deserialize(_)) => "SolverDeserializeError",
competition::Error::Solver(solver::Error::Dto(_)) => "SolverDtoError",
competition::Error::SubmissionError => "SubmissionError",
competition::Error::UnableToEnqueue => "UnableToEnqueue",
competition::Error::UnableToDequeue => "UnableToDequeue",
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async fn run_with(args: cli::Args, addr_sender: Option<oneshot::Sender<SocketAdd
let _ = shutdown_receiver.await;
},
config.order_priority_strategies,
config.settle_queue_size,
);

futures::pin_mut!(serve);
Expand Down
Loading