Skip to content

Commit

Permalink
refactored composite batch and added composite api
Browse files Browse the repository at this point in the history
  • Loading branch information
georgebv committed Apr 23, 2024
1 parent e2aeb70 commit b37f9ea
Show file tree
Hide file tree
Showing 4 changed files with 709 additions and 73 deletions.
1 change: 0 additions & 1 deletion src/aiosalesforce/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ def composite(self) -> CompositeClient:
* Composite
* Composite Graph
* sObject Tree
* sObject Collections
"""
return CompositeClient(self)
139 changes: 82 additions & 57 deletions src/aiosalesforce/composite/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Subrequest:
binary_part_name: str | None
binary_part_name_alias: str | None

__result: dict | None
__response: dict | None

def __init__(
self,
Expand All @@ -59,7 +59,7 @@ def __init__(
"must be provided."
)

self.__result = None
self.__response = None

def to_dict(self) -> dict:
payload = {
Expand All @@ -71,79 +71,74 @@ def to_dict(self) -> dict:
}
return {key: value for key, value in payload.items() if value is not None}

def _set_result(self, result: dict) -> None:
self.__result = result
@property
def response(self) -> dict:
if self.__response is None:
raise InvalidStateError("Subrequest response has not been set")
return self.__response

@response.setter
def response(self, value: dict) -> None:
self.__response = value

@property
def done(self) -> bool:
return self.__result is not None
return self.__response is not None

@property
def result(self) -> dict:
if self.__result is None:
raise InvalidStateError(
"Subrequest result has not been set. "
"Make sure that the parent CompositeBatchRequest has been executed."
)
return self.__result
def status_code(self) -> int:
return self.response["statusCode"]

@property
def status_code(self) -> int:
status_code = self.result["statusCode"]
assert isinstance(status_code, int)
return status_code
def result(self) -> dict | list | None:
return self.response["result"]

@property
def status(self) -> Literal["Pending", "Success", "Failure", "Halted"]:
if not self.done:
return "Pending"
match self.status_code:
case value if 200 <= value < 300:
return "Success"
case 412:
return "Halted"
case _:
return "Failure"
def is_success(self) -> bool:
return 200 <= self.status_code < 300

def raise_for_status(self) -> None:
"""Raise an exception if this subrequest failed."""
if self.status_code >= 300:
raise_salesforce_error(
httpx.Response(self.status_code, json=self.result["result"])
)
if not self.is_success:
raise_salesforce_error(httpx.Response(self.status_code, json=self.result))


class QuerySubrequest(Subrequest):
@property
def records(self) -> list[dict]:
self.raise_for_status()
return self.result["result"]["records"]
assert isinstance(self.result, dict)
return self.result["records"]


class SobjectCreateSubrequest(Subrequest):
@property
def id(self) -> str:
self.raise_for_status()
return self.result["result"]["id"]
assert isinstance(self.result, dict)
return self.result["id"]


class SobjectGetSubrequest(Subrequest):
@property
def record(self) -> dict:
self.raise_for_status()
return self.result["result"]
assert isinstance(self.result, dict)
return self.result


class SobjectUpsertSubrequest(Subrequest):
@property
def id(self) -> str:
self.raise_for_status()
return self.result["result"]["id"]
assert isinstance(self.result, dict)
return self.result["id"]

@property
def created(self) -> bool:
self.raise_for_status()
return self.result["result"]["created"]
assert isinstance(self.result, dict)
return self.result["created"]


class SobjectSubrequestClient:
Expand Down Expand Up @@ -379,40 +374,61 @@ def upsert(


class CompositeBatchRequest:
"""
Composite Batch request.
Parameters
----------
salesforce_client : Salesforce
Salesforce client.
halt_on_error : bool, default False
If True, unprocessed subrequests will be halted if any subrequest fails.
autoraise : bool, default False
If True, an exception will be raised if any subrequest fails.
group_errors : bool, default False
Ignored if `autoraise` is False.
If True, raises an ExceptionGroup with all errors.
Otherwise, raises the first exception.
"""

salesforce_client: "Salesforce"
base_url: str
"""Base URL in the format https://[subdomain(s)].my.salesforce.com/services/data/v[version]/composite/batch"""
subrequests: list[Subrequest]
halt_on_error: bool
autoraise: bool
group_errors: bool

subrequests: list[Subrequest]

def __init__(
self,
salesforce_client: "Salesforce",
halt_on_error: bool = False,
autoraise: bool = False,
group_errors: bool = False,
) -> None:
self.salesforce_client = salesforce_client
self.base_url = "/".join(
[
self.salesforce_client.base_url,
"services",
"data",
f"v{self.salesforce_client.version}",
"composite",
"batch",
]
)
self.halt_on_error = halt_on_error
self.autoraise = autoraise
self.group_errors = group_errors

self.subrequests = []

