Skip to content

Commit

Permalink
Tmp commit async
Browse files Browse the repository at this point in the history
  • Loading branch information
sveinugu committed Oct 6, 2024
1 parent 3111af9 commit d6d1d4b
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 18 deletions.
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ httpx = "^0.26.0"
pydantic = {version = "<2", extras = ["email"]}
griffe = "<1"
ipython = ">=7.34.0"
aiohttp = "^3.10.6"
tenacity = "^9.0.0"
anyio = {extras = ["trio"], version = "^4.6.0"}

[tool.poetry.group.dev.dependencies]
deepdiff = "^6.2.1"
Expand All @@ -56,6 +59,7 @@ pre-commit = "^3.8.0"
pytest-mypy-plugins = "^3.1.2"
devtools = "^0.12.2"
mypy = "^1.11.2"
pytest-aiohttp = "^1.0.5"

[tool.poetry.group.docs.dependencies]
portray = {git = "https://github.com/HeinrichAD/portray.git"}
Expand Down
4 changes: 2 additions & 2 deletions src/omnipy/api/protocols/private/compute/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _revise(self) -> 'IsJobTemplate':
def _call_job_template(self, *args: object, **kwargs: object) -> object:
...

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
...


Expand All @@ -74,7 +74,7 @@ def time_of_cur_toplevel_flow_run(self) -> datetime | None:
def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':
...

def __call__(self, *args: object, **kwargs: object) -> object:
async def __call__(self, *args: object, **kwargs: object) -> object:
...

def _apply_engine_decorator(self, engine: IsEngine) -> None:
Expand Down
10 changes: 6 additions & 4 deletions src/omnipy/compute/func_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from omnipy.api.typedefs import GeneralDecorator
from omnipy.compute.job import JobBase
from omnipy.compute.mixins.auto_async import AutoAsyncJobBaseMixin
from omnipy.compute.mixins.func_signature import SignatureFuncJobBaseMixin
from omnipy.compute.mixins.iterate import IterateFuncJobBaseMixin
from omnipy.compute.mixins.params import ParamsFuncJobBaseMixin
Expand All @@ -20,17 +21,17 @@ def _get_init_args(self) -> tuple[object, ...]:
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
"""To be overloaded by mixins"""
return self._call_func(*args, **kwargs)
return await self._call_func(*args, **kwargs)

def _call_func(self, *args: object, **kwargs: object) -> object:
async def _call_func(self, *args: object, **kwargs: object) -> object:
"""
To be decorated by job runners and mixins that need early application. Should not
be overloaded using inheritance. The method _accept_call_func_decorator accepts
decorators.
"""
return self._job_func(*args, **kwargs)
return await self._job_func(*args, **kwargs)

def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:
self._call_func = call_func_decorator(self._call_func) # type:ignore
Expand All @@ -41,6 +42,7 @@ class FuncArgJobBase(PlainFuncArgJobBase):
...


FuncArgJobBase.accept_mixin(AutoAsyncJobBaseMixin)
FuncArgJobBase.accept_mixin(SignatureFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(IterateFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(SerializerFuncJobBaseMixin)
Expand Down
6 changes: 3 additions & 3 deletions src/omnipy/compute/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _call_job_template(self, *args: object, **kwargs: object) -> object:

raise TypeError(f"'{self.__class__.__name__}' object is not callable. Try .run() method")

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
pass

def _check_engine(self, engine_protocol: Type):
Expand Down Expand Up @@ -229,11 +229,11 @@ def revise(self) -> IsJobTemplate:
update_wrapper(job_template, self, updated=[])
return job_template

def __call__(self, *args: object, **kwargs: object) -> object:
async def __call__(self, *args: object, **kwargs: object) -> object:
self_as_job_base = cast(IsJobBase, self)

try:
return self_as_job_base._call_job(*args, **kwargs)
return await self_as_job_base._call_job(*args, **kwargs)
except Exception as e:
self_as_job_base.log(str(e), level=logging.ERROR)
raise
Expand Down
28 changes: 28 additions & 0 deletions src/omnipy/compute/mixins/auto_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# from inspect import iscoroutine
from typing import cast

from omnipy.api.protocols.private.compute.job import IsJobBase

# import anyio


class AutoAsyncJobBaseMixin:
def __init__(
self,
*,
auto_async: bool = True,
):
self._auto_async = auto_async

@property
def auto_async(self) -> str | None:
return self._auto_async

async def _call_job(self, *args: object, **kwargs: object) -> object:
super_as_job_base = cast(IsJobBase, super())
res = await super_as_job_base._call_job(*args, **kwargs)
print('res', res)
# if self.auto_async:
# if not iscoroutine(res):
# return anyio.to_thread.run_sync(lambda: res)
return res
6 changes: 4 additions & 2 deletions src/omnipy/compute/mixins/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def fixed_params(self) -> MappingProxyType[str, object]:
def param_key_map(self) -> MappingProxyType[str, str]:
return MappingProxyType(self._param_key_mapper.key_map)

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
self_as_name_job_base_mixin = cast(NameJobBaseMixin, self)
self_as_signature_func_job_base_mixin = cast(SignatureFuncJobBaseMixin, self)

Expand All @@ -47,7 +47,9 @@ def _call_job(self, *args: object, **kwargs: object) -> object:
tuple(set(kwargs.keys()) - set(mapped_kwargs.keys()))))

super_as_job_base = cast(IsJobBase, super())
result = super_as_job_base._call_job(*args, **mapped_fixed_params, **mapped_kwargs)
result = await super_as_job_base._call_job(*args,
**mapped_fixed_params,
**mapped_kwargs)

