diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 00b898a..b32fa16 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,14 +14,19 @@ repos: hooks: - id: flake8 name: flake8 - args: ["--max-line-length=82", "--per-file-ignores=__init__.py:F401", "--exclude=tests/*"] + args: [ + "--max-line-length=82", + "--per-file-ignores=__init__.py:F401", + "--exclude=tests/*", + "--exclude=examples/*", + ] - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.0.1 hooks: - id: mypy name: mypy - exclude: ^tests/ + exclude: ^tests/|^examples/ args: [--ignore-missing-imports ] language: python language_version: python3.11 diff --git a/README.md b/README.md index 4c417b8..67d3898 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ to your .env file, overwise default values would be taken 1) Implement and connect your business logic and db methods inside this Crud class. [NOTE]: Check `example` directory for a mongo implementation example. +[Click here](examples) crud.py ```python diff --git a/examples/crud.py b/examples/crud.py new file mode 100644 index 0000000..99c641a --- /dev/null +++ b/examples/crud.py @@ -0,0 +1,119 @@ +import copy +from typing import Any, Tuple +from uuid import uuid4 + +from py_ocpi.core.crud import Crud +from py_ocpi.core.enums import ModuleID, RoleEnum, Action + +from .db_interface import DbInterface + + +class CrudExample(Crud): + @classmethod + async def get( + cls, module: ModuleID, role: RoleEnum, id, *args, **kwargs + ) -> Any: + version = kwargs["version"] + party_id = kwargs.get("party_id") + country_code = kwargs.get("country_code") + command_data = kwargs.get("command_data") + return await DbInterface.get(module, id, *args, **kwargs) + + @classmethod + async def list( + cls, module: ModuleID, role: RoleEnum, filters: dict, *args, **kwargs + ) -> Tuple[list, int, bool]: + version = kwargs["version"] + party_id = kwargs.get("party_id") + country_code = kwargs.get("country_code") + data_list = await DbInterface.get_all(module, filters, *args, **kwargs) + total = await DbInterface.count(module, filters, *args, **kwargs) + is_last_page = await DbInterface.is_last_page( + module, filters, total, *args, **kwargs + ) + return data_list, total, is_last_page + + @classmethod + async def create( + cls, module: ModuleID, role: RoleEnum, data: dict, *args, **kwargs + ) -> Any: + version = kwargs["version"] + auth_token = kwargs["auth_token"] + command = kwargs.get("command") + party_id = kwargs.get("party_id") + country_code = kwargs.get("country_code") + token_type = kwargs.get("token_type") + if module == ModuleID.credentials_and_registration: + # It's advised to save somewhere in separate table token B sent by client: + integration_data = copy.deepcopy(data["credentials"]) + integration_data["endpoints"] = data.pop("endpoints") + integration_data["credentials_id"] = auth_token + await DbInterface.create( + "integration", integration_data, *args, **kwargs + ) + + # It's advised to re-create token A after it was used for register purpose + token_a = uuid4() # TODO: Don't forget to save it! + + return await DbInterface.create(module, data, *args, **kwargs) + + @classmethod + async def update( + cls, module: ModuleID, role: RoleEnum, data: dict, id, *args, **kwargs + ) -> Any: + version = kwargs["version"] + party_id = kwargs["party_id"] + country_code = kwargs.get("country_code") + token_type = kwargs.get("token_type") + match module: + case ModuleID.credentials_and_registration: + # re-create client credentials + await DbInterface.update( + "integration", data, id, *args, **kwargs + ) + + # Generate new token_c instead the one client used + new_token_с = uuid4() + data = {"token": new_token_с} + + return await DbInterface.update(module, data, id, *args, **kwargs) + + @classmethod + async def delete( + cls, module: ModuleID, role: RoleEnum, id, *args, **kwargs + ): + version = kwargs["version"] + if module.credentials_and_registration: + # Make sure to delete corresponding token_b given you by client + await DbInterface.delete("integration", id, *args, **kwargs) + await DbInterface.delete(module, id, *args, **kwargs) + + @classmethod + async def do( + cls, + module: ModuleID, + role: RoleEnum, + action: Action, + *args, + data: dict = None, + **kwargs, + ) -> Any: + """CRUD DO.""" + auth_token = kwargs["auth_token"] + + match action: + case Action.get_client_token: + token_b = await DbInterface.get( + "integration", auth_token, *args, **kwargs + ) + return token_b["token"] + case Action.authorize_token: + # TODO: implement token authorization and return result + # of AuthorizationInfo type + return {} + case Action.send_command: + # TODO: implement logic of send command action, + # check validity of the token data, check token, send command + # to chargepoint, save token in db + # return result of CommandResponse type + return None diff --git a/examples/db_interface.py b/examples/db_interface.py new file mode 100644 index 0000000..8fa1ad9 --- /dev/null +++ b/examples/db_interface.py @@ -0,0 +1,137 @@ +from motor import motor_asyncio # TODO: pip install motor + +from py_ocpi.core.enums import ModuleID + +client = motor_asyncio.AsyncIOMotorClient("db_url") +db = client.ocpi_database + + +class DbInterface: + MODULE_MAP = { + ModuleID.credentials_and_registration: "credentials_table", + ModuleID.locations: "locations_table", + ModuleID.cdrs: "cdrs_table", + ModuleID.tokens: "tokens_table", + ModuleID.tariffs: "tariffs_table", + ModuleID.sessions: "sessions_table", + ModuleID.commands: "commands_table", + "integration": "integration_table", + } + + @classmethod + async def get(cls, module, id, *args, **kwargs) -> dict | None: + collection = cls.MODULE_MAP[module] + + match module: + case ModuleID.commands: + # TODO: implement query using your identification for commands + command_data = kwargs["comand_data"] + query = {} + case ModuleID.tokens: + query = {"uid": id} + case "integration": + query = {"credentials_id": id} + case _: + query = {"id": id} + return await db[collection].find_one(query) + + @classmethod + async def get_all(cls, module, filters, *args, **kwargs) -> list[dict]: + collection = cls.MODULE_MAP[module] + + offset = await cls.get_offset_filter(filters) + limit = await cls.get_limit_filter(filters) + + query = await cls.get_date_from_query(filters) + query |= await cls.get_date_to_query(filters) + + return ( + await db[collection] + .find( + query, + ) + .sort("_id") + .skip(offset) + .limit(limit) + .to_list(None) + ) + + @classmethod + async def create(cls, module, data, *args, **kwargs) -> dict: + collection = cls.MODULE_MAP[module] + + return await db[collection].insert_one(data) + + @classmethod + async def update(cls, module, data, id, *args, **kwargs) -> dict: + collection = cls.MODULE_MAP[module] + + match module: + case ModuleID.tokens: + token_type = kwargs.get("token_type") + query = {"uid": id} + if token_type: + query |= {"token_type": token_type} + case "integration": + query = {"credentials_id": id} + case ModuleID.credentials_and_registration: + query = {"token": id} + case _: + query = {"id": id} + + return await db[collection].update_one(query, {"$set": data}) + + @classmethod + async def delete(cls, module, id, *args, **kwargs) -> None: + collection = cls.MODULE_MAP[module] + if module == ModuleID.credentials_and_registration: + query = {"token": id} + else: + query = {"id": id} + await db[collection].delete_one(query) + + @classmethod + async def count(cls, module, filters, *args, **kwargs) -> int: + collection = cls.MODULE_MAP[module] + + query = await cls.get_date_from_query(filters) + query |= await cls.get_date_to_query(filters) + + total = db[collection].count_documents(query) + return total + + @classmethod + async def is_last_page( + cls, module, filters, total, *args, **kwargs + ) -> bool: + offset = await cls.get_offset_filter(filters) + limit = await cls.get_limit_filter(filters) + return offset + limit >= total if limit else True + + @classmethod + async def get_offset_filter(cls, filters: dict) -> int: + return filters.get("offset", 0) + + @classmethod + async def get_limit_filter(cls, filters: dict) -> int: + return filters.get("limit", 0) + + @classmethod + async def get_date_from_query(cls, filters: dict) -> int: + query = {} + date_to = filters.get("date_to") + if date_to: + query.setdefault("last_updated", {}).update( + {"$lte": date_to.isoformat()} + ) + return query + + @classmethod + async def get_date_to_query(cls, filters: dict) -> int: + query = {} + date_from = filters.get("date_from") + if date_from: + query.setdefault("last_updated", {}).update( + {"$gte": date_from.isoformat()} + ) + return query