Skip to content

Commit

Permalink
feat(dto): update dto
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxin688 committed Jan 19, 2024
1 parent 8db9256 commit 37afe2d
Showing 1 changed file with 68 additions and 2 deletions.
70 changes: 68 additions & 2 deletions src/db/dtobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def _apply_selectinload(
Args:
stmt (Select[tuple[ModelT]]): The select statement to apply the selectinload option to.
options (tuple[ExecutableOption] | None, optional): The additional options to apply to the statement.
undefer_load: (bool, optional): Whether to apply the undefer load option. Defaults to True.
if True, all `column_property` defined in the model will be executed in subquery. when set to `False`
all `column_property` defined in the model will not execute for better performance.
Returns:
Select[tuple[ModelT]]: The modified select statement.
Expand Down Expand Up @@ -523,7 +525,7 @@ async def list_and_count(
session (AsyncSession): The async session object for the database connection.
query (QuerySchemaType): The query schema object containing the query parameters.
options (tuple | None, optional): Additional options for the query. Defaults to None.
undefer_load (bool, optional): Whether to undefer the load. Defaults to True.
Returns:
tuple[int, Sequence[ModelT]]: A tuple containing the count of items and the list of results.
"""
Expand Down Expand Up @@ -655,6 +657,7 @@ async def get_one_or_404(
session (AsyncSession): The asynchronous session used to execute the database query.
pk_id (PkIdT): The primary key value used to identify the instance to be retrieved.
*options ExecutableOption: query options to apply to the database query.
undefer_load (bool, optional): Whether to undefer the load. Defaults to False.
Returns:
ModelT: The retrieved instance of ModelT from the database.
Expand Down Expand Up @@ -696,6 +699,18 @@ async def get_none_or_409(self, session: AsyncSession, field: str, value: Any) -
async def get_one_by_filter(
self, session: AsyncSession, filters: dict[str, Any], *options: ExecutableOption, undefer_load: bool = False
) -> ModelT | None:
"""
Retrieves a single instance of the model that matches the given filters.
Args:
session (AsyncSession): The async session to be used for the database operations.
filters (dict[str, Any]): The filters to be applied to the query.
*options (ExecutableOption): Additional options to be applied to the query.
undefer_load (bool, optional): Whether to undefer any deferred attributes. Defaults to False.
Returns:
ModelT | None: The retrieved model instance, or None if no match is found.
"""
stmt = self._get_base_stmt()
stmt = self._apply_filter(stmt=stmt, filters=filters)
stmt = self._apply_selectinload(stmt, *options, undefer_load=undefer_load)
Expand All @@ -704,6 +719,18 @@ async def get_one_by_filter(
async def get_multi_by_filter(
self, session: AsyncSession, filters: dict[str, Any], *options: ExecutableOption, undefer_load: bool = False
) -> Sequence[ModelT]:
"""
Retrieves multiple instances of ModelT from the database based on the provided filters.
Args:
session (AsyncSession): The active database session.
filters (dict[str, Any]): A dictionary containing the filters to apply when querying the database.
*options (ExecutableOption): Variable length argument list of options to customize the query.
undefer_load (bool, optional): If True, the query will include deferred attributes. Defaults to False.
Returns:
Sequence[ModelT]: A sequence of ModelT instances that match the provided filters.
"""
stmt = self._get_base_stmt()
stmt = self._apply_filter(stmt=stmt, filters=filters)
stmt = self._apply_selectinload(stmt, *options, undefer_load=undefer_load)
Expand All @@ -712,6 +739,21 @@ async def get_multi_by_filter(
async def get_multi_by_pks_or_404(
self, session: AsyncSession, pk_ids: list[PkIdT], *options: ExecutableOption, undefer_load: bool = False
) -> Sequence[ModelT]:
"""
Retrieves multiple records from the database based on a list of primary key IDs.
Args:
session (AsyncSession): The database session.
pk_ids (list[PkIdT]): A list of primary key IDs.
*options (ExecutableOption): Optional query options.
undefer_load (bool): Whether to undefer any deferred attributes.
Returns:
Sequence[ModelT]: A sequence of model instances.
Raises:
NotFoundError: If no records are found with the given primary key IDs.
"""
stmt = self._get_base_stmt()
id_str = self.get_id_attribute_value(self.model)
stmt = stmt.where(id_str.in_(pk_ids))
Expand All @@ -727,13 +769,37 @@ async def get_multi_by_pks_or_404(
return results

async def get_one_and_delete(self, session: AsyncSession, pk_id: PkIdT) -> None:
"""
Retrieves a single record from the database using the specified primary key ID and deletes it.
Args:
session (AsyncSession): The async session object used to interact with the database.
pk_id (PkIdT): The primary key ID of the record to retrieve and delete.
Returns:
None: This function does not return anything.
"""
stmt = self._get_base_stmt()
id_str = self.get_id_attribute_value(self.model)
result = (await session.scalars(stmt.where(id_str == pk_id))).one_or_none()
result = self._check_not_found(result, self.id_attribute, pk_id)
await self.delete(session, result)

async def get_multi_and_delete(self, session: AsyncSession, pk_ids: list[PkIdT]) -> None:
"""
Get multiple records by their primary keys and delete them from the database.
Args:
session (AsyncSession): The asynchronous session to use for the database operations.
pk_ids (list[PkIdT]): A list of primary key IDs for the records to retrieve and delete.
Returns:
None: This function does not return anything.
Raises:
NotFoundError: If any of the primary key IDs are not found in the database.
"""
stmt = self._get_base_stmt()
id_str = self.get_id_attribute_value(self.model)
results = (await session.scalars(stmt.where(id_str.in_(pk_ids)))).all()
Expand Down

0 comments on commit 37afe2d

Please sign in to comment.