Make event validation resilient to unretrievable state #684

Draft: wants to merge 6 commits into base: main
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -1,4 +1,4 @@
-# This workflow tests and releases a new version of the package before publishing it to PyPi.
+# This workflow tests and releases a new version of the package before publishing it to PyPI.

name: Release

20 changes: 14 additions & 6 deletions docs/contributing.md
@@ -1,21 +1,23 @@
# Contributing

## Developing the SDK

- We follow a test-driven development (TDD) approach and require a high test coverage of each PR.
- We use [`pre-commit`](https://pre-commit.com/) to apply consistent code quality checks and linting to new code, commit messages, and documentation - see [below](#pre-commit) for how to set this up
- Documentation is automatically built by `pre-commit` but needs to be updated with any changes to public interface of the package


## Releases

We use continuous deployment and semantic versioning for our releases:

- Continuous deployment - each pull request into `main` constitutes a new version
- [Semantic versioning](https://semver.org/) supported by [Conventional Commits](https://github.com/octue/conventional-commits) - to automate meaningful version numbering
- Conventional Commit messages - these are essential for the above to be automated. We've developed a `pre-commit` check that guides and enforces this.


## Pull requests

### Internal developers

1. Check out a new branch
2. Create a pull request into the `main` branch
3. Undertake your changes, committing and pushing to your branch
@@ -24,14 +26,16 @@ We use continuous deployment and semantic versioning for our releases:
6. Ensure your code meets the style guidelines (`pre-commit` checks will fail otherwise)
7. Address any review comments on the PR
8. Ensure the version in `pyproject.toml` is correct and satisfies the GitHub workflow check
-9. Merge into `main`. A release will automatically be created on GitHub and published to PyPi and Docker Hub.
+9. Merge into `main`. A release will automatically be created on GitHub and published to PyPI and Docker Hub.

### External developers

- Please [raise an issue](https://github.com/octue/octue-sdk-python/issues) (or add your $0.02 to an existing issue) so
the maintainers know what's happening and can advise/steer you.

- Create a fork of `octue-sdk-python`, undertaking your changes on a new branch, (see `.pre-commit-config.yaml` for
branch naming conventions). To run tests and make commits, you'll need to do something like:

```
git clone <your_forked_repo_address> # Fetches the repo to your local machine
cd octue-sdk-python # Move into the repo directory
@@ -48,9 +52,10 @@ We use continuous deployment and semantic versioning for our releases:
- Once checks have passed, test coverage of the new code is 100%, documentation is updated, and the review has passed,
we'll merge and release your changes.


## Pre-Commit

You need to install pre-commit to get the hooks working. Run:

```
pip install pre-commit
pre-commit install && pre-commit install -t commit-msg
@@ -74,16 +79,18 @@ Upon failure, the commit will halt. **Re-running the commit will automatically f
- Commit messages - the error messages should explain how to fix these too

You can run pre-commit hooks without making a commit, too, like:

```
pre-commit run black --all-files
```

or

```
# -v gives verbose output, useful for figuring out why docs won't build
pre-commit run build-docs -v
```


## Documentation

### Building documents automatically
@@ -93,19 +100,20 @@ The documentation will build automatically in a pre-configured environment when
In fact, the way `pre-commit` works, you won't be allowed to make the commit unless the documentation builds. This way
we avoid getting broken documentation pushed to the main repository on any commit so we can rely on builds working.


### Building documents manually

**If you did need to build the documentation**

Install `doxygen`. On a mac, that's `brew install doxygen`; other systems may differ.

Install sphinx and other requirements for building the docs:

```
pip install -r docs/requirements.txt
```

Run the build process:

```
sphinx-build -b html docs/source docs/html
```
28 changes: 19 additions & 9 deletions octue/cloud/events/handler.py
@@ -6,6 +6,8 @@
 import time
 from datetime import datetime

+import referencing.exceptions
+
 from octue.cloud import EXCEPTIONS_MAPPING
 from octue.cloud.events.validation import SERVICE_COMMUNICATION_SCHEMA, is_event_valid
 from octue.definitions import GOOGLE_COMPUTE_PROVIDERS
@@ -119,15 +121,23 @@ def _extract_and_validate_event(self, container):
         recipient = attributes.get("recipient")
         child_sdk_version = attributes.get("sender_sdk_version")

-        if self.validate_events and not is_event_valid(
-            event=event,
-            attributes=attributes,
-            recipient=recipient,
-            parent_sdk_version=PARENT_SDK_VERSION,
-            child_sdk_version=child_sdk_version,
-            schema=self.schema,
-        ):
-            return (None, None)
+        try:
+            if self.validate_events and not is_event_valid(
+                event=event,
+                attributes=attributes,
+                recipient=recipient,
+                parent_sdk_version=PARENT_SDK_VERSION,
+                child_sdk_version=child_sdk_version,
+                schema=self.schema,
+            ):
+                return (None, None)
+        except referencing.exceptions.Unretrievable:
+            logger.warning(
+                "Retrieving the event validation schema failed. Disabling event validation for the rest of this event "
+                "stream.",
+                exc_info=True,
+            )
+            self.validate_events = False

         logger.debug(
             "%r: Received an event related to question %r.",
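The handler.py change above amounts to a "validate until the schema cannot be retrieved, then stop validating" fallback. A minimal, stdlib-only sketch of that control flow follows. `EventStream`, `extract`, `unreachable_registry`, and the local `Unretrievable` class are hypothetical stand-ins (the PR itself catches `referencing.exceptions.Unretrievable` inside `_extract_and_validate_event`), so this illustrates the pattern rather than the SDK's actual implementation.

```python
import logging

logger = logging.getLogger(__name__)


class Unretrievable(Exception):
    """Hypothetical stand-in for `referencing.exceptions.Unretrievable` so the sketch needs no third-party packages."""


class EventStream:
    """Hypothetical, stripped-down handler that keeps only the fallback logic."""

    def __init__(self, is_event_valid, validate_events=True):
        self.is_event_valid = is_event_valid
        self.validate_events = validate_events

    def extract(self, event):
        try:
            # Invalid events are dropped, but only while validation is enabled.
            if self.validate_events and not self.is_event_valid(event):
                return None
        except Unretrievable:
            logger.warning(
                "Retrieving the event validation schema failed. Disabling event validation for the rest of this "
                "event stream.",
                exc_info=True,
            )
            # Switch validation off so later events don't retry the failing lookup.
            self.validate_events = False

        return event


def unreachable_registry(event):
    """Simulate a validator whose remote schema reference can't be fetched."""
    raise Unretrievable("schema registry unreachable")


stream = EventStream(is_event_valid=unreachable_registry)
first = stream.extract({"kind": "result"})  # Logs a warning and disables validation.
second = stream.extract({"kind": "heartbeat"})  # Passed through without validation.
```

In this sketch the event that triggered the failure is not dropped: execution falls through the `except` block and the event is returned unvalidated. The diff appears to behave the same way, since the `except` branch has no `return` and the method continues to the `logger.debug` call.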
56 changes: 38 additions & 18 deletions octue/cloud/events/validation.py
@@ -1,3 +1,4 @@
+import functools
 import logging

 import jsonschema
@@ -26,7 +27,7 @@
 # Instantiate a JSON schema validator to cache the service communication schema. This avoids downloading it from the
 # registry every time a message is validated against it.
 jsonschema.Draft202012Validator.check_schema(SERVICE_COMMUNICATION_SCHEMA)
-jsonschema_validator = jsonschema.Draft202012Validator(SERVICE_COMMUNICATION_SCHEMA)
+cached_validator = jsonschema.Draft202012Validator(SERVICE_COMMUNICATION_SCHEMA)

 logger = logging.getLogger(__name__)

@@ -69,30 +70,49 @@ def raise_if_event_is_invalid(event, attributes, recipient, parent_sdk_version,
     :raise jsonschema.ValidationError: if the event or its attributes are invalid
     :return None:
     """
-    # Transform attributes to a dictionary in the case they're a different kind of mapping.
-    data = {"event": event, "attributes": dict(attributes)}
-
     if schema is None:
         schema = SERVICE_COMMUNICATION_SCHEMA

-    try:
-        # If the schema is the official service communication schema, use the cached validator.
-        if schema == SERVICE_COMMUNICATION_SCHEMA:
-            jsonschema_validator.validate(data)
+    validator = _get_validator(schema)

-        # Otherwise, use uncached validation.
-        else:
-            jsonschema.validate(data, schema)
+    # Transform attributes to a dictionary in the case they're a different kind of mapping.
+    data = {"event": event, "attributes": dict(attributes)}
+
+    try:
+        validator(data)

     except jsonschema.ValidationError as error:
         warn_if_incompatible(parent_sdk_version=parent_sdk_version, child_sdk_version=child_sdk_version)

-        logger.exception(
-            "%r received an event that doesn't conform with version %s of the service communication schema (%s): %r.",
-            recipient,
-            SERVICE_COMMUNICATION_SCHEMA_VERSION,
-            SERVICE_COMMUNICATION_SCHEMA_INFO_URL,
-            event,
-        )
+        if schema == SERVICE_COMMUNICATION_SCHEMA:
+            logger.exception(
+                "%r received an event that doesn't conform with version %s of the service communication schema (%s): "
+                "%r.",
+                recipient,
+                SERVICE_COMMUNICATION_SCHEMA_VERSION,
+                SERVICE_COMMUNICATION_SCHEMA_INFO_URL,
+                event,
+            )
+        else:
+            logger.exception(
+                "%r received an event that doesn't conform with the provided event schema: %r",
+                recipient,
+                event,
+            )

         raise error
+
+
+def _get_validator(schema):
+    """If the schema is the official service communication schema, get the cached schema validator; otherwise, get the
+    uncached validator.
+
+    :param dict schema: the schema to validate events and their attributes against
+    :return callable: the `validate` function of the schema validator or `jsonschema` module
+    """
+    # If the schema is the official service communication schema, use the cached validator.
+    if schema == SERVICE_COMMUNICATION_SCHEMA:
+        return cached_validator.validate
+
+    # Otherwise, use uncached validation.
+    return functools.partial(jsonschema.validate, schema=schema)
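The `_get_validator` helper above returns a callable that takes only the data to validate, whichever branch is taken. A minimal sketch of that dispatch pattern follows, using stdlib-only stand-ins so it runs without `jsonschema`. `OFFICIAL_SCHEMA`, the toy `validate` function, the `Validator` class, and `get_validator` are all hypothetical names for illustration; only `functools.partial` is the real call the diff relies on.

```python
import functools

# Hypothetical stand-in for SERVICE_COMMUNICATION_SCHEMA.
OFFICIAL_SCHEMA = {"required": ["event", "attributes"]}


def validate(instance, schema):
    """Toy stand-in for `jsonschema.validate(instance, schema)`: it only checks required keys."""
    missing = [key for key in schema.get("required", []) if key not in instance]

    if missing:
        raise ValueError(f"Missing required keys: {missing}")


class Validator:
    """Toy stand-in for a pre-built validator object that holds on to one schema."""

    def __init__(self, schema):
        self.schema = schema

    def validate(self, instance):
        validate(instance, self.schema)


# Built once at import time, mirroring `cached_validator` in the diff.
cached_validator = Validator(OFFICIAL_SCHEMA)


def get_validator(schema):
    """Return a one-argument callable that validates data against `schema`."""
    if schema == OFFICIAL_SCHEMA:
        return cached_validator.validate

    # Pre-bind the schema so the caller only has to pass the data.
    return functools.partial(validate, schema=schema)


official = get_validator(OFFICIAL_SCHEMA)
custom = get_validator({"required": ["event"]})

official({"event": {}, "attributes": {}})  # Uses the cached validator's bound method.
custom({"event": {}})  # Uses the partial with `schema` already bound.
```

Because both branches return something callable as `validator(data)`, the calling code needs only a single `try` block. One detail relevant to the new tests: each attribute access such as `cached_validator.validate` creates a fresh bound-method object, so two accesses compare equal but are not identical, which is presumably why the test for the cached branch uses `assertEqual`, while the partial's underlying function can be checked with `assertIs` on `.func`.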
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "octue"
-version = "0.60.2"
+version = "0.60.3"
description = "A package providing template applications for data services, and a python SDK to the Octue API."
readme = "README.md"
authors = ["Marcus Lugg <marcus@octue.com>", "Thomas Clark <support@octue.com>"]
17 changes: 17 additions & 0 deletions tests/cloud/events/test_validation.py
@@ -0,0 +1,17 @@
+import unittest
+
+import jsonschema
+
+from octue.cloud.events.validation import _get_validator, cached_validator, SERVICE_COMMUNICATION_SCHEMA
+
+
+class TestGetValidator(unittest.TestCase):
+    def test_cached_validator_returned_if_event_schema_is_official(self):
+        """Test that the cached validator is returned if the official event schema is provided."""
+        self.assertEqual(_get_validator(schema=SERVICE_COMMUNICATION_SCHEMA), cached_validator.validate)
+
+    def test_uncached_validator_returned_if_custom_event_schema_provided(self):
+        """Test that the uncached validator is returned if a custom event schema is provided."""
+        validator = _get_validator(schema={})
+        self.assertIs(validator.func, jsonschema.validate)
+        self.assertEqual(validator.keywords, {"schema": {}})