Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQLite and MySQL support for blocking #138

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 52 additions & 43 deletions fang/src/blocking/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
use chrono::Duration;
use chrono::Utc;
use cron::Schedule;
use diesel::pg::PgConnection;
use diesel::connection::LoadConnection;
use diesel::prelude::*;
use diesel::query_dsl::methods::LoadQuery;

Check failure on line 16 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `diesel::query_dsl::methods::LoadQuery`

Check failure on line 16 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `diesel::query_dsl::methods::LoadQuery`
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::PoolError;
use diesel::r2d2::PooledConnection;
use diesel::r2d2::R2D2Connection;
use diesel::result::Error as DieselError;
use sha2::Digest;
use sha2::Sha256;
Expand All @@ -30,7 +32,10 @@
#[cfg(test)]
use std::env;

pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
pub type PoolConnection<T>
where
T: R2D2Connection + Send + 'static,
= PooledConnection<ConnectionManager<T>>;

#[derive(Insertable, Debug, Eq, PartialEq, Clone, TypedBuilder)]
#[diesel(table_name = fang_tasks)]
Expand Down Expand Up @@ -119,10 +124,10 @@
///
/// ```rust
/// // Set DATABASE_URL enviroment variable if you would like to try this function.
/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<T>> {
/// let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
///
/// let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
/// let manager = r2d2::ConnectionManager::<T>::new(database_url);
///
/// r2d2::Pool::builder()
/// .max_size(pool_size)
Expand All @@ -134,49 +139,55 @@
/// ```
///
#[derive(Clone, TypedBuilder)]
pub struct Queue {
pub struct Queue<T>
where
T: R2D2Connection + Send + 'static,
{
#[builder(setter(into))]
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<T>>,
}

impl Queueable for Queue {
impl<T> Queueable for Queue<T>
where
T: R2D2Connection + Send + 'static + LoadConnection,
{
fn fetch_and_touch_task(&self, task_type: String) -> Result<Option<Task>, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::fetch_and_touch_query(&mut connection, task_type)
}

fn insert_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::insert_query(&mut connection, params, Utc::now())
}
fn schedule_task(&self, params: &dyn Runnable) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::schedule_task_query(&mut connection, params)
}

fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::remove_all_scheduled_tasks_query(&mut connection)
}

fn remove_all_tasks(&self) -> Result<usize, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::remove_all_tasks_query(&mut connection)
}

fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::remove_tasks_of_type_query(&mut connection, task_type)
}

fn remove_task(&self, id: Uuid) -> Result<usize, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::remove_task_query(&mut connection, id)
}
Expand All @@ -185,7 +196,7 @@
/// If task is not uniq this function will not do anything.
fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result<usize, QueueError> {
if task.uniq() {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::remove_task_by_metadata_query(&mut connection, task)
} else {
Expand All @@ -194,19 +205,19 @@
}

fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::update_task_state_query(&mut connection, task, state)
}

fn fail_task(&self, task: &Task, error: &str) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::fail_task_query(&mut connection, task, error)
}

fn find_task_by_id(&self, id: Uuid) -> Option<Task> {
let mut connection = self.get_connection().unwrap();
let mut connection: PoolConnection<T> = self.get_connection().unwrap();

Self::find_task_by_id_query(&mut connection, id)
}
Expand All @@ -217,13 +228,16 @@
backoff_seconds: u32,
error: &str,
) -> Result<Task, QueueError> {
let mut connection = self.get_connection()?;
let mut connection: PoolConnection<T> = self.get_connection()?;

Self::schedule_retry_query(&mut connection, task, backoff_seconds, error)
}
}

