Skip to content

Commit

Permalink
General PgExecutor to enqueue tasks in transactions
Browse files Browse the repository at this point in the history
* PgExecutor

* Cleanup
  • Loading branch information
imbolc authored Jul 24, 2024
1 parent 5a453de commit c23e2f9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
16 changes: 12 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,30 @@ pub use traits::{Scheduler, Step};
pub use worker::Worker;

use chrono::{DateTime, Utc};
use sqlx::{types::Uuid, PgPool};
use sqlx::{types::Uuid, PgExecutor};
use std::time::Duration;

const LOST_CONNECTION_SLEEP: Duration = Duration::from_secs(1);

/// Enqueues the task to be run immediately
pub async fn enqueue(db: &PgPool, task: &impl Scheduler) -> Result<Uuid> {
pub async fn enqueue<'e>(db: impl PgExecutor<'e>, task: &impl Scheduler) -> Result<Uuid> {
task.enqueue(db).await
}

/// Schedules a task to be run after a specified delay
pub async fn delay(db: &PgPool, task: &impl Scheduler, delay: Duration) -> Result<Uuid> {
pub async fn delay<'e>(
db: impl PgExecutor<'e>,
task: &impl Scheduler,
delay: Duration,
) -> Result<Uuid> {
task.delay(db, delay).await
}

/// Schedules a task to run at a specified time in the future
pub async fn schedule(db: &PgPool, task: &impl Scheduler, at: DateTime<Utc>) -> Result<Uuid> {
pub async fn schedule<'e>(
db: impl PgExecutor<'e>,
task: &impl Scheduler,
at: DateTime<Utc>,
) -> Result<Uuid> {
task.schedule(db, at).await
}
12 changes: 8 additions & 4 deletions src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{Error, StepResult};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{types::Uuid, PgPool};
use sqlx::{types::Uuid, PgExecutor, PgPool};
use std::{fmt, time::Duration};

/// A tait to implement on each task step
Expand Down Expand Up @@ -36,19 +36,23 @@ where
#[async_trait]
pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync {
/// Enqueues the task to be run immediately
async fn enqueue(&self, db: &PgPool) -> crate::Result<Uuid> {
async fn enqueue<'e>(&self, db: impl PgExecutor<'e>) -> crate::Result<Uuid> {
self.schedule(db, Utc::now()).await
}

/// Schedules a task to be run after a specified delay
async fn delay(&self, db: &PgPool, delay: Duration) -> crate::Result<Uuid> {
async fn delay<'e>(&self, db: impl PgExecutor<'e>, delay: Duration) -> crate::Result<Uuid> {
let delay =
chrono::Duration::from_std(delay).unwrap_or_else(|_| chrono::Duration::max_value());
self.schedule(db, Utc::now() + delay).await
}

/// Schedules a task to run at a specified time in the future
async fn schedule(&self, db: &PgPool, at: DateTime<Utc>) -> crate::Result<Uuid> {
async fn schedule<'e>(
&self,
db: impl PgExecutor<'e>,
at: DateTime<Utc>,
) -> crate::Result<Uuid> {
let step = serde_json::to_string(self)
.map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?;
sqlx::query!(
Expand Down

0 comments on commit c23e2f9

Please sign in to comment.