Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds @cron task decorator #56

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/userguides/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ Inside of `handle_token_transfer_events` you can define any logic that you want
Again, you can return any serializable data structure from this function and that will be stored in the results database as a trackable metric for the execution of this handler.
Any errors you raise during this function will get captured by the client, and recorded as a failure to handle this `transfer` event log.

## Cron Tasks

You may also want to run some tasks on a schedule, either for efficiency reasons or simply because the task is not related to any chain-driven events.
You can do that with the `@cron` task decorator.

```python
@app.cron("0 * * * *")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should actually test this...

def every_hour():
...
```

For more information see [the linux handbook section on the crontab syntax](https://linuxhandbook.com/crontab/#understanding-crontab-syntax) or the [crontab.guru](https://crontab.guru/) generator.

## Startup and Shutdown

### Worker Events
Expand Down
8 changes: 8 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Annotated

from ape import chain
Expand Down Expand Up @@ -38,6 +39,13 @@ def worker_startup(state: TaskiqState): # NOTE: You need the type hint here
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately


# You can run cron jobs in your apps (functions that execute at a regular time period)
# NOTE: Great for things like regular DB cleanups or producing metrics at regular intervals
@app.cron("*/2 * * * *")
def every_two_minutes(current_time_utc: datetime):
    """Cron handler: record the time this job was triggered for as the ``crontime`` metric."""
    metrics = dict(crontime=current_time_utc)
    return metrics


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
# NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI`
Expand Down
19 changes: 18 additions & 1 deletion silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .types import SilverbackID, TaskType
from .types import CronSchedule, SilverbackID, TaskType


class SystemConfig(BaseModel):
Expand All @@ -32,6 +32,9 @@ class TaskData(BaseModel):

# NOTE: Any other items here must have a default value

def __hash__(self):
    """Hash the task by its ``name`` only, so instances can be used as dict keys / set members."""
    # NOTE: Name should be unique, okay for hashing
    task_name = self.name
    return hash(task_name)


class SilverbackApp(ManagerAccessMixin):
"""
Expand Down Expand Up @@ -146,6 +149,7 @@ def broker_task_decorator(
self,
task_type: TaskType,
container: BlockContainer | ContractEvent | None = None,
cron_schedule: CronSchedule | None = None,
) -> Callable[[Callable], AsyncTaskiqDecoratedTask]:
"""
Dynamically create a new broker task that handles tasks of ``task_type``.
Expand Down Expand Up @@ -194,6 +198,9 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature

if task_type is TaskType.CRON_JOB:
labels["cron"] = str(cron_schedule)

self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))

