Skip to content

Commit

Permalink
Add transaction retry service (#1029)
Browse files Browse the repository at this point in the history
* Add transaction retry service

* fix signature

* fix comments

* fix tracing error
  • Loading branch information
tcoratger authored Apr 29, 2024
1 parent e471f48 commit 21774a4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/eth_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod constant;
pub mod contracts;
pub mod database;
pub mod error;
pub mod pending_pool;
pub mod provider;
pub mod starknet;
pub mod utils;
17 changes: 17 additions & 0 deletions src/eth_provider/pending_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::eth_provider::provider::EthDataProvider;
use tokio::time::{sleep, Duration};

pub async fn start_retry_service<SP>(eth_provider: EthDataProvider<SP>)
where
SP: starknet::providers::Provider + Send + Sync,
{
// Start an infinite loop.
loop {
// Call the retry_transactions method
if let Err(err) = eth_provider.retry_transactions().await {
tracing::error!("Error while retrying transactions: {:?}", err);
}
// 30-second pause
sleep(Duration::from_secs(30)).await;
}
}
34 changes: 16 additions & 18 deletions src/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::utils::{contract_not_found, entrypoint_not_found, into_filter, split_
use crate::eth_provider::utils::format_hex;
use crate::models::block::{EthBlockId, EthBlockNumberOrTag};
use crate::models::felt::Felt252Wrapper;
use crate::models::transaction::rpc_to_primitive_transaction;
use crate::models::transaction::rpc_to_ec_recovered_transaction;
use crate::{into_via_try_wrapper, into_via_wrapper};

pub type EthProviderResult<T> = Result<T, EthApiError>;
Expand Down Expand Up @@ -129,7 +129,7 @@ pub trait EthereumProvider {
/// Structure that implements the EthereumProvider trait.
/// Uses an access to a database to certain data, while
/// the rest is fetched from the Starknet Provider.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct EthDataProvider<SP: starknet::providers::Provider> {
database: Database,
starknet_provider: SP,
Expand Down Expand Up @@ -960,24 +960,22 @@ where
continue;
}

// Extract the signature from the transaction
let transaction_signature = tx.tx.signature.unwrap();
// Generate primitive transaction, handle error if any
let transaction = match rpc_to_ec_recovered_transaction(tx.tx.clone()) {
Ok(transaction) => transaction,
Err(_) => {
// Delete the pending transaction from the database due conversion error
// Malformed transaction
self.database
.delete_one::<StoredPendingTransaction>(into_filter("tx.hash", &tx.tx.hash, HASH_PADDING))
.await?;
// Continue to the next iteration of the loop
continue;
}
};

// Create a signed transaction and send it
transactions_retried.push(
self.send_raw_transaction(
TransactionSigned::from_transaction_and_signature(
rpc_to_primitive_transaction(tx.tx.clone())?,
reth_primitives::Signature {
r: transaction_signature.r,
s: transaction_signature.s,
odd_y_parity: transaction_signature.y_parity.unwrap().0,
},
)
.envelope_encoded(),
)
.await?,
);
transactions_retried.push(self.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?);
}

// Return the hashes of retried transactions
Expand Down
13 changes: 8 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use dotenvy::dotenv;
use eyre::Result;
use kakarot_rpc::config::{JsonRpcClientBuilder, KakarotRpcConfig, Network, SequencerGatewayProviderBuilder};
use kakarot_rpc::eth_provider::database::Database;
use kakarot_rpc::eth_provider::pending_pool::start_retry_service;
use kakarot_rpc::eth_provider::provider::EthDataProvider;
use kakarot_rpc::eth_rpc::config::RPCConfig;
use kakarot_rpc::eth_rpc::rpc::KakarotRpcModuleBuilder;
Expand Down Expand Up @@ -74,15 +75,17 @@ async fn main() -> Result<()> {
let kakarot_rpc_module = match starknet_provider {
StarknetProvider::JsonRpcClient(starknet_provider) => {
let starknet_provider = Arc::new(starknet_provider);
let eth_provider = EthDataProvider::new(db, starknet_provider).await?;
KakarotRpcModuleBuilder::new(eth_provider).rpc_module()
let eth_provider = EthDataProvider::new(db.clone(), starknet_provider).await?;
tokio::spawn(start_retry_service(eth_provider.clone()));
KakarotRpcModuleBuilder::new(eth_provider).rpc_module()?
}
StarknetProvider::SequencerGatewayProvider(starknet_provider) => {
let starknet_provider = Arc::new(starknet_provider);
let eth_provider = EthDataProvider::new(db, starknet_provider).await?;
KakarotRpcModuleBuilder::new(eth_provider).rpc_module()
let eth_provider = EthDataProvider::new(db.clone(), starknet_provider).await?;
tokio::spawn(start_retry_service(eth_provider.clone()));
KakarotRpcModuleBuilder::new(eth_provider).rpc_module()?
}
}?;
};

let (socket_addr, server_handle) = run_server(kakarot_rpc_module, rpc_config).await?;

Expand Down

0 comments on commit 21774a4

Please sign in to comment.