Skip to content

Commit

Permalink
fix: fetch and update delivery statuses
Browse files Browse the repository at this point in the history
  • Loading branch information
s-aga-r committed Nov 3, 2024
1 parent 7077ae1 commit 108caee
Showing 1 changed file with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,17 @@ def push_emails_to_queue() -> None:
def fetch_and_update_delivery_statuses() -> None:
"""Fetches and updates delivery statuses of the emails."""

def has_unsynced_mails() -> bool:
OML = frappe.qb.DocType("Outgoing Mail Log")
mails = (
frappe.qb.from_(OML)
.select(OML.name)
.where(OML.status.isin(["Queued (RMQ)", "Queued (Haraka)", "Deferred"]))
.limit(1)
).run(pluck="name")

return bool(mails)

def queue_ok(agent: str, data: dict) -> None:
frappe.db.set_value(
"Outgoing Mail Log",
Expand Down Expand Up @@ -512,49 +523,33 @@ def delivered(data: dict) -> None:
except Exception:
frappe.log_error(title="Update Delivery Status - Delivered", message=frappe.get_traceback())

max_failures = 3
total_failures = 0
if not has_unsynced_mails():
return

while total_failures < max_failures:
OML = frappe.qb.DocType("Outgoing Mail Log")
mails = (
frappe.qb.from_(OML)
.select(OML.name)
.where(OML.status.isin(["Queued (RMQ)", "Queued (Haraka)", "Deferred"]))
.limit(1)
).run(pluck="name")

if not mails:
break
try:
with rabbitmq_context() as rmq:
rmq.declare_queue(OUTGOING_MAIL_STATUS_QUEUE, max_priority=3)

try:
with rabbitmq_context() as rmq:
rmq.declare_queue(OUTGOING_MAIL_STATUS_QUEUE, max_priority=3)
while True:
result = rmq.basic_get(OUTGOING_MAIL_STATUS_QUEUE)

while True:
result = rmq.basic_get(OUTGOING_MAIL_STATUS_QUEUE)
if not result:
break

if not result:
break
method, properties, body = result
if body:
data = json.loads(body)
hook = data["hook"]

method, properties, body = result
if body:
data = json.loads(body)
hook = data["hook"]
if hook == "queue_ok":
queue_ok(properties.app_id, data)
elif hook in ["bounce", "deferred"]:
undelivered(data)
elif hook == "delivered":
delivered(data)

if hook == "queue_ok":
queue_ok(properties.app_id, data)
elif hook in ["bounce", "deferred"]:
undelivered(data)
elif hook == "delivered":
delivered(data)
rmq.channel.basic_ack(delivery_tag=method.delivery_tag)

rmq.channel.basic_ack(delivery_tag=method.delivery_tag)

except Exception:
total_failures += 1
error_log = frappe.get_traceback(with_context=False)
frappe.log_error(title="Fetch and Update Delivery Statuses", message=error_log)

if total_failures < max_failures:
time.sleep(2**total_failures)
except Exception:
error_log = frappe.get_traceback(with_context=False)
frappe.log_error(title="Fetch and Update Delivery Statuses", message=error_log)

0 comments on commit 108caee

Please sign in to comment.