Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #482: callback memory repo #484

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pybotx/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@
BotXAPIUsersAsCSVRequestPayload,
UsersAsCSVMethod,
)
from pybotx.constants import BOTX_DEFAULT_TIMEOUT, STICKER_PACKS_PER_PAGE
from pybotx.constants import (
AUTODELETE_CALLBACK_DEFAULT_TIMEOUT,
BOTX_DEFAULT_TIMEOUT,
STICKER_PACKS_PER_PAGE,
)
from pybotx.converters import optional_sequence_to_list
from pybotx.image_validators import (
ensure_file_content_is_png,
Expand Down Expand Up @@ -264,6 +268,7 @@ def __init__(
httpx_client: Optional[httpx.AsyncClient] = None,
exception_handlers: Optional[ExceptionHandlersDict] = None,
default_callback_timeout: float = BOTX_DEFAULT_TIMEOUT,
autodete_callbacks_timeout: float = AUTODELETE_CALLBACK_DEFAULT_TIMEOUT,
callback_repo: Optional[CallbackRepoProto] = None,
) -> None:
if not collectors:
Expand All @@ -283,7 +288,7 @@ def __init__(
self._httpx_client = httpx_client or httpx.AsyncClient()

if not callback_repo:
callback_repo = CallbackMemoryRepo()
callback_repo = CallbackMemoryRepo(timeout=autodete_callbacks_timeout)

self._callbacks_manager = CallbackManager(callback_repo)

Expand Down
38 changes: 27 additions & 11 deletions pybotx/bot/callbacks/callback_memory_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,57 @@
from uuid import UUID

from pybotx.bot.callbacks.callback_repo_proto import CallbackRepoProto
from pybotx.bot.exceptions import BotShuttingDownError, BotXMethodCallbackNotFoundError
from pybotx.bot.exceptions import BotShuttingDownError
from pybotx.client.exceptions.callbacks import CallbackNotReceivedError
from pybotx.logger import logger
from pybotx.models.method_callbacks import BotXMethodCallback

if TYPE_CHECKING:
from asyncio import Future # noqa: WPS458


class CallbackMemoryRepo(CallbackRepoProto):
def __init__(self) -> None:
def __init__(self, timeout: float = 0) -> None:
self._callback_futures: Dict[UUID, "Future[BotXMethodCallback]"] = {}
self.timeout = timeout

async def create_botx_method_callback(self, sync_id: UUID) -> None:
self._callback_futures[sync_id] = asyncio.Future()
self._callback_futures.setdefault(sync_id, asyncio.Future())

async def set_botx_method_callback_result(
self,
callback: BotXMethodCallback,
) -> None:
sync_id = callback.sync_id

future = self._get_botx_method_callback(sync_id)
if sync_id not in self._callback_futures:
logger.warning(
f"Callback `{sync_id}` doesn't exist yet or already "
f"waited or timed out. Waiting for {self.timeout}s "
f"for it or will be ignored.",
)
self._callback_futures.setdefault(sync_id, asyncio.Future())
asyncio.create_task(self._wait_and_drop_orphan_callback(sync_id))

future = self._callback_futures[sync_id]
future.set_result(callback)

async def wait_botx_method_callback(
self,
sync_id: UUID,
timeout: float,
) -> BotXMethodCallback:
future = self._get_botx_method_callback(sync_id)
future = self._callback_futures[sync_id]

try:
return await asyncio.wait_for(future, timeout=timeout)
result = await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError as exc:
del self._callback_futures[sync_id] # noqa: WPS420
raise CallbackNotReceivedError(sync_id) from exc

del self._callback_futures[sync_id] # noqa: WPS420
return result

async def pop_botx_method_callback(
self,
sync_id: UUID,
Expand All @@ -55,8 +69,10 @@ async def stop_callbacks_waiting(self) -> None:
),
)

def _get_botx_method_callback(self, sync_id: UUID) -> "Future[BotXMethodCallback]":
try:
return self._callback_futures[sync_id]
except KeyError:
raise BotXMethodCallbackNotFoundError(sync_id) from None
async def _wait_and_drop_orphan_callback(self, sync_id: UUID) -> None:
await asyncio.sleep(self.timeout)
if sync_id not in self._callback_futures:
return

self._callback_futures.pop(sync_id, None)
logger.debug(f"Callback `{sync_id}` was dropped")
1 change: 1 addition & 0 deletions pybotx/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
MAX_NOTIFICATION_BODY_LENGTH: Final = 4096
MAX_FILE_LEN_IN_LOGS: Final = 64
BOTX_DEFAULT_TIMEOUT: Final = 60
AUTODELETE_CALLBACK_DEFAULT_TIMEOUT: Final = 30
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pybotx"
version = "0.69.1"
version = "0.69.2"
description = "A python library for interacting with eXpress BotX API"
authors = [
"Sidnev Nikolay <nsidnev@ccsteam.ru>",
Expand Down
149 changes: 114 additions & 35 deletions tests/client/test_botx_method_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,48 @@ async def call_foo_bar(

async def test__botx_method_callback__callback_not_found(
bot_account: BotAccountWithSecret,
loguru_caplog: pytest.LogCaptureFixture,
) -> None:
# - Arrange -
built_bot = Bot(collectors=[HandlerCollector()], bot_accounts=[bot_account])
memory_repo = CallbackMemoryRepo(timeout=0.5)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
callback_repo=memory_repo,
)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
with pytest.raises(BotXMethodCallbackNotFoundError) as exc:
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "chat_not_found",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "chat_not_found",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
verify_request=False,
)
},
verify_request=False,
)

