diff --git a/src/eth_provider/mod.rs b/src/eth_provider/mod.rs index 559b22bb3..3ca75b4d5 100644 --- a/src/eth_provider/mod.rs +++ b/src/eth_provider/mod.rs @@ -2,6 +2,7 @@ pub mod constant; pub mod contracts; pub mod database; pub mod error; +pub mod pending_pool; pub mod provider; pub mod starknet; pub mod utils; diff --git a/src/eth_provider/pending_pool.rs b/src/eth_provider/pending_pool.rs new file mode 100644 index 000000000..b5f1f60eb --- /dev/null +++ b/src/eth_provider/pending_pool.rs @@ -0,0 +1,17 @@ +use crate::eth_provider::provider::EthDataProvider; +use tokio::time::{sleep, Duration}; + +pub async fn start_retry_service(eth_provider: EthDataProvider) +where + SP: starknet::providers::Provider + Send + Sync, +{ + // Start an infinite loop. + loop { + // Call the retry_transactions method + if let Err(err) = eth_provider.retry_transactions().await { + tracing::error!("Error while retrying transactions: {:?}", err); + } + // 30-second pause + sleep(Duration::from_secs(30)).await; + } +} diff --git a/src/eth_provider/provider.rs b/src/eth_provider/provider.rs index a8ce13527..92f470fbf 100644 --- a/src/eth_provider/provider.rs +++ b/src/eth_provider/provider.rs @@ -39,7 +39,7 @@ use super::utils::{contract_not_found, entrypoint_not_found, into_filter, split_ use crate::eth_provider::utils::format_hex; use crate::models::block::{EthBlockId, EthBlockNumberOrTag}; use crate::models::felt::Felt252Wrapper; -use crate::models::transaction::rpc_to_primitive_transaction; +use crate::models::transaction::rpc_to_ec_recovered_transaction; use crate::{into_via_try_wrapper, into_via_wrapper}; pub type EthProviderResult = Result; @@ -129,7 +129,7 @@ pub trait EthereumProvider { /// Structure that implements the EthereumProvider trait. /// Uses an access to a database to certain data, while /// the rest is fetched from the Starknet Provider. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EthDataProvider { database: Database, starknet_provider: SP, @@ -960,24 +960,22 @@ where continue; } - // Extract the signature from the transaction - let transaction_signature = tx.tx.signature.unwrap(); + // Generate primitive transaction, handle error if any + let transaction = match rpc_to_ec_recovered_transaction(tx.tx.clone()) { + Ok(transaction) => transaction, + Err(_) => { + // Delete the pending transaction from the database due conversion error + // Malformed transaction + self.database + .delete_one::(into_filter("tx.hash", &tx.tx.hash, HASH_PADDING)) + .await?; + // Continue to the next iteration of the loop + continue; + } + }; // Create a signed transaction and send it - transactions_retried.push( - self.send_raw_transaction( - TransactionSigned::from_transaction_and_signature( - rpc_to_primitive_transaction(tx.tx.clone())?, - reth_primitives::Signature { - r: transaction_signature.r, - s: transaction_signature.s, - odd_y_parity: transaction_signature.y_parity.unwrap().0, - }, - ) - .envelope_encoded(), - ) - .await?, - ); + transactions_retried.push(self.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?); } // Return the hashes of retried transactions diff --git a/src/main.rs b/src/main.rs index 7d3e70af5..2e54ae85e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use dotenvy::dotenv; use eyre::Result; use kakarot_rpc::config::{JsonRpcClientBuilder, KakarotRpcConfig, Network, SequencerGatewayProviderBuilder}; use kakarot_rpc::eth_provider::database::Database; +use kakarot_rpc::eth_provider::pending_pool::start_retry_service; use kakarot_rpc::eth_provider::provider::EthDataProvider; use kakarot_rpc::eth_rpc::config::RPCConfig; use kakarot_rpc::eth_rpc::rpc::KakarotRpcModuleBuilder; @@ -74,15 +75,17 @@ async fn main() -> Result<()> { let kakarot_rpc_module = match starknet_provider { StarknetProvider::JsonRpcClient(starknet_provider) => { let starknet_provider = Arc::new(starknet_provider); - let eth_provider = EthDataProvider::new(db, starknet_provider).await?; - KakarotRpcModuleBuilder::new(eth_provider).rpc_module() + let eth_provider = EthDataProvider::new(db.clone(), starknet_provider).await?; + tokio::spawn(start_retry_service(eth_provider.clone())); + KakarotRpcModuleBuilder::new(eth_provider).rpc_module()? } StarknetProvider::SequencerGatewayProvider(starknet_provider) => { let starknet_provider = Arc::new(starknet_provider); - let eth_provider = EthDataProvider::new(db, starknet_provider).await?; - KakarotRpcModuleBuilder::new(eth_provider).rpc_module() + let eth_provider = EthDataProvider::new(db.clone(), starknet_provider).await?; + tokio::spawn(start_retry_service(eth_provider.clone())); + KakarotRpcModuleBuilder::new(eth_provider).rpc_module()? } - }?; + }; let (socket_addr, server_handle) = run_server(kakarot_rpc_module, rpc_config).await?;