From e9aed9ea394cdd68a079d458686615f631bfddc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pepe=20M=C3=A1rquez=20Romero?= Date: Fri, 25 Aug 2023 13:01:45 +0200 Subject: [PATCH] ayrat could you fix this trait bound error ? --- fang/src/blocking/queue.rs | 95 +++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 43 deletions(-) diff --git a/fang/src/blocking/queue.rs b/fang/src/blocking/queue.rs index 3716833f..e940fb77 100644 --- a/fang/src/blocking/queue.rs +++ b/fang/src/blocking/queue.rs @@ -11,12 +11,14 @@ use chrono::DateTime; use chrono::Duration; use chrono::Utc; use cron::Schedule; -use diesel::pg::PgConnection; +use diesel::connection::LoadConnection; use diesel::prelude::*; +use diesel::query_dsl::methods::LoadQuery; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PoolError; use diesel::r2d2::PooledConnection; +use diesel::r2d2::R2D2Connection; use diesel::result::Error as DieselError; use sha2::Digest; use sha2::Sha256; @@ -30,7 +32,10 @@ use dotenvy::dotenv; #[cfg(test)] use std::env; -pub type PoolConnection = PooledConnection>; +pub type PoolConnection +where + T: R2D2Connection + Send + 'static, += PooledConnection>; #[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)] #[diesel(table_name = fang_tasks)] @@ -119,10 +124,10 @@ pub trait Queueable { /// /// ```rust /// // Set DATABASE_URL enviroment variable if you would like to try this function. -/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { +/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { /// let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); /// -/// let manager = r2d2::ConnectionManager::::new(database_url); +/// let manager = r2d2::ConnectionManager::::new(database_url); /// /// r2d2::Pool::builder() /// .max_size(pool_size) @@ -134,49 +139,55 @@ pub trait Queueable { /// ``` /// #[derive(Clone, TypedBuilder)] -pub struct Queue { +pub struct Queue +where + T: R2D2Connection + Send + 'static, +{ #[builder(setter(into))] - pub connection_pool: r2d2::Pool>, + pub connection_pool: r2d2::Pool>, } -impl Queueable for Queue { +impl Queueable for Queue +where + T: R2D2Connection + Send + 'static + LoadConnection, +{ fn fetch_and_touch_task(&self, task_type: String) -> Result, QueueError> { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::fetch_and_touch_query(&mut connection, task_type) } fn insert_task(&self, params: &dyn Runnable) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::insert_query(&mut connection, params, Utc::now()) } fn schedule_task(&self, params: &dyn Runnable) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::schedule_task_query(&mut connection, params) } fn remove_all_scheduled_tasks(&self) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::remove_all_scheduled_tasks_query(&mut connection) } fn remove_all_tasks(&self) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::remove_all_tasks_query(&mut connection) } fn remove_tasks_of_type(&self, task_type: &str) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::remove_tasks_of_type_query(&mut connection, task_type) } fn remove_task(&self, id: Uuid) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::remove_task_query(&mut connection, id) } @@ -185,7 +196,7 @@ impl Queueable for Queue { /// If task is not uniq this function will not do anything. fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result { if task.uniq() { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::remove_task_by_metadata_query(&mut connection, task) } else { @@ -194,19 +205,19 @@ impl Queueable for Queue { } fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::update_task_state_query(&mut connection, task, state) } fn fail_task(&self, task: &Task, error: &str) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::fail_task_query(&mut connection, task, error) } fn find_task_by_id(&self, id: Uuid) -> Option { - let mut connection = self.get_connection().unwrap(); + let mut connection: PoolConnection = self.get_connection().unwrap(); Self::find_task_by_id_query(&mut connection, id) } @@ -217,13 +228,16 @@ impl Queueable for Queue { backoff_seconds: u32, error: &str, ) -> Result { - let mut connection = self.get_connection()?; + let mut connection: PoolConnection = self.get_connection()?; Self::schedule_retry_query(&mut connection, task, backoff_seconds, error) } } -impl Queue { +impl Queue +where + T: R2D2Connection + Send + 'static + LoadConnection, +{ /// Provides a Queue that does not commit to the DB #[cfg(test)] pub fn test() -> Self { @@ -237,7 +251,7 @@ impl Queue { } /// Connect to the db if not connected - pub fn get_connection(&self) -> Result { + pub fn get_connection(&self) -> Result, QueueError> { let result = self.connection_pool.get(); if let Err(err) = result { @@ -249,7 +263,7 @@ impl Queue { } pub fn schedule_task_query( - connection: &mut PgConnection, + connection: &mut T, params: &dyn Runnable, ) -> Result { let scheduled_at = match params.cron() { @@ -280,7 +294,7 @@ impl Queue { } pub fn insert_query( - connection: &mut PgConnection, + connection: &mut T, params: &dyn Runnable, scheduled_at: DateTime, ) -> Result { @@ -318,12 +332,12 @@ impl Queue { } } - pub fn fetch_task_query(connection: &mut PgConnection, task_type: String) -> Option { + pub fn fetch_task_query(connection: &mut T, task_type: String) -> Option { Self::fetch_task_of_type_query(connection, &task_type) } pub fn fetch_and_touch_query( - connection: &mut PgConnection, + connection: &mut PoolConnection, task_type: String, ) -> Result, QueueError> { connection.transaction::, QueueError, _>(|conn| { @@ -344,27 +358,25 @@ impl Queue { }) } - pub fn find_task_by_id_query(connection: &mut PgConnection, id: Uuid) -> Option { + pub fn find_task_by_id_query(connection: &mut T, id: Uuid) -> Option { fang_tasks::table .filter(fang_tasks::id.eq(id)) .first::(connection) .ok() } - pub fn remove_all_tasks_query(connection: &mut PgConnection) -> Result { + pub fn remove_all_tasks_query(connection: &mut T) -> Result { Ok(diesel::delete(fang_tasks::table).execute(connection)?) } - pub fn remove_all_scheduled_tasks_query( - connection: &mut PgConnection, - ) -> Result { + pub fn remove_all_scheduled_tasks_query(connection: &mut T) -> Result { let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now())); Ok(diesel::delete(query).execute(connection)?) } pub fn remove_tasks_of_type_query( - connection: &mut PgConnection, + connection: &mut T, task_type: &str, ) -> Result { let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type)); @@ -373,7 +385,7 @@ impl Queue { } pub fn remove_task_by_metadata_query( - connection: &mut PgConnection, + connection: &mut T, task: &dyn Runnable, ) -> Result { let metadata = serde_json::to_value(task).unwrap(); @@ -385,14 +397,14 @@ impl Queue { Ok(diesel::delete(query).execute(connection)?) } - pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result { + pub fn remove_task_query(connection: &mut T, id: Uuid) -> Result { let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); Ok(diesel::delete(query).execute(connection)?) } pub fn update_task_state_query( - connection: &mut PgConnection, + connection: &mut T, task: &Task, state: FangTaskState, ) -> Result { @@ -405,7 +417,7 @@ impl Queue { } pub fn fail_task_query( - connection: &mut PgConnection, + connection: &mut T, task: &Task, error: &str, ) -> Result { @@ -423,12 +435,12 @@ impl Queue { } #[cfg(test)] - pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { + pub fn connection_pool(pool_size: u32) -> r2d2::Pool> { dotenv().ok(); let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let manager = r2d2::ConnectionManager::::new(database_url); + let manager = r2d2::ConnectionManager::::new(database_url); r2d2::Pool::builder() .max_size(pool_size) @@ -436,7 +448,7 @@ impl Queue { .unwrap() } - fn fetch_task_of_type_query(connection: &mut PgConnection, task_type: &str) -> Option { + fn fetch_task_of_type_query(connection: &mut T, task_type: &str) -> Option { fang_tasks::table .order(fang_tasks::created_at.asc()) .order(fang_tasks::scheduled_at.asc()) @@ -450,10 +462,7 @@ impl Queue { .ok() } - fn find_task_by_uniq_hash_query( - connection: &mut PgConnection, - uniq_hash: &str, - ) -> Option { + fn find_task_by_uniq_hash_query(connection: &mut T, uniq_hash: &str) -> Option { fang_tasks::table .filter(fang_tasks::uniq_hash.eq(uniq_hash)) .filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried])) @@ -462,7 +471,7 @@ impl Queue { } pub fn schedule_retry_query( - connection: &mut PgConnection, + connection: &mut T, task: &Task, backoff_seconds: u32, error: &str,