-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
266 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
from datetime import datetime | ||
from typing import List | ||
|
||
from descope._auth_base import AuthBase | ||
from descope.management.common import MgmtV1 | ||
|
||
|
||
class Audit(AuthBase): | ||
def search( | ||
self, | ||
user_ids: List[str] = None, | ||
actions: List[str] = None, | ||
excluded_actions: List[str] = None, | ||
devices: List[str] = None, | ||
methods: List[str] = None, | ||
geos: List[str] = None, | ||
remote_addresses: List[str] = None, | ||
login_ids: List[str] = None, | ||
tenants: List[str] = None, | ||
no_tenants: bool = False, | ||
text: str = None, | ||
from_ts: datetime = None, | ||
to_ts: datetime = None, | ||
) -> dict: | ||
""" | ||
Search the audit trail up to last 30 days based on given parameters | ||
Args: | ||
user_ids (List[str]): Optional list of user IDs to filter by | ||
actions (List[str]): Optional list of actions to filter by | ||
excluded_actions (List[str]): Optional list of actions to exclude | ||
devices (List[str]): Optional list of devices to filter by. Current devices supported are "Bot"/"Mobile"/"Desktop"/"Tablet"/"Unknown" | ||
methods (List[str]): Optional list of methods to filter by. Current auth methods are "otp"/"totp"/"magiclink"/"oauth"/"saml"/"password" | ||
geos (List[str]): Optional list of geos to filter by. Geo is currently country code like "US", "IL", etc. | ||
remote_addresses (List[str]): Optional list of remote addresses to filter by | ||
login_ids (List[str]): Optional list of login IDs to filter by | ||
tenants (List[str]): Optional list of tenants to filter by | ||
no_tenants (bool): Should audits without any tenants always be included | ||
text (str): Free text search across all fields | ||
from_ts (datetime): Retrieve records newer than given time but not older than 30 days | ||
to_ts (datetime): Retrieve records older than given time | ||
Return value (dict): | ||
Return dict in the format | ||
{ | ||
"audits": [ | ||
{ | ||
"projectId":"", | ||
"userId": "", | ||
"action": "", | ||
"occurred": 0 (unix-time-milli), | ||
"device": "", | ||
"method": "", | ||
"geo": "", | ||
"remoteAddress": "", | ||
"externalIds": [""], | ||
"tenants": [""], | ||
"data": { | ||
"field1": "field1-value", | ||
"more-details": "in-console-examples" | ||
} | ||
} | ||
] | ||
} | ||
Raise: | ||
AuthException: raised if search operation fails | ||
""" | ||
body = {"noTenants": no_tenants} | ||
if user_ids is not None: | ||
body["userIds"] = user_ids | ||
if actions is not None: | ||
body["actions"] = actions | ||
if excluded_actions is not None: | ||
body["excludedActions"] = excluded_actions | ||
if devices is not None: | ||
body["devices"] = devices | ||
if methods is not None: | ||
body["methods"] = methods | ||
if geos is not None: | ||
body["geos"] = geos | ||
if remote_addresses is not None: | ||
body["remoteAddresses"] = remote_addresses | ||
if login_ids is not None: | ||
body["externalIds"] = login_ids | ||
if tenants is not None: | ||
body["tenants"] = tenants | ||
if text is not None: | ||
body["text"] = text | ||
if from_ts is not None: | ||
body["from"] = from_ts.timestamp * 1000 | ||
if to_ts is not None: | ||
body["to"] = to_ts.timestamp * 1000 | ||
|
||
response = self._auth.do_post( | ||
MgmtV1.audit_search, | ||
body=body, | ||
pswd=self._auth.management_key, | ||
) | ||
return { | ||
"audits": list(map(Audit._convert_audit_record, response.json()["audits"])) | ||
} | ||
|
||
@staticmethod | ||
def _convert_audit_record(a: dict) -> dict: | ||
return { | ||
"projectId": a.get("projectId", ""), | ||
"userId": a.get("userId", ""), | ||
"action": a.get("action", ""), | ||
"occurred": datetime.utcfromtimestamp(float(a.get("occurred", "0")) / 1000), | ||
"device": a.get("device", ""), | ||
"method": a.get("method", ""), | ||
"geo": a.get("geo", ""), | ||
"remoteAddress": a.get("remoteAddress", ""), | ||
"loginIds": a.get("externalIds", []), | ||
"tenants": a.get("tenants", []), | ||
"data": a.get("data", {}), | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import logging | ||
import os | ||
import sys | ||
from datetime import datetime | ||
|
||
dir_name = os.path.dirname(__file__) | ||
sys.path.insert(0, os.path.join(dir_name, "../")) | ||
from descope import AuthException, DescopeClient # noqa: E402 | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
|
||
|
||
def main(): | ||
# Either specify here or read from env | ||
project_id = "" | ||
management_key = "" | ||
|
||
try: | ||
descope_client = DescopeClient( | ||
project_id=project_id, management_key=management_key | ||
) | ||
try: | ||
logging.info("Going to search audit") | ||
text = None | ||
if len(sys.argv) > 1: | ||
text = sys.argv[1] | ||
from_ts = None | ||
if len(sys.argv) > 2: | ||
from_ts = datetime.fromisoformat(sys.argv[2]) | ||
logging.info(descope_client.mgmt.audit.search(text=text, from_ts=from_ts)) | ||
|
||
except AuthException as e: | ||
logging.info(f"Audit search failed {e}") | ||
|
||
except AuthException: | ||
raise | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from datetime import datetime | ||
from unittest import mock | ||
from unittest.mock import patch | ||
|
||
from descope import AuthException, DescopeClient | ||
from descope.common import DEFAULT_TIMEOUT_SECONDS | ||
from descope.management.common import MgmtV1 | ||
|
||
from .. import common | ||
|
||
|
||
class TestAudit(common.DescopeTest): | ||
def setUp(self) -> None: | ||
super().setUp() | ||
self.dummy_project_id = "dummy" | ||
self.dummy_management_key = "key" | ||
self.public_key_dict = { | ||
"alg": "ES384", | ||
"crv": "P-384", | ||
"kid": "P2CtzUhdqpIF2ys9gg7ms06UvtC4", | ||
"kty": "EC", | ||
"use": "sig", | ||
"x": "pX1l7nT2turcK5_Cdzos8SKIhpLh1Wy9jmKAVyMFiOCURoj-WQX1J0OUQqMsQO0s", | ||
"y": "B0_nWAv2pmG_PzoH3-bSYZZzLNKUA0RoE2SH7DaS0KV4rtfWZhYd0MEr0xfdGKx0", | ||
} | ||
|
||
def test_search(self): | ||
client = DescopeClient( | ||
self.dummy_project_id, | ||
self.public_key_dict, | ||
False, | ||
self.dummy_management_key, | ||
) | ||
|
||
# Test failed search | ||
with patch("requests.post") as mock_post: | ||
mock_post.return_value.ok = False | ||
self.assertRaises( | ||
AuthException, | ||
client.mgmt.audit.search, | ||
"data", | ||
) | ||
|
||
# Test success search | ||
with patch("requests.post") as mock_post: | ||
network_resp = mock.Mock() | ||
network_resp.ok = True | ||
network_resp.json.return_value = { | ||
"audits": [ | ||
{ | ||
"projectId": "p", | ||
"userId": "u1", | ||
"action": "a1", | ||
"externalIds": ["e1"], | ||
"occurred": str(datetime.now().timestamp() * 1000), | ||
}, | ||
{ | ||
"projectId": "p", | ||
"userId": "u2", | ||
"action": "a2", | ||
"externalIds": ["e2"], | ||
"occurred": str(datetime.now().timestamp() * 1000), | ||
}, | ||
] | ||
} | ||
mock_post.return_value = network_resp | ||
resp = client.mgmt.audit.search() | ||
audits = resp["audits"] | ||
self.assertEqual(len(audits), 2) | ||
self.assertEqual(audits[0]["loginIds"][0], "e1") | ||
mock_post.assert_called_with( | ||
f"{common.DEFAULT_BASE_URL}{MgmtV1.audit_search}", | ||
headers={ | ||
**common.default_headers, | ||
"Authorization": f"Bearer {self.dummy_project_id}:{self.dummy_management_key}", | ||
}, | ||
params=None, | ||
json={"noTenants": False}, | ||
allow_redirects=False, | ||
verify=True, | ||
timeout=DEFAULT_TIMEOUT_SECONDS, | ||
) |