except TypeError as e:
if str(e).startswith('Incorrect job function arguments'):
Expand Down
4 changes: 2 additions & 2 deletions src/omnipy/compute/mixins/result_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def __init__(self, *, result_key: str | None = None):
def result_key(self) -> str | None:
return self._result_key

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
super_as_job_base = cast(IsJobBase, super())
result = super_as_job_base._call_job(*args, **kwargs)
result = await super_as_job_base._call_job(*args, **kwargs)

if self._result_key:
return {self._result_key: result}
Expand Down
17 changes: 17 additions & 0 deletions tests/compute/cases/raw/functions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from random import random
import time

import anyio

from omnipy.data.dataset import Dataset
from omnipy.data.model import Model

Expand Down Expand Up @@ -49,3 +54,15 @@ def all_data_files_plus_str_func(dataset: Dataset[Model[int]], number: int) -> D

def single_data_file_plus_str_func(data_number: Model[int], number: int) -> str:
return str(data_number + number)


async def async_sleep_random_time_func() -> float:
seconds: float = random() / 10
await anyio.sleep(seconds)
return seconds


def sync_sleep_random_time_func() -> float:
seconds: float = random() / 10
time.sleep(seconds)
return seconds
6 changes: 3 additions & 3 deletions tests/compute/helpers/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockJobSubclass(JobMixin, JobBase):
def _get_job_template_subcls_for_revise(cls) -> Type[MockJobTemplateSubclass]:
return MockJobTemplateSubclass

def _call_job(self, *args: object, **kwargs: object) -> object:
async def _call_job(self, *args: object, **kwargs: object) -> object:
...


Expand Down Expand Up @@ -281,14 +281,14 @@ def persisted_time_of_cur_toplevel_flow_run(self) -> list[datetime]:
def reset_persisted_time_of_cur_toplevel_flow_run(cls) -> None:
cls._persisted_time_of_cur_toplevel_flow_run.clear()

def _call_func(self, *args: object, **kwargs: object) -> object:
async def _call_func(self, *args: object, **kwargs: object) -> object:
if self.persisted_time_of_cur_toplevel_flow_run:
assert self.persisted_time_of_cur_toplevel_flow_run == \
self.time_of_cur_toplevel_flow_run
else:
self._persisted_time_of_cur_toplevel_flow_run.append(self.time_of_cur_toplevel_flow_run)

return super()._call_func(*args, **kwargs)
return await super()._call_func(*args, **kwargs)


class IsMockTaskTemplateAssertSameTimeOfCurFlowRun(
Expand Down
44 changes: 44 additions & 0 deletions tests/compute/mixins/test_auto_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from inspect import iscoroutine

import pytest
import pytest_cases as pc

from omnipy import TaskTemplate

from ..cases.raw.functions import async_sleep_random_time_func, sync_sleep_random_time_func


@pc.parametrize('async_task', [False, True], ids=['sync_task', 'async_task'])
@pytest.mark.anyio
def test_synchronously_run_task_with_auto_async(async_task: bool) -> None:
task_func = async_sleep_random_time_func if async_task else sync_sleep_random_time_func

_assert_synchronizity_of_task_is_same_as_task_func(async_task, task_func)

task_auto = TaskTemplate(auto_async=True)(task_func).apply()
seconds_auto = task_auto()
assert not iscoroutine(seconds_auto)
assert seconds_auto <= 0.1


@pc.parametrize('async_task', [False, True], ids=['sync_task', 'async_task'])
@pytest.mark.anyio
async def test_asynchronously_run_task_with_auto_async(async_task: bool) -> None:
task_func = async_sleep_random_time_func if async_task else sync_sleep_random_time_func

_assert_synchronizity_of_task_is_same_as_task_func(async_task, task_func)

task_auto = TaskTemplate(auto_async=True)(task_func).apply()
seconds_auto = task_auto()
assert iscoroutine(seconds_auto)
assert await seconds_auto <= 0.1


def _assert_synchronizity_of_task_is_same_as_task_func(async_task, task_func):
task_no_auto = TaskTemplate(auto_async=False)(task_func).apply()
seconds_no_auto = task_no_auto()
if async_task:
assert iscoroutine(seconds_no_auto)
else:
assert not iscoroutine(seconds_no_auto)
assert seconds_no_auto <= 0.1
4 changes: 2 additions & 2 deletions tests/engine/helpers/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def regenerate_unique_name(self) -> None:
def __call__(self, *args: object, **kwargs: object) -> Any:
return self._call_func(*args, **kwargs)

def _call_func(self, *args: object, **kwargs: object) -> Any:
return self._func(*args, **kwargs)
async def _call_func(self, *args: object, **kwargs: object) -> Any:
return await self._func(*args, **kwargs)

def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._func)
Expand Down
15 changes: 15 additions & 0 deletions tests/modules/general/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Generic

from aiohttp import web
from typing_extensions import TypeVar

from omnipy.data.dataset import Dataset
Expand Down Expand Up @@ -84,3 +85,17 @@ def test_convert_dataset_with_params() -> None:
ints = convert_dataset.run(floats, dataset_cls=RoundToNearestIntDataset)
assert isinstance(ints, RoundToNearestIntDataset)
assert ints.to_data() == dict(a=1, b=4)


async def text_endpoint(request):
return web.Response(body='I kjøleskapet hjemme er det øl!'.encode('utf-8'))


def create_app():
app = web.Application()
app.router.add_route('GET', '/text', text_endpoint)
return app


def test_get_endpoint_data() -> None:
...

0 comments on commit d6d1d4b

Please sign in to comment.