Skip to content

Commit

Permalink
docs: Add examples of crud implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
oleksandr-bozbei-ew committed Nov 7, 2023
1 parent b147496 commit 8c9bacb
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 2 deletions.
9 changes: 7 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ repos:
hooks:
- id: flake8
name: flake8
args: ["--max-line-length=82", "--per-file-ignores=__init__.py:F401", "--exclude=tests/*"]
args: [
"--max-line-length=82",
"--per-file-ignores=__init__.py:F401",
"--exclude=tests/*",
"--exclude=examples/*",
]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.0.1
hooks:
- id: mypy
name: mypy
exclude: ^tests/
exclude: ^tests/|^examples/
args: [--ignore-missing-imports ]
language: python
language_version: python3.11
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ to your .env file, overwise default values would be taken
1) Implement and connect your business logic and db methods inside this Crud class.

[NOTE]: Check `example` directory for a mongo implementation example.
[Click here](examples)

crud.py
```python
Expand Down
119 changes: 119 additions & 0 deletions examples/crud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import copy
from typing import Any, Tuple
from uuid import uuid4

from py_ocpi.core.crud import Crud
from py_ocpi.core.enums import ModuleID, RoleEnum, Action

from .db_interface import DbInterface


class CrudExample(Crud):
@classmethod
async def get(
cls, module: ModuleID, role: RoleEnum, id, *args, **kwargs
) -> Any:
version = kwargs["version"]
party_id = kwargs.get("party_id")
country_code = kwargs.get("country_code")
command_data = kwargs.get("command_data")
return await DbInterface.get(module, id, *args, **kwargs)

@classmethod
async def list(
cls, module: ModuleID, role: RoleEnum, filters: dict, *args, **kwargs
) -> Tuple[list, int, bool]:
version = kwargs["version"]
party_id = kwargs.get("party_id")
country_code = kwargs.get("country_code")
data_list = await DbInterface.get_all(module, filters, *args, **kwargs)
total = await DbInterface.count(module, filters, *args, **kwargs)
is_last_page = await DbInterface.is_last_page(
module, filters, total, *args, **kwargs
)
return data_list, total, is_last_page

@classmethod
async def create(
cls, module: ModuleID, role: RoleEnum, data: dict, *args, **kwargs
) -> Any:
version = kwargs["version"]
auth_token = kwargs["auth_token"]
command = kwargs.get("command")
party_id = kwargs.get("party_id")
country_code = kwargs.get("country_code")
token_type = kwargs.get("token_type")
if module == ModuleID.credentials_and_registration:
# It's advised to save somewhere in separate table token B sent by client:
integration_data = copy.deepcopy(data["credentials"])
integration_data["endpoints"] = data.pop("endpoints")
integration_data["credentials_id"] = auth_token
await DbInterface.create(
"integration", integration_data, *args, **kwargs
)

# It's advised to re-create token A after it was used for register purpose
token_a = uuid4() # TODO: Don't forget to save it!

return await DbInterface.create(module, data, *args, **kwargs)

@classmethod
async def update(
cls, module: ModuleID, role: RoleEnum, data: dict, id, *args, **kwargs
) -> Any:
version = kwargs["version"]
party_id = kwargs["party_id"]
country_code = kwargs.get("country_code")
token_type = kwargs.get("token_type")
match module:
case ModuleID.credentials_and_registration:
# re-create client credentials
await DbInterface.update(
"integration", data, id, *args, **kwargs
)

# Generate new token_c instead the one client used
new_token_с = uuid4()
data = {"token": new_token_с}

return await DbInterface.update(module, data, id, *args, **kwargs)

@classmethod
async def delete(
cls, module: ModuleID, role: RoleEnum, id, *args, **kwargs
):
version = kwargs["version"]
if module.credentials_and_registration:
# Make sure to delete corresponding token_b given you by client
await DbInterface.delete("integration", id, *args, **kwargs)
await DbInterface.delete(module, id, *args, **kwargs)

@classmethod
async def do(
cls,
module: ModuleID,
role: RoleEnum,
action: Action,
*args,
data: dict = None,
**kwargs,
) -> Any:
"""CRUD DO."""
auth_token = kwargs["auth_token"]

match action:
case Action.get_client_token:
token_b = await DbInterface.get(
"integration", auth_token, *args, **kwargs
)
return token_b["token"]
case Action.authorize_token:
# TODO: implement token authorization and return result
# of AuthorizationInfo type
return {}
case Action.send_command:
# TODO: implement logic of send command action,
# check validity of the token data, check token, send command
# to chargepoint, save token in db
# return result of CommandResponse type
return None
137 changes: 137 additions & 0 deletions examples/db_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from motor import motor_asyncio # TODO: pip install motor

from py_ocpi.core.enums import ModuleID

client = motor_asyncio.AsyncIOMotorClient("db_url")
db = client.ocpi_database


class DbInterface:
MODULE_MAP = {
ModuleID.credentials_and_registration: "credentials_table",
ModuleID.locations: "locations_table",
ModuleID.cdrs: "cdrs_table",
ModuleID.tokens: "tokens_table",
ModuleID.tariffs: "tariffs_table",
ModuleID.sessions: "sessions_table",
ModuleID.commands: "commands_table",
"integration": "integration_table",
}

@classmethod
async def get(cls, module, id, *args, **kwargs) -> dict | None:
collection = cls.MODULE_MAP[module]

match module:
case ModuleID.commands:
# TODO: implement query using your identification for commands
command_data = kwargs["comand_data"]
query = {}
case ModuleID.tokens:
query = {"uid": id}
case "integration":
query = {"credentials_id": id}
case _:
query = {"id": id}
return await db[collection].find_one(query)

@classmethod
async def get_all(cls, module, filters, *args, **kwargs) -> list[dict]:
collection = cls.MODULE_MAP[module]

offset = await cls.get_offset_filter(filters)
limit = await cls.get_limit_filter(filters)

query = await cls.get_date_from_query(filters)
query |= await cls.get_date_to_query(filters)

return (
await db[collection]
.find(
query,
)
.sort("_id")
.skip(offset)
.limit(limit)
.to_list(None)
)

@classmethod
async def create(cls, module, data, *args, **kwargs) -> dict:
collection = cls.MODULE_MAP[module]

return await db[collection].insert_one(data)

@classmethod
async def update(cls, module, data, id, *args, **kwargs) -> dict:
collection = cls.MODULE_MAP[module]

match module:
case ModuleID.tokens:
token_type = kwargs.get("token_type")
query = {"uid": id}
if token_type:
query |= {"token_type": token_type}
case "integration":
query = {"credentials_id": id}
case ModuleID.credentials_and_registration:
query = {"token": id}
case _:
query = {"id": id}

return await db[collection].update_one(query, {"$set": data})

@classmethod
async def delete(cls, module, id, *args, **kwargs) -> None:
collection = cls.MODULE_MAP[module]
if module == ModuleID.credentials_and_registration:
query = {"token": id}
else:
query = {"id": id}
await db[collection].delete_one(query)

@classmethod
async def count(cls, module, filters, *args, **kwargs) -> int:
collection = cls.MODULE_MAP[module]

query = await cls.get_date_from_query(filters)
query |= await cls.get_date_to_query(filters)

total = db[collection].count_documents(query)
return total

@classmethod
async def is_last_page(
cls, module, filters, total, *args, **kwargs
) -> bool:
offset = await cls.get_offset_filter(filters)
limit = await cls.get_limit_filter(filters)
return offset + limit >= total if limit else True

@classmethod
async def get_offset_filter(cls, filters: dict) -> int:
return filters.get("offset", 0)

@classmethod
async def get_limit_filter(cls, filters: dict) -> int:
return filters.get("limit", 0)

@classmethod
async def get_date_from_query(cls, filters: dict) -> int:
query = {}
date_to = filters.get("date_to")
if date_to:
query.setdefault("last_updated", {}).update(
{"$lte": date_to.isoformat()}
)
return query

@classmethod
async def get_date_to_query(cls, filters: dict) -> int:
query = {}
date_from = filters.get("date_from")
if date_from:
query.setdefault("last_updated", {}).update(
{"$gte": date_from.isoformat()}
)
return query

0 comments on commit 8c9bacb

Please sign in to comment.