Skip to content

Commit

Permalink
♻️(backends) change how duplicate statement ids are handled
Browse files Browse the repository at this point in the history
Previously this would fail with a 429 return code on any duplicate
event id, now it only fails if the events differ per the LRS spec.
  • Loading branch information
bmtcril authored Jul 21, 2023
1 parent 55f567a commit b121447
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to
## [Unreleased]

### Changed

- Change how duplicate xAPI statements are handled by backends
General improvement for the Helm Chart:
- add dependencies for MongoDB and Clickhouse
- make persistence optional
Expand Down
64 changes: 46 additions & 18 deletions src/ralph/api/routers/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
HTTPException,
Query,
Request,
Response,
status,
)
from pydantic import parse_raw_as
Expand Down Expand Up @@ -332,21 +333,24 @@ async def put(
)

try:
statements_ids_result = DATABASE_CLIENT.query_statements_by_ids([statementId])
existing_statement = DATABASE_CLIENT.query_statements_by_ids([statementId])
except BackendException as error:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="xAPI statements query failed",
) from error

if len(statements_ids_result) > 0:
# NB: LRS specification calls for performing a deep comparison of incoming
# statements and existing statements with the same ID.
# This seems too costly for performance and was not implemented for this POC.
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Statement already exists with the same ID",
)
if existing_statement:
# The LRS specification calls for deep comparison of duplicate statement ids.
# In the case that the current statement is not an exact duplicate of the one
# found in the database we return a 409, otherwise the usual 204.
for existing in existing_statement:
if statement_dict != existing["_source"]:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="A different statement already exists with the same ID",
)
return

