Consume jobs from the new job queue #3455

Open · wants to merge 5 commits into base: quenting/new-queue/insert-jobs
17 changes: 17 additions & 0 deletions crates/storage-pg/migrations/20241120163320_queue_job_failures.sql
@@ -0,0 +1,17 @@
-- Copyright 2024 New Vector Ltd.
--
-- SPDX-License-Identifier: AGPL-3.0-only
-- Please see LICENSE in the repository root for full details.

-- Add a new status for failed jobs
ALTER TYPE "queue_job_status" ADD VALUE 'failed';

ALTER TABLE "queue_jobs"
  -- When the job failed
  ADD COLUMN "failed_at" TIMESTAMP WITH TIME ZONE,
  -- Error message of the failure
  ADD COLUMN "failed_reason" TEXT,
  -- How many times we've already tried to run the job
  ADD COLUMN "attempt" INTEGER NOT NULL DEFAULT 0,
  -- The ID of the next attempt, if the job was retried
  ADD COLUMN "next_attempt_id" UUID REFERENCES "queue_jobs" ("queue_job_id");
239 changes: 237 additions & 2 deletions crates/storage-pg/src/queue/job.rs
@@ -7,13 +7,16 @@
//! [`QueueJobRepository`].

 use async_trait::async_trait;
-use mas_storage::{queue::QueueJobRepository, Clock};
+use mas_storage::{
+    queue::{Job, QueueJobRepository, Worker},
+    Clock,
+};
 use rand::RngCore;
 use sqlx::PgConnection;
 use ulid::Ulid;
 use uuid::Uuid;

-use crate::{DatabaseError, ExecuteExt};
+use crate::{DatabaseError, DatabaseInconsistencyError, ExecuteExt};

/// An implementation of [`QueueJobRepository`] for a PostgreSQL connection.
pub struct PgQueueJobRepository<'c> {
Expand All @@ -29,6 +32,46 @@ impl<'c> PgQueueJobRepository<'c> {
    }
}

struct JobReservationResult {
    queue_job_id: Uuid,
    queue_name: String,
    payload: serde_json::Value,
    metadata: serde_json::Value,
    attempt: i32,
}

impl TryFrom<JobReservationResult> for Job {
    type Error = DatabaseInconsistencyError;

    fn try_from(value: JobReservationResult) -> Result<Self, Self::Error> {
        let id = value.queue_job_id.into();
        let queue_name = value.queue_name;
        let payload = value.payload;

        let metadata = serde_json::from_value(value.metadata).map_err(|e| {
            DatabaseInconsistencyError::on("queue_jobs")
                .column("metadata")
                .row(id)
                .source(e)
        })?;

        let attempt = value.attempt.try_into().map_err(|e| {
            DatabaseInconsistencyError::on("queue_jobs")
                .column("attempt")
                .row(id)
                .source(e)
        })?;

        Ok(Self {
            id,
            queue_name,
            payload,
            metadata,
            attempt,
        })
    }
}

#[async_trait]
impl<'c> QueueJobRepository for PgQueueJobRepository<'c> {
    type Error = DatabaseError;
@@ -73,4 +116,196 @@ impl<'c> QueueJobRepository for PgQueueJobRepository<'c> {

        Ok(())
    }

    #[tracing::instrument(
        name = "db.queue_job.reserve",
        skip_all,
        fields(
            db.query.text,
        ),
        err,
    )]
    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error> {
        let now = clock.now();
        let max_count = i64::try_from(count).unwrap_or(i64::MAX);
        let queues: Vec<String> = queues.iter().map(|&s| s.to_owned()).collect();
        let results = sqlx::query_as!(
            JobReservationResult,
            r#"
                -- We first grab a few jobs that are available,
                -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently
                -- and we don't get multiple workers grabbing the same jobs
                WITH locked_jobs AS (
                    SELECT queue_job_id
                    FROM queue_jobs
                    WHERE
                        status = 'available'
                        AND queue_name = ANY($1)
                    ORDER BY queue_job_id ASC
                    LIMIT $2
                    FOR UPDATE
                    SKIP LOCKED
                )
                -- then we update the status of those jobs to 'running', returning the job details
                UPDATE queue_jobs
                SET status = 'running', started_at = $3, started_by = $4
                FROM locked_jobs
                WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id
                RETURNING
                    queue_jobs.queue_job_id,
                    queue_jobs.queue_name,
                    queue_jobs.payload,
                    queue_jobs.metadata,
                    queue_jobs.attempt
            "#,
            &queues,
            max_count,
            now,
            Uuid::from(worker.id),
        )
        .traced()
        .fetch_all(&mut *self.conn)
        .await?;

        let jobs = results
            .into_iter()
            .map(TryFrom::try_from)
            .collect::<Result<Vec<_>, _>>()?;

        Ok(jobs)
    }

    #[tracing::instrument(
        name = "db.queue_job.mark_as_completed",
        skip_all,
        fields(
            db.query.text,
            job.id = %id,
        ),
        err,
    )]
    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error> {
        let now = clock.now();
        let res = sqlx::query!(
            r#"
                UPDATE queue_jobs
                SET status = 'completed', completed_at = $1
                WHERE queue_job_id = $2 AND status = 'running'
            "#,
            now,
            Uuid::from(id),
        )
        .traced()
        .execute(&mut *self.conn)
        .await?;

        DatabaseError::ensure_affected_rows(&res, 1)?;

        Ok(())
    }

    #[tracing::instrument(
        name = "db.queue_job.mark_as_failed",
        skip_all,
        fields(
            db.query.text,
            job.id = %id,
        ),
        err
    )]
    async fn mark_as_failed(
        &mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error> {
        let now = clock.now();
        let res = sqlx::query!(
            r#"
                UPDATE queue_jobs
                SET
                    status = 'failed',
                    failed_at = $1,
                    failed_reason = $2
                WHERE
                    queue_job_id = $3
                    AND status = 'running'
            "#,
            now,
            reason,
            Uuid::from(id),
        )
        .traced()
        .execute(&mut *self.conn)
        .await?;

        DatabaseError::ensure_affected_rows(&res, 1)?;

        Ok(())
    }

    #[tracing::instrument(
        name = "db.queue_job.retry",
        skip_all,
        fields(
            db.query.text,
            job.id = %id,
        ),
        err
    )]
    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
    ) -> Result<(), Self::Error> {
        let now = clock.now();
        let new_id = Ulid::from_datetime_with_source(now.into(), rng);

        // Create a new job with the same payload and metadata, but a new ID
        // and an incremented attempt counter.
        // We make sure we only do this for 'failed' jobs.
        let res = sqlx::query!(
            r#"
                INSERT INTO queue_jobs
                    (queue_job_id, queue_name, payload, metadata, created_at, attempt)
                SELECT $1, queue_name, payload, metadata, $2, attempt + 1
                FROM queue_jobs
                WHERE queue_job_id = $3
                  AND status = 'failed'
                  -- Review note (Contributor): should this also say AND next_attempt_id IS NULL?
"#,
Uuid::from(new_id),
now,
Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;

DatabaseError::ensure_affected_rows(&res, 1)?;

// Update the old job to point to the new attempt
let res = sqlx::query!(
r#"
UPDATE queue_jobs
SET next_attempt_id = $1
WHERE queue_job_id = $2
"#,
Uuid::from(new_id),
Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;

DatabaseError::ensure_affected_rows(&res, 1)?;

Ok(())
}
}
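
For reference, here is a sketch of the guard suggested in the review note embedded in the retry query above. With the extra predicate, retrying a job that already has a next attempt matches zero rows, so ensure_affected_rows(&res, 1) would surface a double retry as an error. This is the reviewer's suggestion, not code from this PR:

// Sketch of the suggested guard: only clone the job into a new attempt
// when it has not been retried yet.
let res = sqlx::query!(
    r#"
        INSERT INTO queue_jobs
            (queue_job_id, queue_name, payload, metadata, created_at, attempt)
        SELECT $1, queue_name, payload, metadata, $2, attempt + 1
        FROM queue_jobs
        WHERE queue_job_id = $3
          AND status = 'failed'
          AND next_attempt_id IS NULL
    "#,
    Uuid::from(new_id),
    now,
    Uuid::from(id),
)
.traced()
.execute(&mut *self.conn)
.await?;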
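
Putting the new repository methods together, a consumer might look roughly like the loop below. This is a minimal sketch: repo, clock, rng, worker, and run_job are assumed to exist in the surrounding code, and the queue names and retry limit are made up for illustration; only the queue-job calls mirror the trait implemented in this PR.

loop {
    // Reserve up to 10 available jobs from the queues this worker handles.
    let jobs = repo.reserve(&clock, &worker, &["email", "cleanup"], 10).await?;

    for job in jobs {
        match run_job(&job).await {
            // The job ran to completion: record it.
            Ok(()) => repo.mark_as_completed(&clock, job.id).await?,
            Err(e) => {
                // Record the failure reason...
                repo.mark_as_failed(&clock, job.id, &e.to_string()).await?;
                // ...and schedule a fresh attempt unless we've retried enough.
                if job.attempt < 3 {
                    repo.retry(&mut rng, &clock, job.id).await?;
                }
            }
        }
    }
}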