Skip to content

Commit

Permalink
Transaction helper improvements and release 1.1.0 (#173)
Browse files Browse the repository at this point in the history
* Add option to specify a branch on trx run

* flag to flush queue on error

* telemetry

* bump release version
  • Loading branch information
philkra authored Sep 25, 2023
1 parent 5f56ac3 commit 111620b
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "xata"
version = "1.0.2"
version = "1.1.0"
description = "Python SDK for Xata.io"
authors = ["Xata <support@xata.io>"]
license = "Apache-2.0"
Expand Down
57 changes: 46 additions & 11 deletions tests/integration-tests/helpers_transaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ def test_insert_records_and_response_shape(self):
assert "has_errors" in response
assert "results" in response
assert "errors" in response
assert "attempts" in response

assert response["status_code"] == 200
assert not response["has_errors"]
assert response["errors"] == []
assert len(response["results"]) == 5
assert response["attempts"] == 1

r = self.client.data().query("Posts", {})
r = self.client.data().query("Posts")
assert len(r["records"]) == 5

def test_insert_records_with_create_only_option_existing_record(self):
Expand All @@ -103,7 +105,7 @@ def test_insert_records_with_create_only_option_existing_record(self):
assert len(response["errors"]) == 1

def test_insert_records_with_create_only_option_new_record_id(self):
before_insert = len(self.client.data().query("Posts", {})["records"])
before_insert = len(self.client.data().query("Posts")["records"])

trx = Transaction(self.client)
trx.insert("Posts", {"id": "record-123", "title": "a new title #1", "content": "yes!"}, True)
Expand All @@ -114,7 +116,7 @@ def test_insert_records_with_create_only_option_new_record_id(self):
assert response["status_code"] == 200
assert len(response["errors"]) == 0

after_insert = len(self.client.data().query("Posts", {})["records"])
after_insert = len(self.client.data().query("Posts")["records"])
assert before_insert == (after_insert - 2)

def test_delete_records(self):
Expand All @@ -124,7 +126,7 @@ def test_delete_records(self):
response = setup.run()
delete_me = [x["id"] for x in response["results"]]
assert len(delete_me) > 0
before_delete = len(self.client.data().query("Posts", {})["records"])
before_delete = len(self.client.data().query("Posts")["records"])

trx = Transaction(self.client)
for rid in delete_me:
Expand All @@ -136,7 +138,7 @@ def test_delete_records(self):
assert response["errors"] == []
assert len(response["results"]) == len(delete_me)

r = self.client.data().query("Posts", {})
r = self.client.data().query("Posts")
assert len(r["records"]) == (before_delete - len(delete_me))

def test_delete_records_with_columns(self):
Expand All @@ -146,7 +148,7 @@ def test_delete_records_with_columns(self):
response = setup.run()
delete_me = [x["id"] for x in response["results"]]
assert len(delete_me) > 0
before_delete = len(self.client.data().query("Posts", {})["records"])
before_delete = len(self.client.data().query("Posts")["records"])

trx = Transaction(self.client)
trx.delete("Posts", delete_me[0])
Expand All @@ -165,7 +167,7 @@ def test_delete_records_with_columns(self):
assert list(response["results"][3]["columns"].keys()) == ["content", "title"]
assert list(response["results"][4]["columns"].keys()) == ["content", "id", "title"]

r = self.client.data().query("Posts", {})
r = self.client.data().query("Posts")
assert len(r["records"]) == (before_delete - len(delete_me))

def test_delete_records_with_fail_if_missing(self):
Expand Down Expand Up @@ -249,7 +251,7 @@ def test_update_records(self):
assert len(response["results"]) == len(update_me)

def test_update_records_via_upsert(self):
before_upsert = len(self.client.data().query("Posts", {})["records"])
before_upsert = len(self.client.data().query("Posts")["records"])

trx = Transaction(self.client)
for rid in range(0, 5):
Expand All @@ -261,7 +263,7 @@ def test_update_records_via_upsert(self):
assert response["errors"] == []
assert len(response["results"]) == 5

after_upsert = len(self.client.data().query("Posts", {})["records"])
after_upsert = len(self.client.data().query("Posts")["records"])
assert before_upsert == after_upsert

def test_mixed_operations(self):
Expand Down Expand Up @@ -313,7 +315,7 @@ def test_run_trx_next_batch_of_operations(self):
assert not r["has_errors"]

def test_has_errors_insert(self):
before_insert = len(self.client.data().query("Posts", {})["records"])
before_insert = len(self.client.data().query("Posts")["records"])

trx = Transaction(self.client)
trx.insert("Posts", self._get_record()) # good
Expand All @@ -329,5 +331,38 @@ def test_has_errors_insert(self):
assert response["errors"][1]["index"] == 3
assert len(response["results"]) == 0

after_insert = len(self.client.data().query("Posts", {})["records"])
after_insert = len(self.client.data().query("Posts")["records"])
assert before_insert == after_insert

def test_alternative_branch_on_run(self):
# create new branch
branch_name = "testing-issue-170"
assert self.client.branch().create({"from": "main"}, branch_name=branch_name).is_success()
before_insert = len(self.client.data().query("Posts", branch_name=branch_name)["records"])

trx = Transaction(self.client)
trx.insert("Posts", self._get_record())
trx.insert("Posts", self._get_record())
trx.insert("Posts", self._get_record())
trx.run(branch_name=branch_name)

after_insert = len(self.client.data().query("Posts", branch_name=branch_name)["records"])
assert after_insert > before_insert

def test_flush_on_error_flag(self):
trx = Transaction(self.client)
trx.insert("PostsThatDoNotExist", self._get_record()) # bad
trx.insert("Posts", self._get_record()) # good
trx.insert("Posts", {"foo": "bar"}) # bad

before_run = trx.size()
assert before_run == 3
response = trx.run(flush_on_error=False)

assert response["status_code"] == 400
assert response["has_errors"]
assert trx.size() == before_run

response = trx.run(flush_on_error=True)
assert response["status_code"] == 400
assert trx.size() == 0
2 changes: 1 addition & 1 deletion xata/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

# TODO this is a manual task, to keep in sync with pyproject.toml
# could/should be automated to keep in sync
__version__ = "1.0.2"
__version__ = "1.1.0"

PERSONAL_API_KEY_LOCATION = "~/.config/xata/key"
DEFAULT_DATA_PLANE_DOMAIN = "xata.sh"
Expand Down
45 changes: 36 additions & 9 deletions xata/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
BP_DEFAULT_THROW_EXCEPTION = False
BP_VERSION = "0.2.1"
TRX_MAX_OPERATIONS = 1000
TRX_VERSION = "0.0.1"
TRX_VERSION = "0.1.0"
TRX_BACKOFF = 0.1


class BulkProcessor(object):
Expand Down Expand Up @@ -74,7 +75,9 @@ def __init__(
raise Exception("batch size can not be less than one, default: %d" % BP_DEFAULT_BATCH_SIZE)

self.client = client
self.client.set_header("x-xata-helper", f"bulkprocessor/{BP_VERSION}")
telemetry = "%shelper=bp;v=%s" % (self.client.get_headers()["x-xata-agent"], BP_VERSION)
self.client.set_header("x-xata-agent", telemetry)

self.processing_timeout = processing_timeout
self.batch_size = batch_size
self.flush_interval = flush_interval
Expand Down Expand Up @@ -316,7 +319,9 @@ def __init__(
:param client: XataClient
"""
self.client = client
self.client.set_header("x-xata-helper", f"transaction/{TRX_VERSION}")
telemetry = "%shelper=trx;v=%s" % (self.client.get_headers()["x-xata-agent"], TRX_VERSION)
self.client.set_header("x-xata-agent", telemetry)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

self.has_run = False
self.operations = {"operations": []}
Expand Down Expand Up @@ -388,21 +393,43 @@ def get(self, table: str, record_id: str, columns: list[str] = []) -> None:
"""
self._add_operation({"get": {"table": table, "id": record_id, "columns": columns}})

def run(self) -> dict:
def run(self, branch_name: str = None, retry: bool = True, flush_on_error: bool = False) -> dict:
"""
Commit the transactions. Flushes the operations queue
Commit the transactions. Flushes the operations queue if no error happened.
In case of too many connections, hitting rate limits, two extra attempts are taken
with an incremental back off.
:param branch_name: str Override the branch name from the client init
:param retry: bool Retry rate limit errors, Default: True
:param flush_on_error: bool Flush the operations if an error happened, Default: False
:returns dict
"""
r = self.client.records().transaction(self.operations)
result = {
r = self.client.records().transaction(self.operations, branch_name=branch_name)
attempt = 1

# retry on 429, if requested
if r.status_code == 429 and retry:
# back off and retry
while attempt < 3 and not r.is_success():
wait = attempt * TRX_BACKOFF
time.sleep(wait)
self.logger.info(f"request {attempt} encountered a 429: too many requests error. will retry in {wait} ms.")
r = self.client.records().transaction(self.operations, branch_name=branch_name)
attempt += 1

# free memory
if r.is_success() or flush_on_error:
self.operations["operations"] = []

# build response
return {
"status_code": r.status_code,
"results": r["results"] if "results" in r else [],
"has_errors": True if "errors" in r else False,
"errors": r["errors"] if "errors" in r else [],
"attempts": attempt
}
self.operations["operations"] = [] # free memory
return result

def size(self) -> int:
"""
Expand Down

0 comments on commit 111620b

Please sign in to comment.