Skip to content

Commit

Permalink
new: make sure to always catch exceptions in tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed May 2, 2024
1 parent f9a517e commit 498a372
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 218 deletions.
3 changes: 2 additions & 1 deletion lacuscore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .lacuscore import LacusCore, CaptureStatus, CaptureResponse, CaptureResponseJson, CaptureSettings # noqa
from .lacuscore import LacusCore
from .helpers import CaptureStatus, CaptureResponse, CaptureResponseJson, CaptureSettings # noqa
from .lacus_monitoring import LacusCoreMonitoring # noqa

__all__ = [
Expand Down
102 changes: 102 additions & 0 deletions lacuscore/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3

from __future__ import annotations

from enum import IntEnum, unique
from logging import LoggerAdapter
from typing import MutableMapping, Any, TypedDict

from playwrightcapture.capture import CaptureResponse as PlaywrightCaptureResponse


class LacusCoreException(Exception):
pass


class CaptureError(LacusCoreException):
pass


class RetryCapture(LacusCoreException):
pass


class CaptureSettingsError(LacusCoreException):
pass


class LacusCoreLogAdapter(LoggerAdapter): # type: ignore[type-arg]
"""
Prepend log entry with the UUID of the capture
"""
def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]:
if self.extra:
return '[{}] {}'.format(self.extra['uuid'], msg), kwargs
return msg, kwargs


@unique
class CaptureStatus(IntEnum):
'''The status of the capture'''
UNKNOWN = -1
QUEUED = 0
DONE = 1
ONGOING = 2


class CaptureResponse(PlaywrightCaptureResponse, TypedDict, total=False):
'''A capture made by Lacus. With the base64 encoded image and downloaded file decoded to bytes.'''

# Need to make sure the type is what's expected down the line
children: list[CaptureResponse] | None # type: ignore[misc]

status: int
runtime: float | None


class CaptureResponseJson(TypedDict, total=False):
'''A capture made by Lacus. With the base64 encoded image and downloaded file *not* decoded.'''

status: int
last_redirected_url: str | None
har: dict[str, Any] | None
cookies: list[dict[str, str]] | None
error: str | None
html: str | None
png: str | None
downloaded_filename: str | None
downloaded_file: str | None
children: list[CaptureResponseJson] | None
runtime: float | None
potential_favicons: list[str] | None


class CaptureSettings(TypedDict, total=False):
'''The capture settings that can be passed to Lacus.'''

url: str | None
document_name: str | None
document: str | None
browser: str | None
device_name: str | None
user_agent: str | None
proxy: str | dict[str, str] | None
general_timeout_in_sec: int | None
cookies: list[dict[str, Any]] | None
headers: str | dict[str, str] | None
http_credentials: dict[str, str] | None
geolocation: dict[str, float] | None
timezone_id: str | None
locale: str | None
color_scheme: str | None
viewport: dict[str, int] | None
referer: str | None
with_favicon: bool
allow_tracking: bool
force: bool
recapture_interval: int
priority: int
uuid: str | None

depth: int
rendered_hostname_only: bool # Note: only used if depth is > 0
116 changes: 16 additions & 100 deletions lacuscore/lacuscore.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from asyncio import Task
from base64 import b64decode, b64encode
from datetime import date, timedelta
from enum import IntEnum, unique
from ipaddress import ip_address, IPv4Address, IPv6Address
from logging import LoggerAdapter
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Literal, Any, TypedDict, overload, cast, MutableMapping, Iterator
from typing import Literal, Any, overload, cast, Iterator
from uuid import uuid4
from urllib.parse import urlsplit

Expand All @@ -33,12 +31,16 @@

from defang import refang # type: ignore[import-untyped]
from playwrightcapture import Capture, PlaywrightCaptureException
from playwrightcapture.capture import CaptureResponse as PlaywrightCaptureResponse
from redis import Redis
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import DataError
from ua_parser import user_agent_parser # type: ignore[import-untyped]

from . import task_logger
from .helpers import (
LacusCoreLogAdapter, CaptureError, RetryCapture, CaptureSettingsError,
CaptureStatus, CaptureResponse, CaptureResponseJson, CaptureSettings)

if sys.version_info < (3, 11):
from async_timeout import timeout
else:
Expand All @@ -65,95 +67,6 @@ def _secure_filename(filename: str) -> str:
return filename


class LacusCoreException(Exception):
pass


class CaptureError(LacusCoreException):
pass


class RetryCapture(LacusCoreException):
pass


@unique
class CaptureStatus(IntEnum):
'''The status of the capture'''
UNKNOWN = -1
QUEUED = 0
DONE = 1
ONGOING = 2


class CaptureResponse(PlaywrightCaptureResponse, TypedDict, total=False):
'''A capture made by Lacus. With the base64 encoded image and downloaded file decoded to bytes.'''

# Need to make sure the type is what's expected down the line
children: list[CaptureResponse] | None # type: ignore[misc]

status: int
runtime: float | None


class CaptureResponseJson(TypedDict, total=False):
'''A capture made by Lacus. With the base64 encoded image and downloaded file *not* decoded.'''

status: int
last_redirected_url: str | None
har: dict[str, Any] | None
cookies: list[dict[str, str]] | None
error: str | None
html: str | None
png: str | None
downloaded_filename: str | None
downloaded_file: str | None
children: list[CaptureResponseJson] | None
runtime: float | None
potential_favicons: list[str] | None


class CaptureSettings(TypedDict, total=False):
'''The capture settings that can be passed to Lacus.'''