async def execute(self) -> None:
"""Execute composite batch and set subrequests' results."""
"""Execute composite batch request and set subrequests' responses."""
if len(self.subrequests) == 0:
return
response = await self.salesforce_client.request(
"POST",
self.base_url,
"/".join(
[
self.salesforce_client.base_url,
"services",
"data",
f"v{self.salesforce_client.version}",
"composite",
"batch",
]
),
content=json_dumps(
{
"haltOnError": self.halt_on_error,
Expand All @@ -423,20 +439,29 @@ async def execute(self) -> None:
),
headers={"Content-Type": "application/json", "Accept": "application/json"},
)
# Salesforce escapes characters in Composite JSON response as if it was HTML
response_data = json_loads(unescape(response.content.decode("utf-8")))
for subrequest, subrequest_response in zip(
self.subrequests, response_data["results"]
self.subrequests,
# Salesforce escapes characters in Composite JSON response as if it was HTML
json_loads(unescape(response.content.decode("utf-8")))["results"],
):
subrequest._set_result(subrequest_response)
if self.autoraise:
subrequest.raise_for_status()
subrequest.response = subrequest_response
if self.autoraise:
errors: list[Exception] = []
for subrequest in self.subrequests:
try:
subrequest.raise_for_status()
except Exception as exc:
errors.append(exc)
if len(errors) > 0:
if self.group_errors:
raise ExceptionGroup("Composite Batch request error", errors)
raise errors[0]

async def __aenter__(self) -> "CompositeBatchRequest":
return self

async def __aexit__(self, exc_type, exc, tb) -> None:
# Don't execute partial batches if an exception occurred
# Execute only if no exception occurred
if exc is None:
await self.execute()

Expand Down
74 changes: 59 additions & 15 deletions src/aiosalesforce/composite/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING

from .batch import CompositeBatchRequest
from .composite import CompositeRequest

if TYPE_CHECKING:
from aiosalesforce.client import Salesforce
Expand All @@ -18,26 +19,16 @@ class CompositeClient:
"""

salesforce_client: "Salesforce"
base_url: str
"""Base URL in the format https://[subdomain(s)].my.salesforce.com/services/data/v[version]/composite"""

def __init__(self, salesforce_client: "Salesforce") -> None:
self.salesforce_client = salesforce_client
self.base_url = "/".join(
[
self.salesforce_client.base_url,
"services",
"data",
f"v{self.salesforce_client.version}",
"composite",
]
)

def batch(
self,
halt_on_error: bool = False,
autoraise: bool = False,
) -> "CompositeBatchRequest":
group_errors: bool = False,
) -> CompositeBatchRequest:
"""
Start a Comsposite Batch operation.
Expand All @@ -47,9 +38,13 @@ def batch(
Parameters
----------
halt_on_error : bool, default False
If True, the batch will be halted if any subrequests fail.
If True, unprocessed subrequests will be halted if any subrequest fails.
autoraise : bool, default False
If True, an exception will be raised if any subrequests fail.
If True, an exception will be raised if any subrequest fails.
group_errors : bool, default False
Ignored if `autoraise` is False.
If True, raises an ExceptionGroup with all errors.
Otherwise, raises the first exception.
Returns
-------
Expand All @@ -58,7 +53,7 @@ def batch(
Examples
--------
>>> async with salesforce.composite.batch(autoraise=True) as batch:
>>> async with salesforce.composite.batch(halt_on_error=True) as batch:
... query = batch.query("SELECT Id, Name FROM Account LIMIT 10")
... contact = batch.sobject.create(
... "Contact",
Expand All @@ -72,4 +67,53 @@ def batch(
salesforce_client=self.salesforce_client,
halt_on_error=halt_on_error,
autoraise=autoraise,
group_errors=group_errors,
)

def __call__(
self,
all_or_none: bool = False,
collate_subrequests: bool = False,
autoraise: bool = False,
) -> CompositeRequest:
"""
Start a Comsposite operation.
To execute a composite request, add subrequests to it and call `execute`.
Alternatively, use a context manager to automatically execute the request.
Parameters
----------
all_or_none : bool, default False
If True, all subrequests are rolled back if any subrequest fails.
collate_subrequests : bool, default True
If True, independent subrequests are executed by Salesforce in parallel.
autoraise : bool, default False
If True, raises an ExceptionGroup if any subrequest fails.
Returns
-------
CompositeRequest
Composite request.
Examples
--------
>>> async with salesforce.composite(all_or_none=True) as batch:
... account = composite.sobject.create(
... "Account",
... {...},
... )
... contact = composite.sobject.create(
... "Contact",
... {"Account": account.reference.id, ...}
... )
... print(account.id)
... print(contact.id)
"""
return CompositeRequest(
salesforce_client=self.salesforce_client,
all_or_none=all_or_none,
collate_subrequests=collate_subrequests,
autoraise=autoraise,
)
Loading

0 comments on commit b37f9ea

Please sign in to comment.