# - Assert -
assert "Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist" in str(
exc.value,
assert (
"Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist"
in loguru_caplog.text
)
assert memory_repo._callback_futures.get(
UUID("21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"),
)

await asyncio.sleep(0.7)
# Drop callback after timeout
assert (
memory_repo._callback_futures.get(UUID("21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"))
is None
)


Expand Down Expand Up @@ -303,7 +319,12 @@ async def test__botx_method_callback__callback_received_after_timeout(
},
),
)
built_bot = Bot(collectors=[HandlerCollector()], bot_accounts=[bot_account])
memory_repo = CallbackMemoryRepo(timeout=0.5)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
callback_repo=memory_repo,
)

built_bot.call_foo_bar = types.MethodType(call_foo_bar, built_bot)

Expand All @@ -312,26 +333,28 @@ async def test__botx_method_callback__callback_received_after_timeout(
with pytest.raises(CallbackNotReceivedError) as not_received_exc:
await bot.call_foo_bar(bot_id, baz=1, callback_timeout=0)

with pytest.raises(BotXMethodCallbackNotFoundError) as not_found_exc:
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "quux_error",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
await bot.set_raw_botx_method_result(
{
"status": "error",
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"reason": "quux_error",
"errors": [],
"error_data": {
"group_chat_id": "705df263-6bfd-536a-9d51-13524afaab5c",
"error_description": (
"Chat with id 705df263-6bfd-536a-9d51-13524afaab5c not found"
),
},
verify_request=False,
)
},
verify_request=False,
)

# - Assert -
assert "hasn't been received" in str(not_received_exc.value)
assert "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3" in str(not_found_exc.value)
assert (
"Callback `21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3` doesn't exist"
in loguru_caplog.text
)
assert endpoint.called


Expand Down Expand Up @@ -611,6 +634,62 @@ async def test__botx_method_callback__bot_wait_callback_after_its_receiving(
assert endpoint.called


async def test__botx_method_callback__callback_received_before_its_expecting(
respx_mock: MockRouter,
httpx_client: httpx.AsyncClient,
host: str,
bot_id: UUID,
bot_account: BotAccountWithSecret,
) -> None:
"""https://github.com/ExpressApp/pybotx/issues/482."""
# - Arrange -
endpoint = respx_mock.post(
f"https://{host}/foo/bar",
json={"baz": 1},
headers={"Content-Type": "application/json"},
).mock(
return_value=httpx.Response(
HTTPStatus.ACCEPTED,
json={
"status": "ok",
"result": {"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3"},
},
),
)
built_bot = Bot(
collectors=[HandlerCollector()],
bot_accounts=[bot_account],
httpx_client=httpx_client,
callback_repo=CallbackMemoryRepo(timeout=0.5),
)

built_bot.call_foo_bar = types.MethodType(call_foo_bar, built_bot)

# - Act -
async with lifespan_wrapper(built_bot) as bot:
await bot.set_raw_botx_method_result(
{
"sync_id": "21a9ec9e-f21f-4406-ac44-1a78d2ccf9e3",
"status": "ok",
"result": {},
},
verify_request=False,
)
foo_bar = await bot.call_foo_bar(bot_id, baz=1, wait_callback=False)

callback = await bot.wait_botx_method_callback(foo_bar)

await asyncio.sleep(1)

# - Assert -
assert callback == BotAPIMethodSuccessfulCallback(
sync_id=foo_bar,
status="ok",
result={},
)
assert endpoint.called


async def test__botx_method_callback__bot_dont_wait_received_callback(
respx_mock: MockRouter,
httpx_client: httpx.AsyncClient,
Expand Down
Loading