url: str | None
document_name: str | None
document: str | None
browser: str | None
device_name: str | None
user_agent: str | None
proxy: str | dict[str, str] | None
general_timeout_in_sec: int | None
cookies: list[dict[str, Any]] | None
headers: str | dict[str, str] | None
http_credentials: dict[str, str] | None
geolocation: dict[str, float] | None
timezone_id: str | None
locale: str | None
color_scheme: str | None
viewport: dict[str, int] | None
referer: str | None
with_favicon: bool
allow_tracking: bool
force: bool
recapture_interval: int
priority: int
uuid: str | None

depth: int
rendered_hostname_only: bool # Note: only used if depth is > 0


class LacusCoreLogAdapter(LoggerAdapter): # type: ignore[type-arg]
"""
Prepend log entry with the UUID of the capture
"""
def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]:
if self.extra:
return '[{}] {}'.format(self.extra['uuid'], msg), kwargs
return msg, kwargs


class LacusCore():
"""Capture URLs or web enabled documents using PlaywrightCapture.
Expand Down Expand Up @@ -368,7 +281,7 @@ def enqueue(self, *,
p.execute()
except DataError:
self.master_logger.exception(f'Unable to enqueue: {to_enqueue}')
raise LacusCoreException(f'Unable to enqueue: {to_enqueue}')
raise CaptureSettingsError(f'Unable to enqueue: {to_enqueue}')
return perma_uuid

def _encode_response(self, capture: CaptureResponse) -> CaptureResponseJson:
Expand Down Expand Up @@ -453,7 +366,10 @@ def consume_queue(self, max_consume: int) -> Iterator[Task]: # type: ignore[typ
max_consume -= 1
uuid: str = value[0][0].decode()
priority: int = int(value[0][1])
yield asyncio.create_task(self._capture(uuid, priority), name=uuid)
logger = LacusCoreLogAdapter(self.master_logger, {'uuid': uuid})
yield task_logger.create_task(self._capture(uuid, priority), name=uuid,
logger=logger,
message='Capture raised an uncaught exception')

async def _capture(self, uuid: str, priority: int) -> None:
"""Trigger a specific capture
Expand Down Expand Up @@ -508,11 +424,11 @@ async def _capture(self, uuid: str, priority: int) -> None:
elif k == 'document':
document_as_bytes = b64decode(v)
else:
raise LacusCoreException(f'Unexpected setting: {k}: {v}')
except LacusCoreException as e:
raise CaptureSettingsError(f'Unexpected setting: {k}: {v}')
except CaptureSettingsError as e:
raise e
except Exception as e:
raise LacusCoreException(f'Error while preparing settings: {e}')
raise CaptureSettingsError(f'Error while preparing settings: {e}')

if not to_capture:
all_entries = self.redis.hgetall(f'lacus:capture_settings:{uuid}')
Expand All @@ -523,7 +439,7 @@ async def _capture(self, uuid: str, priority: int) -> None:
# we do not have a URL yet.
name = to_capture.pop('document_name', None)
if not name:
raise LacusCoreException('No document name provided, settings are invalid')
raise CaptureSettingsError('No document name provided, settings are invalid')
if not Path(name).suffix:
# The browser will simply display the file as text if there is no extension.
# Just add HTML as a fallback, as it will be the most comon one.
Expand Down Expand Up @@ -688,7 +604,7 @@ async def _capture(self, uuid: str, priority: int) -> None:
else:
current_retry = int(_current_retry.decode())
if current_retry > 0:
logger.debug(f'Retrying {url} for the {self.max_retries-current_retry+1}th time.')
logger.debug(f'Retrying {url} for the {self.max_retries - current_retry + 1}th time.')
self.redis.decr(f'lacus:capture_retry:{uuid}')
retry = True
else:
Expand Down
59 changes: 59 additions & 0 deletions lacuscore/task_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python3

from __future__ import annotations

from typing import Any, Coroutine, Optional, TypeVar, Tuple

import asyncio
import functools
import logging

from .helpers import LacusCoreLogAdapter

T = TypeVar('T')

# Code from https://quantlane.com/blog/ensure-asyncio-task-exceptions-get-logged/


def create_task(
coroutine: Coroutine[Any, Any, T],
*,
name: str,
logger: 'LacusCoreLogAdapter',
message: str,
message_args: Tuple[Any, ...] = (),
loop: Optional[asyncio.AbstractEventLoop] = None,

) -> 'asyncio.Task[T]': # This type annotation has to be quoted for Python < 3.9, see https://www.python.org/dev/peps/pep-0585/
'''
This helper function wraps a ``loop.create_task(coroutine())`` call and ensures there is
an exception handler added to the resulting task. If the task raises an exception it is logged
using the provided ``logger``, with additional context provided by ``message`` and optionally
``message_args``.
'''
if loop is None:
loop = asyncio.get_running_loop()
task = loop.create_task(coroutine, name=name)
task.add_done_callback(
functools.partial(_handle_task_result, logger=logger, message=message, message_args=message_args)
)
return task


def _handle_task_result(
task: asyncio.Task[Any],
*,
logger: logging.Logger,
message: str,
message_args: Tuple[Any, ...] = (),
) -> None:
try:
task.result()
except asyncio.CancelledError:
pass # Task cancellation should not be logged as an error.
except asyncio.TimeoutError:
pass # Timeout is also fine
# Ad the pylint ignore: we want to handle all exceptions here so that the result of the task
# is properly logged. There is no point re-raising the exception in this callback.
except Exception: # pylint: disable=broad-except
logger.exception(message, *message_args)
Loading

0 comments on commit 498a372

Please sign in to comment.