# For valid requests, perform the bulk indexing of all incoming statements
try:
Expand All @@ -368,6 +372,7 @@ async def put(
async def post(
statements: Union[LaxStatement, List[LaxStatement]],
background_tasks: BackgroundTasks,
response: Response,
):
"""Stores a set of statements (or a single statement as a single member of a set).
Expand Down Expand Up @@ -405,21 +410,44 @@ async def post(
)

try:
statements_ids_result = DATABASE_CLIENT.query_statements_by_ids(statements_ids)
existing_statements = DATABASE_CLIENT.query_statements_by_ids(statements_ids)
except BackendException as error:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="xAPI statements query failed",
) from error

if len(statements_ids_result) > 0:
# NB: LRS specification calls for performing a deep comparison of incoming
# statements and existing statements with the same ID.
# This seems too costly for performance and was not implemented for this POC.
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Statements already exist with the same ID",
)
# If there are duplicate statements, remove them from our id list and
# dictionary for insertion. We will return the shortened list of ids below
# so that consumers can derive which statements were inserted and which
# were skipped for being duplicates.
# See: https://github.com/openfun/ralph/issues/345
if existing_statements:
existing_ids = []
for existing in existing_statements:
existing_ids.append(existing["_id"])

# The LRS specification calls for deep comparison of duplicates. This
# is done here. If they are not exactly the same, we raise an error.
if statements_dict[existing["_id"]] != existing["_source"]:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Differing statements already exist with the same ID: "
f"{existing['_id']}",
)

# Overwrite our statements and ids with the deduplicated ones.
statements_ids = [id for id in statements_ids if id not in existing_ids]

statements_dict = {
key: value
for key, value in statements_dict.items()
if key in statements_ids
}

if not statements_ids:
response.status_code = status.HTTP_204_NO_CONTENT
return

# For valid requests, perform the bulk indexing of all incoming statements
try:
Expand Down
21 changes: 14 additions & 7 deletions src/ralph/backends/database/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ def put(

return rows_inserted

def query_statements_by_ids(self, ids: List[str]) -> List:
"""Returns the list of matching statement IDs from the database."""
def query_statements_by_ids(self, ids: List[str]) -> List[dict]:
"""Returns the list of matching statements from the database."""

def chunk_id_list(chunk_size=10000):
for i in range(0, len(ids), chunk_size):
yield ids[i : i + chunk_size]

sql = """
SELECT event_id
SELECT event_id, event_str
FROM {table_name:Identifier}
WHERE event_id IN ({ids:Array(String)})
"""
Expand All @@ -257,15 +257,22 @@ def chunk_id_list(chunk_size=10000):
column_oriented=True,
)

found_ids = []
found_statements = []

try:
for chunk_ids in chunk_id_list():
query_context.set_parameter("ids", chunk_ids)
result = self.client.query(context=query_context).named_results()
found_ids.extend(result)

return found_ids
for row in result:
# This is the format to match the other backends
found_statements.append(
{
"_id": str(row["event_id"]),
"_source": json.loads(row["event_str"]),
}
)

return found_statements
except (ClickHouseError, IndexError, TypeError, ValueError) as error:
msg = "Failed to execute ClickHouse query"
logger.error("%s. %s", msg, error)
Expand Down
7 changes: 5 additions & 2 deletions src/ralph/backends/database/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ def query_statements(self, params: StatementParameters) -> StatementQueryResult:
)

def query_statements_by_ids(self, ids: List[str]) -> List:
"""Returns the list of matching statement IDs from the database."""
return self._find(filter={"_source.id": {"$in": ids}})
"""Returns the list of matching statements from the database."""
return [
{"_id": statement["_source"]["id"], "_source": statement["_source"]}
for statement in self._find(filter={"_source.id": {"$in": ids}})
]

def _find(self, **kwargs):
"""Wraps the MongoClient.collection.find method.
Expand Down
23 changes: 20 additions & 3 deletions tests/api/test_statements_post.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ def test_api_statements_post_statements_list_with_duplicate_of_existing_statemen
# pylint: disable=invalid-name,unused-argument

monkeypatch.setattr("ralph.api.routers.statements.DATABASE_CLIENT", backend())

statement_uuid = str(uuid4())
statement = {
"actor": {
"account": {
Expand All @@ -286,7 +288,7 @@ def test_api_statements_post_statements_list_with_duplicate_of_existing_statemen
},
"objectType": "Agent",
},
"id": str(uuid4()),
"id": statement_uuid,
"object": {"id": "https://example.com/object-id/1/"},
"timestamp": "2022-03-15T14:07:51Z",
"verb": {"id": "https://example.com/verb-id/1/"},
Expand All @@ -299,18 +301,33 @@ def test_api_statements_post_statements_list_with_duplicate_of_existing_statemen
json=statement,
)
assert response.status_code == 200
assert response.json() == [statement_uuid]

es.indices.refresh()

# Post the statement twice, the data is identical so it should succeed but not
# include the ID in the response as it wasn't inserted.
response = client.post(
"/xAPI/statements/",
headers={"Authorization": f"Basic {auth_credentials}"},
json=statement,
)
assert response.status_code == 204

es.indices.refresh()

# Post the statement twice, trying to change the version field which is not allowed.
# Post the statement again, trying to change the version field which is not allowed.
response = client.post(
"/xAPI/statements/",
headers={"Authorization": f"Basic {auth_credentials}"},
json=[dict(statement, **{"version": "1.0.0"})],
)

assert response.status_code == 409
assert response.json() == {"detail": "Statements already exist with the same ID"}
assert response.json() == {
"detail": f"Differing statements already exist with the same ID: "
f"{statement_uuid}"
}

response = client.get(
"/xAPI/statements/", headers={"Authorization": f"Basic {auth_credentials}"}
Expand Down
4 changes: 3 additions & 1 deletion tests/api/test_statements_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ def test_api_statements_put_statement_duplicate_of_existing_statement(
)

assert response.status_code == 409
assert response.json() == {"detail": "Statement already exists with the same ID"}
assert response.json() == {
"detail": "A different statement already exists with the same ID"
}

response = client.get(
f"/xAPI/statements/?statementId={statement['id']}",
Expand Down
14 changes: 9 additions & 5 deletions tests/backends/database/test_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,20 @@ def test_backends_database_mongo_query_statements_by_ids_with_multiple_collectio

# Insert documents
timestamp = {"timestamp": "2022-06-27T15:36:50"}
collection_1_document = list(MongoDatabase.to_documents([{"id": "1", **timestamp}]))
collection_2_document = list(MongoDatabase.to_documents([{"id": "2", **timestamp}]))
statement_1 = {"id": "1", **timestamp}
statement_1_expected = [{"_id": "1", "_source": statement_1}]
statement_2 = {"id": "2", **timestamp}
statement_2_expected = [{"_id": "2", "_source": statement_2}]
collection_1_document = list(MongoDatabase.to_documents([statement_1]))
collection_2_document = list(MongoDatabase.to_documents([statement_2]))
backend_1.bulk_import(collection_1_document)
backend_2.bulk_import(collection_2_document)

# Check the expected search query results
assert backend_1.query_statements_by_ids(["1"]) == collection_1_document
assert backend_1.query_statements_by_ids(["2"]) == []
assert backend_1.query_statements_by_ids(["1"]) == statement_1_expected
assert backend_2.query_statements_by_ids(["1"]) == []
assert backend_2.query_statements_by_ids(["2"]) == collection_2_document
assert backend_2.query_statements_by_ids(["2"]) == statement_2_expected
assert backend_1.query_statements_by_ids(["2"]) == []


def test_backends_database_mongo_status(mongo):
Expand Down

0 comments on commit b121447

Please sign in to comment.