Skip to content

Commit

Permalink
Remove RETRIES_EXCEEDED Workflows From Queues Table (#158)
Browse files Browse the repository at this point in the history
If an enqueued workflow enters the dead letter queue
(`RETRIES_EXCEEDED`), it remains in the `workflow_queue` table forever,
potentially blocking the dequeuing of later workflows. This PR removes
DLQ'ed workflows from the `workflow_queue` table, unblocking the queue.
  • Loading branch information
kraftp authored Nov 21, 2024
1 parent 5de7ca7 commit 38db10c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
7 changes: 7 additions & 0 deletions dbos/_sys_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ def update_workflow_status(
recovery_attempts: int = row[0]
if recovery_attempts > max_recovery_attempts:
with self.engine.begin() as c:
c.execute(
sa.delete(SystemSchema.workflow_queue).where(
SystemSchema.workflow_queue.c.workflow_uuid
== status["workflow_uuid"]
)
)
c.execute(
sa.update(SystemSchema.workflow_status)
.where(
Expand All @@ -311,6 +317,7 @@ def update_workflow_status(
)
.values(
status=WorkflowStatusString.RETRIES_EXCEEDED.value,
queue_name=None,
)
)
raise DBOSDeadLetterQueueError(
Expand Down
45 changes: 43 additions & 2 deletions tests/test_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from sqlalchemy.exc import OperationalError

# Public API
from dbos import DBOS, GetWorkflowsInput, SetWorkflowID
from dbos._error import DBOSDeadLetterQueueError, DBOSErrorCode, DBOSException
from dbos import DBOS, GetWorkflowsInput, Queue, SetWorkflowID
from dbos._error import DBOSDeadLetterQueueError
from dbos._sys_db import WorkflowStatusString


Expand Down Expand Up @@ -150,3 +150,44 @@ def dead_letter_workflow() -> None:
assert handle.get_result() == None
dbos._sys_db.wait_for_buffer_flush()
assert handle.get_status().status == WorkflowStatusString.SUCCESS.value


def test_enqueued_dead_letter_queue(dbos: DBOS) -> None:
function_started_event = threading.Event()
event = threading.Event()
max_concurrency = 1
max_recovery_attempts = 10
recovery_count = 0

@DBOS.workflow(max_recovery_attempts=max_recovery_attempts)
def dead_letter_workflow() -> None:
function_started_event.set()
nonlocal recovery_count
recovery_count += 1
event.wait()

@DBOS.workflow()
def regular_workflow() -> None:
return

queue = Queue("test_queue", concurrency=max_concurrency)
handle = queue.enqueue(dead_letter_workflow)
function_started_event.wait()

for i in range(max_recovery_attempts):
DBOS.recover_pending_workflows()
assert recovery_count == i + 2

regular_handle = queue.enqueue(regular_workflow)

with pytest.raises(Exception) as exc_info:
DBOS.recover_pending_workflows()
assert exc_info.errisinstance(DBOSDeadLetterQueueError)
assert handle.get_status().status == WorkflowStatusString.RETRIES_EXCEEDED.value

assert regular_handle.get_result() == None

event.set()
assert handle.get_result() == None
dbos._sys_db.wait_for_buffer_flush()
assert handle.get_status().status == WorkflowStatusString.SUCCESS.value

0 comments on commit 38db10c

Please sign in to comment.