impl Queue {
impl<T> Queue<T>
where
T: R2D2Connection + Send + 'static + LoadConnection,
{
/// Provides a Queue that does not commit to the DB
#[cfg(test)]
pub fn test() -> Self {
Expand All @@ -237,7 +251,7 @@
}

/// Connect to the db if not connected
pub fn get_connection(&self) -> Result<PoolConnection, QueueError> {
pub fn get_connection(&self) -> Result<PoolConnection<T>, QueueError> {
let result = self.connection_pool.get();

if let Err(err) = result {
Expand All @@ -249,7 +263,7 @@
}

pub fn schedule_task_query(
connection: &mut PgConnection,
connection: &mut T,
params: &dyn Runnable,
) -> Result<Task, QueueError> {
let scheduled_at = match params.cron() {
Expand Down Expand Up @@ -280,7 +294,7 @@
}

pub fn insert_query(
connection: &mut PgConnection,
connection: &mut T,
params: &dyn Runnable,
scheduled_at: DateTime<Utc>,
) -> Result<Task, QueueError> {
Expand All @@ -294,7 +308,7 @@

Ok(diesel::insert_into(fang_tasks::table)
.values(new_task)
.get_result::<Task>(connection)?)

Check failure on line 311 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`

Check failure on line 311 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`
} else {
let metadata = serde_json::to_value(params).unwrap();

Expand All @@ -312,18 +326,18 @@

Ok(diesel::insert_into(fang_tasks::table)
.values(new_task)
.get_result::<Task>(connection)?)

Check failure on line 329 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`

Check failure on line 329 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`
}
}
}
}

pub fn fetch_task_query(connection: &mut PgConnection, task_type: String) -> Option<Task> {
pub fn fetch_task_query(connection: &mut T, task_type: String) -> Option<Task> {
Self::fetch_task_of_type_query(connection, &task_type)
}

pub fn fetch_and_touch_query(
connection: &mut PgConnection,
connection: &mut PoolConnection<T>,
task_type: String,
) -> Result<Option<Task>, QueueError> {
connection.transaction::<Option<Task>, QueueError, _>(|conn| {
Expand All @@ -344,36 +358,34 @@
})
}

pub fn find_task_by_id_query(connection: &mut PgConnection, id: Uuid) -> Option<Task> {
pub fn find_task_by_id_query(connection: &mut T, id: Uuid) -> Option<Task> {
fang_tasks::table
.filter(fang_tasks::id.eq(id))
.first::<Task>(connection)

Check failure on line 364 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`

Check failure on line 364 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`
.ok()
}

pub fn remove_all_tasks_query(connection: &mut PgConnection) -> Result<usize, QueueError> {
pub fn remove_all_tasks_query(connection: &mut T) -> Result<usize, QueueError> {
Ok(diesel::delete(fang_tasks::table).execute(connection)?)

Check failure on line 369 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied

Check failure on line 369 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied
}

pub fn remove_all_scheduled_tasks_query(
connection: &mut PgConnection,
) -> Result<usize, QueueError> {
pub fn remove_all_scheduled_tasks_query(connection: &mut T) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::scheduled_at.gt(Utc::now()));

Ok(diesel::delete(query).execute(connection)?)

Check failure on line 375 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`

Check failure on line 375 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<T as Connection>::Backend == Pg`
}

pub fn remove_tasks_of_type_query(
connection: &mut PgConnection,
connection: &mut T,
task_type: &str,
) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::task_type.eq(task_type));

Ok(diesel::delete(query).execute(connection)?)

Check failure on line 384 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied

Check failure on line 384 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<<T as Connection>::Backend as Backend>::BindCollector<'a> == RawBytesBindCollector<<T as Connection>::Backend>`

Check failure on line 384 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied

Check failure on line 384 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<<T as Connection>::Backend as Backend>::BindCollector<'a> == RawBytesBindCollector<<T as Connection>::Backend>`
}

pub fn remove_task_by_metadata_query(
connection: &mut PgConnection,
connection: &mut T,
task: &dyn Runnable,
) -> Result<usize, QueueError> {
let metadata = serde_json::to_value(task).unwrap();
Expand All @@ -382,17 +394,17 @@

let query = fang_tasks::table.filter(fang_tasks::uniq_hash.eq(uniq_hash));

Ok(diesel::delete(query).execute(connection)?)

Check failure on line 397 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied

Check failure on line 397 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<<T as Connection>::Backend as Backend>::BindCollector<'a> == RawBytesBindCollector<<T as Connection>::Backend>`

Check failure on line 397 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

the trait bound `<T as diesel::Connection>::Backend: diesel::backend::private::DieselReserveSpecialization` is not satisfied

Check failure on line 397 in fang/src/blocking/queue.rs

View workflow job for this annotation

GitHub Actions / Test

type mismatch resolving `<<T as Connection>::Backend as Backend>::BindCollector<'a> == RawBytesBindCollector<<T as Connection>::Backend>`
}

pub fn remove_task_query(connection: &mut PgConnection, id: Uuid) -> Result<usize, QueueError> {
pub fn remove_task_query(connection: &mut T, id: Uuid) -> Result<usize, QueueError> {
let query = fang_tasks::table.filter(fang_tasks::id.eq(id));

Ok(diesel::delete(query).execute(connection)?)
}

pub fn update_task_state_query(
connection: &mut PgConnection,
connection: &mut T,
task: &Task,
state: FangTaskState,
) -> Result<Task, QueueError> {
Expand All @@ -405,7 +417,7 @@
}

pub fn fail_task_query(
connection: &mut PgConnection,
connection: &mut T,
task: &Task,
error: &str,
) -> Result<Task, QueueError> {
Expand All @@ -423,20 +435,20 @@
}

#[cfg(test)]
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<T>> {
dotenv().ok();

let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");

let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
let manager = r2d2::ConnectionManager::<T>::new(database_url);

r2d2::Pool::builder()
.max_size(pool_size)
.build(manager)
.unwrap()
}

fn fetch_task_of_type_query(connection: &mut PgConnection, task_type: &str) -> Option<Task> {
fn fetch_task_of_type_query(connection: &mut T, task_type: &str) -> Option<Task> {
fang_tasks::table
.order(fang_tasks::created_at.asc())
.order(fang_tasks::scheduled_at.asc())
Expand All @@ -450,10 +462,7 @@
.ok()
}

fn find_task_by_uniq_hash_query(
connection: &mut PgConnection,
uniq_hash: &str,
) -> Option<Task> {
fn find_task_by_uniq_hash_query(connection: &mut T, uniq_hash: &str) -> Option<Task> {
fang_tasks::table
.filter(fang_tasks::uniq_hash.eq(uniq_hash))
.filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried]))
Expand All @@ -462,7 +471,7 @@
}

pub fn schedule_retry_query(
connection: &mut PgConnection,
connection: &mut T,
task: &Task,
backoff_seconds: u32,
error: &str,
Expand Down
Loading