-
-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: adds @cron task decorator #56
base: main
Are you sure you want to change the base?
Changes from 13 commits
65bddb2
c722220
7a93ab3
77e4fd8
c1ae751
2f6bb26
cfc076b
def032a
b77e718
72593d6
cd519f5
1b4bd1c
50d3341
4a0ea3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import asyncio | ||
import traceback | ||
from abc import ABC, abstractmethod | ||
|
||
from ape import chain | ||
|
@@ -16,7 +17,7 @@ | |
from .recorder import BaseRecorder, TaskResult | ||
from .state import AppDatastore, AppState | ||
from .subscriptions import SubscriptionType, Web3SubscriptionsManager | ||
from .types import TaskType | ||
from .types import CRON_CHECK_SECONDS, CronSchedule, TaskType, utc_now | ||
from .utils import ( | ||
async_wrap_iter, | ||
hexbytes_dict, | ||
|
@@ -109,6 +110,38 @@ async def _event_task(self, task_data: TaskData): | |
handle an event handler task for the given contract event | ||
""" | ||
|
||
async def _cron_task(self, task_data: list[TaskData]): | ||
cron_jobs: dict[TaskData, CronSchedule] = dict() | ||
|
||
for td in task_data: | ||
if cron_schedule := td.labels.get("cron"): | ||
cron_jobs[td] = CronSchedule(cron=cron_schedule) | ||
|
||
else: | ||
# NOTE: Shouldn't happen but just in case | ||
logger.warning(f"TaskData missing `cron` label: {td}") | ||
|
||
if cron_jobs_str := "\n- ".join(map(str, cron_jobs.values())): | ||
logger.info(f"Cron Jobs:\n- {cron_jobs_str}") | ||
|
||
while True: | ||
current_time = utc_now() | ||
if current_time.second < CRON_CHECK_SECONDS: | ||
# Print out current time every minute | ||
logger.info(f"Current Time: {current_time}") | ||
# NOTE: In the absence of any cron jobs, we still print out the current time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: make this debug. bit verbose and not really useful unless you're debugging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was actually thinking this could be really useful to see for endusers, because they would have some live logging being created in scenarios where they are solely doing event log monitoring and nothing is really happening. Also very useful to know what the bot's local time is vs. the blockchain's, to see if there is some drift occuring. |
||
|
||
if task_data_to_kiq := [ | ||
td for td, cron in cron_jobs.items() if cron.is_ready(current_time) | ||
]: | ||
tasks = await asyncio.gather( | ||
*((self._create_task_kicker(td).kiq(current_time) for td in task_data_to_kiq)) | ||
) | ||
await asyncio.gather(*(self._handle_task(task) for task in tasks)) | ||
|
||
# Check crons multiple times a minute | ||
await asyncio.sleep(CRON_CHECK_SECONDS) | ||
|
||
async def run(self): | ||
""" | ||
Run the task broker client for the assembled ``SilverbackApp`` application. | ||
|
@@ -138,7 +171,7 @@ async def run(self): | |
raise StartupFailure("Worker SDK version too old, please rebuild") | ||
|
||
if not ( | ||
system_tasks := set(TaskType(task_name) for task_name in result.return_value.task_types) | ||
system_tasks := set(TaskType(task_type) for task_type in result.return_value.task_types) | ||
): | ||
raise StartupFailure("No system tasks detected, startup failure") | ||
# NOTE: Guaranteed to be at least one because of `TaskType.SYSTEM_CONFIG` | ||
|
@@ -152,6 +185,10 @@ async def run(self): | |
# `if TaskType.<SYSTEM_TASK_NAME> not in system_tasks: raise StartupFailure(...)` | ||
# or handle accordingly by having default logic if it is not available | ||
|
||
# NOTE: In case we want to add new task types, we can detect feature support | ||
supported_user_tasks = set( | ||
[TaskType(task_type) for task_type in result.return_value.task_types] | ||
) | ||
# Initialize recorder (if available) and fetch state if app has been run previously | ||
if self.recorder: | ||
await self.recorder.init(app_id=self.app.identifier) | ||
|
@@ -204,16 +241,31 @@ async def run(self): | |
if event_log_taskdata_results.is_err: | ||
raise StartupFailure(event_log_taskdata_results.error) | ||
|
||
if TaskType.CRON_JOB in supported_user_tasks: | ||
cron_job_taskdata_results = await run_taskiq_task_wait_result( | ||
self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.CRON_JOB | ||
) | ||
if cron_job_taskdata_results.is_err: | ||
raise StartupFailure(cron_job_taskdata_results.error) | ||
|
||
else: # Not supported for `TaskType.SYSTEM_USER_TASKDATA` | ||
# NOTE: This is just so that `.return_value` is a proper attribute for next line | ||
cron_job_taskdata_results = type( | ||
"MockAsyncTaskiqResult", (object,), dict(return_value=[]) | ||
)() | ||
|
||
if ( | ||
len(new_block_taskdata_results.return_value) | ||
== len(event_log_taskdata_results.return_value) | ||
== 0 # Both are empty | ||
== len(cron_job_taskdata_results.return_value) | ||
== 0 # All are empty | ||
): | ||
raise NoTasksAvailableError() | ||
|
||
# NOTE: Any propagated failure in here should be handled such that shutdown tasks also run | ||
# TODO: `asyncio.TaskGroup` added in Python 3.11 | ||
listener_tasks = ( | ||
asyncio.create_task(self._cron_task(cron_job_taskdata_results.return_value)), | ||
*( | ||
asyncio.create_task(self._block_task(task_def)) | ||
for task_def in new_block_taskdata_results.return_value | ||
|
@@ -232,9 +284,11 @@ async def run(self): | |
tasks_with_errors, tasks_running = await asyncio.wait( | ||
listener_tasks, return_when=asyncio.FIRST_EXCEPTION | ||
) | ||
if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors): | ||
if runtime_errors := "\n\n".join( | ||
"".join(traceback.format_exception(task.exception())) for task in tasks_with_errors | ||
): | ||
# NOTE: In case we are somehow not displaying the error correctly with task status | ||
logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}") | ||
logger.warning(f"Runtime error(s) detected, shutting down\n{runtime_errors}") | ||
|
||
# Cancel any still running | ||
(task.cancel() for task in tasks_running) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should actually test this...