return self.broker.register_task(
Expand Down Expand Up @@ -309,3 +316,13 @@ def on_(
# TODO: Support account transaction polling
# TODO: Support mempool polling
raise InvalidContainerTypeError(container)

def cron(self, cron_schedule: str) -> Callable:
    """
    Create a task that runs on a schedule rather than in response to a chain event.

    Args:
        cron_schedule (str): A cron-like schedule string, e.g. ``"*/2 * * * *"``.

    Returns:
        Callable: The decorator to apply to the handler function.
    """
    # Parse the raw string up front so the decorator receives a `CronSchedule` object
    parsed_schedule = CronSchedule(cron=cron_schedule)
    return self.broker_task_decorator(TaskType.CRON_JOB, cron_schedule=parsed_schedule)
64 changes: 59 additions & 5 deletions silverback/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import traceback
from abc import ABC, abstractmethod

from ape import chain
Expand All @@ -16,7 +17,7 @@
from .recorder import BaseRecorder, TaskResult
from .state import AppDatastore, AppState
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import TaskType
from .types import CRON_CHECK_SECONDS, CronSchedule, TaskType, utc_now
from .utils import (
async_wrap_iter,
hexbytes_dict,
Expand Down Expand Up @@ -109,6 +110,38 @@ async def _event_task(self, task_data: TaskData):
handle an event handler task for the given contract event
"""

async def _cron_task(self, task_data: list[TaskData]):
    """
    Poll the clock forever and kick every cron job whose schedule is due.

    Args:
        task_data (list[TaskData]): Task definitions for the cron jobs; each is expected to
            carry its schedule string under the ``cron`` label.
    """
    schedules: dict[TaskData, CronSchedule] = {}

    for job in task_data:
        cron_label = job.labels.get("cron")
        if not cron_label:
            # NOTE: Shouldn't happen but just in case
            logger.warning(f"TaskData missing `cron` label: {job}")
            continue

        schedules[job] = CronSchedule(cron=cron_label)

    cron_jobs_str = "\n- ".join(str(schedule) for schedule in schedules.values())
    if cron_jobs_str:
        logger.info(f"Cron Jobs:\n- {cron_jobs_str}")

    while True:
        current_time = utc_now()
        if current_time.second < CRON_CHECK_SECONDS:
            # Print out current time every minute
            # NOTE: In the absence of any cron jobs, we still print out the current time.
            #       Kept at info level on purpose (review discussion): it gives end users some
            #       live logging when nothing else is happening, and makes drift between the
            #       bot's local time and the blockchain's time visible.
            logger.info(f"Current Time: {current_time}")

        due_jobs = [
            job for job, schedule in schedules.items() if schedule.is_ready(current_time)
        ]
        if due_jobs:
            # Kick all due jobs concurrently, then wait on all of their results
            kicked = await asyncio.gather(
                *(self._create_task_kicker(job).kiq(current_time) for job in due_jobs)
            )
            await asyncio.gather(*(self._handle_task(kicked_task) for kicked_task in kicked))

        # Check crons multiple times a minute
        await asyncio.sleep(CRON_CHECK_SECONDS)

async def run(self):
"""
Run the task broker client for the assembled ``SilverbackApp`` application.
Expand Down Expand Up @@ -138,7 +171,7 @@ async def run(self):
raise StartupFailure("Worker SDK version too old, please rebuild")

if not (
system_tasks := set(TaskType(task_name) for task_name in result.return_value.task_types)
system_tasks := set(TaskType(task_type) for task_type in result.return_value.task_types)
):
raise StartupFailure("No system tasks detected, startup failure")
# NOTE: Guaranteed to be at least one because of `TaskType.SYSTEM_CONFIG`
Expand All @@ -152,6 +185,10 @@ async def run(self):
# `if TaskType.<SYSTEM_TASK_NAME> not in system_tasks: raise StartupFailure(...)`
# or handle accordingly by having default logic if it is not available

# NOTE: In case we want to add new task types, we can detect feature support
supported_user_tasks = set(
[TaskType(task_type) for task_type in result.return_value.task_types]
)
# Initialize recorder (if available) and fetch state if app has been run previously
if self.recorder:
await self.recorder.init(app_id=self.app.identifier)
Expand Down Expand Up @@ -204,16 +241,31 @@ async def run(self):
if event_log_taskdata_results.is_err:
raise StartupFailure(event_log_taskdata_results.error)

if TaskType.CRON_JOB in supported_user_tasks:
cron_job_taskdata_results = await run_taskiq_task_wait_result(
self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.CRON_JOB
)
if cron_job_taskdata_results.is_err:
raise StartupFailure(cron_job_taskdata_results.error)

else: # Not supported for `TaskType.SYSTEM_USER_TASKDATA`
# NOTE: This is just so that `.return_value` is a proper attribute for next line
cron_job_taskdata_results = type(
"MockAsyncTaskiqResult", (object,), dict(return_value=[])
)()

if (
len(new_block_taskdata_results.return_value)
== len(event_log_taskdata_results.return_value)
== 0 # Both are empty
== len(cron_job_taskdata_results.return_value)
== 0 # All are empty
):
raise NoTasksAvailableError()

# NOTE: Any propagated failure in here should be handled such that shutdown tasks also run
# TODO: `asyncio.TaskGroup` added in Python 3.11
listener_tasks = (
asyncio.create_task(self._cron_task(cron_job_taskdata_results.return_value)),
*(
asyncio.create_task(self._block_task(task_def))
for task_def in new_block_taskdata_results.return_value
Expand All @@ -232,9 +284,11 @@ async def run(self):
tasks_with_errors, tasks_running = await asyncio.wait(
listener_tasks, return_when=asyncio.FIRST_EXCEPTION
)
if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors):
if runtime_errors := "\n\n".join(
"".join(traceback.format_exception(task.exception())) for task in tasks_with_errors
):
# NOTE: In case we are somehow not displaying the error correctly with task status
logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}")
logger.warning(f"Runtime error(s) detected, shutting down\n{runtime_errors}")

# Cancel any still running
(task.cancel() for task in tasks_running)
Expand Down
79 changes: 78 additions & 1 deletion silverback/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Literal

from ape.logging import get_logger
from pydantic import BaseModel, Field, RootModel, ValidationError, model_validator
from pydantic import BaseModel, Field, RootModel, ValidationError, model_serializer, model_validator
from pydantic.functional_serializers import PlainSerializer
from typing_extensions import Annotated

Expand All @@ -21,6 +21,7 @@ class TaskType(str, Enum):
STARTUP = "user:startup"
NEW_BLOCK = "user:new-block"
EVENT_LOG = "user:event-log"
CRON_JOB = "user:cron-job"
SHUTDOWN = "user:shutdown"

def __str__(self) -> str:
Expand Down Expand Up @@ -48,6 +49,82 @@ def utc_now() -> datetime:
]


CRON_CHECK_SECONDS = 5


class CronSchedule(BaseModel):
    """
    A five-field crontab schedule: ``minute hour day-of-month month day-of-week``.

    Build it either from a single crontab string (``CronSchedule(cron="*/5 * * * *")``) or
    from the individual fields as keyword arguments. It serializes back to the crontab string.

    Each field may be ``*``, a single number, an inclusive range ``a-b``, either of those with
    a step suffix (``*/n``, ``a-b/n``), or a comma-separated list of such items.
    Names (``MON``, ``JAN``) and macros (``@hourly``) are not supported.
    """

    minute: str
    hour: str
    day_month: str
    month: str
    day_week: str

    def __init__(self, cron: str = "", **field_values):
        """
        Args:
            cron (str): Whitespace-separated crontab string. When given (non-empty), it
                replaces any ``field_values`` passed alongside it.
            **field_values: The individual schedule fields, used when ``cron`` is empty.
        """
        if cron:
            # NOTE: Bare `.split()` tolerates repeated spaces/tabs, which `.split(" ")`
            #       turned into empty fields that only failed later inside `is_ready`.
            field_values = dict(zip(type(self).model_fields, cron.split()))

        super().__init__(**field_values)

    @model_serializer
    def create_cron_string(self) -> str:
        """Return the schedule as a single space-separated crontab string."""
        return " ".join(getattr(self, name) for name in type(self).model_fields)

    def __str__(self) -> str:
        return self.create_cron_string()

    def _check_value(self, val: str, current: int, minimum: int = 0) -> bool:
        """
        Check whether ``current`` is selected by the crontab field expression ``val``.

        Args:
            val (str): One field of the schedule (e.g. ``*``, ``*/2``, ``0-20/2``, ``1,15``).
            current (int): The current value of the matching time component.
            minimum (int): Smallest legal value of the field (0 for minute/hour, 1 for
                day-of-month and month); ``*/n`` steps are counted from it.

        Returns:
            bool: ``True`` if any comma-separated item of ``val`` selects ``current``.

        Raises:
            ValueError: If an item is not numeric or a step is less than 1.
        """
        for item in val.split(","):
            span, _, step_str = item.partition("/")
            step = int(step_str) if step_str else 1
            if step < 1:
                raise ValueError(f"Invalid step in cron field: '{val}'")

            if span == "*":
                # `*/n` selects minimum, minimum + n, ... (e.g. minutes 0, 2, 4 for `*/2`)
                if (current - minimum) % step == 0:
                    return True

            elif "-" in span:
                start, stop = map(int, span.split("-"))
                if current in range(start, stop + 1, step):
                    return True

            # NOTE: A step on a single value (e.g. `5/15`) is ignored, only the value matches
            elif int(span) == current:
                return True

        return False

    def _check_day_week(self, current_time: datetime) -> bool:
        """Check the day-of-week field, accepting both ``0`` and ``7`` for Sunday."""
        iso_weekday = current_time.isoweekday()  # Monday = 1 ... Sunday = 7
        if self._check_value(self.day_week, iso_weekday % 7):  # Sunday = 0 ... Saturday = 6
            return True

        return iso_weekday == 7 and self._check_value(self.day_week, 7)

    def is_ready(self, current_time: datetime) -> bool:
        """
        Check whether the schedule is due at ``current_time``.

        Args:
            current_time (datetime): The time to test against the schedule.

        Returns:
            bool: ``True`` only within the first ``CRON_CHECK_SECONDS`` seconds of a minute
            that matches every field of the schedule.
        """
        # NOTE: Ensure close to :00 seconds
        if current_time.second >= CRON_CHECK_SECONDS:
            return False

        if not (
            self._check_value(self.minute, current_time.minute)
            and self._check_value(self.hour, current_time.hour)
            and self._check_value(self.month, current_time.month, minimum=1)
        ):
            return False

        day_month_ok = self._check_value(self.day_month, current_time.day, minimum=1)
        day_week_ok = self._check_day_week(current_time)

        # Intersection/union "bug": https://crontab.guru/cron-bug.html
        if self.day_month.startswith("*") or self.day_week.startswith("*"):
            # Intersection (all must match)
            return day_month_ok and day_week_ok

        # Union: only one of day/wk and day/mth must match
        return day_month_ok or day_week_ok


class _BaseDatapoint(BaseModel):
type: str # discriminator

Expand Down
22 changes: 21 additions & 1 deletion tests/test_types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
from datetime import datetime, timedelta
from decimal import Decimal

import pytest

from silverback.types import Datapoints
from silverback.types import CRON_CHECK_SECONDS, CronSchedule, Datapoints


@pytest.mark.parametrize(
    "cron_schedule,current_time_str",
    [
        ("5 0 * 8 *", "2024-08-01 00:05"),
        ("0 22 * * 1-5", "2024-06-03 22:00"),
        ("23 0-20/2 * * *", "2024-06-03 20:23"),
        ("0 0,12 1 */2 *", "2024-07-01 00:00"),
        ("0 4 8-14 * *", "2024-06-08 04:00"),
        ("0 0 1,15 * 3", "2024-06-05 00:00"),
    ],
)
def test_cron_is_ready(cron_schedule, current_time_str):
    """A schedule is ready exactly on its minute, and no longer once the check window passed."""
    schedule = CronSchedule(cron=cron_schedule)
    on_the_minute = datetime.fromisoformat(current_time_str)
    assert schedule.is_ready(on_the_minute)

    # One polling interval later we are outside the ":00 seconds" window
    after_window = on_the_minute + timedelta(seconds=CRON_CHECK_SECONDS)
    assert not schedule.is_ready(after_window)


@pytest.mark.parametrize(
Expand Down
Loading