From 4bec008315501f0238c9b0cde8272df7d502ba84 Mon Sep 17 00:00:00 2001 From: erosselli <67162025+erosselli@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:14:18 -0300 Subject: [PATCH] HJ-24 Add DataHub integration config (#5401) --- .fides/db_dataset.yml | 4 +- CHANGELOG.md | 3 + .../public/images/connector-logos/datahub.svg | 1 + .../datastore-connections/constants.ts | 1 + .../src/types/api/models/ConnectionType.ts | 1 + requirements.txt | 1 + ..._last_run_timestamp_to_connectionconfig.py | 31 +++++ ...9a22b284afa_add_datahub_connection_type.py | 106 +++++++++++++++ src/fides/api/models/connectionconfig.py | 6 + .../connection_configuration/__init__.py | 8 ++ .../connection_secrets_datahub.py | 43 ++++++ src/fides/api/service/connectors/__init__.py | 4 + .../service/connectors/datahub_connector.py | 35 +++++ tests/conftest.py | 1 + tests/fixtures/datahub_fixtures.py | 28 ++++ .../test_connection_config_endpoints.py | 124 ++++++++++++++++++ .../test_connection_template_endpoints.py | 2 +- .../connectors/test_datahub_connector.py | 43 ++++++ 18 files changed, 440 insertions(+), 2 deletions(-) create mode 100644 clients/admin-ui/public/images/connector-logos/datahub.svg create mode 100644 src/fides/api/alembic/migrations/versions/5a4859f74832_add_last_run_timestamp_to_connectionconfig.py create mode 100644 src/fides/api/alembic/migrations/versions/c9a22b284afa_add_datahub_connection_type.py create mode 100644 src/fides/api/schemas/connection_configuration/connection_secrets_datahub.py create mode 100644 src/fides/api/service/connectors/datahub_connector.py create mode 100644 tests/fixtures/datahub_fixtures.py create mode 100644 tests/ops/service/connectors/test_datahub_connector.py diff --git a/.fides/db_dataset.yml b/.fides/db_dataset.yml index ef1638597c..22173200d6 100644 --- a/.fides/db_dataset.yml +++ b/.fides/db_dataset.yml @@ -777,6 +777,8 @@ dataset: data_categories: [system.operations] - name: last_test_timestamp data_categories: [system.operations] + - name: last_run_timestamp + data_categories: [system.operations] - name: name data_categories: [system.operations] - name: saas_config @@ -2334,4 +2336,4 @@ dataset: - name: created_at data_categories: [system.operations] - name: updated_at - data_categories: [system.operations] \ No newline at end of file + data_categories: [system.operations] diff --git a/CHANGELOG.md b/CHANGELOG.md index 37592f84a4..a18f86ef55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ The types of changes are: ## [Unreleased](https://github.com/ethyca/fidesplus/compare/2.48.0...main) +### Added +- Added DataHub integration config [#5401](https://github.com/ethyca/fides/pull/5401) + ### Fixed - Fixed Snowflake DSR integration failing with syntax error [#5417](https://github.com/ethyca/fides/pull/5417) diff --git a/clients/admin-ui/public/images/connector-logos/datahub.svg b/clients/admin-ui/public/images/connector-logos/datahub.svg new file mode 100644 index 0000000000..ca979d5719 --- /dev/null +++ b/clients/admin-ui/public/images/connector-logos/datahub.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/clients/admin-ui/src/features/datastore-connections/constants.ts b/clients/admin-ui/src/features/datastore-connections/constants.ts index 54896ffba7..3806ea15e4 100644 --- a/clients/admin-ui/src/features/datastore-connections/constants.ts +++ b/clients/admin-ui/src/features/datastore-connections/constants.ts @@ -36,6 +36,7 @@ export const CONNECTOR_LOGOS_PATH = "/images/connector-logos/"; export const CONNECTION_TYPE_LOGO_MAP = new Map([ [ConnectionType.ATTENTIVE_EMAIL, "attentive.svg"], [ConnectionType.BIGQUERY, "bigquery.svg"], + [ConnectionType.DATAHUB, "datahub.svg"], [ConnectionType.DYNAMODB, "dynamodb.svg"], [ConnectionType.GENERIC_CONSENT_EMAIL, "ethyca.svg"], [ConnectionType.GENERIC_ERASURE_EMAIL, "ethyca.svg"], diff --git a/clients/admin-ui/src/types/api/models/ConnectionType.ts b/clients/admin-ui/src/types/api/models/ConnectionType.ts index 917592953e..d7a69689b7 100644 --- a/clients/admin-ui/src/types/api/models/ConnectionType.ts +++ b/clients/admin-ui/src/types/api/models/ConnectionType.ts @@ -8,6 +8,7 @@ export enum ConnectionType { ATTENTIVE_EMAIL = "attentive_email", BIGQUERY = "bigquery", + DATAHUB = "datahub", DYNAMODB = "dynamodb", FIDES = "fides", GENERIC_CONSENT_EMAIL = "generic_consent_email", diff --git a/requirements.txt b/requirements.txt index 49e70ac374..1520a49f05 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +acryl-datahub==0.14.1 alembic==1.8.1 anyascii==0.3.2 anyio==3.7.1 diff --git a/src/fides/api/alembic/migrations/versions/5a4859f74832_add_last_run_timestamp_to_connectionconfig.py b/src/fides/api/alembic/migrations/versions/5a4859f74832_add_last_run_timestamp_to_connectionconfig.py new file mode 100644 index 0000000000..ac6f2e5ddd --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/5a4859f74832_add_last_run_timestamp_to_connectionconfig.py @@ -0,0 +1,31 @@ +"""Add last_run_timestamp to ConnectionConfig + +Revision ID: 5a4859f74832 +Revises: c9a22b284afa +Create Date: 2024-10-22 14:58:09.174708 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "5a4859f74832" +down_revision = "c9a22b284afa" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "connectionconfig", + sa.Column("last_run_timestamp", sa.DateTime(timezone=True), nullable=True), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("connectionconfig", "last_run_timestamp") + # ### end Alembic commands ### diff --git a/src/fides/api/alembic/migrations/versions/c9a22b284afa_add_datahub_connection_type.py b/src/fides/api/alembic/migrations/versions/c9a22b284afa_add_datahub_connection_type.py new file mode 100644 index 0000000000..9e7e0c4294 --- /dev/null +++ b/src/fides/api/alembic/migrations/versions/c9a22b284afa_add_datahub_connection_type.py @@ -0,0 +1,106 @@ +"""Add Datahub connection type + +Revision ID: c9a22b284afa +Revises: 4ebe0766021b +Create Date: 2024-10-21 13:40:31.313754 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "c9a22b284afa" +down_revision = "1c8dfd6a1bc6" +branch_labels = None +depends_on = None + + +def upgrade(): + # Add 'datahub' to ConnectionType enum + op.execute("ALTER TYPE connectiontype RENAME TO connectiontype_old") + op.execute( + """ + CREATE TYPE connectiontype AS ENUM ( + 'mongodb', + 'mysql', + 'https', + 'snowflake', + 'redshift', + 'mssql', + 'mariadb', + 'bigquery', + 'saas', + 'manual', + 'manual_webhook', + 'timescale', + 'fides', + 'sovrn', + 'attentive_email', + 'dynamodb', + 'postgres', + 'generic_consent_email', + 'generic_erasure_email', + 'scylla', + 's3', + 'google_cloud_sql_mysql', + 'google_cloud_sql_postgres', + 'dynamic_erasure_email', + 'rds_mysql', + 'rds_postgres', + 'datahub' + ) + """ + ) + op.execute( + """ + ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING + connection_type::text::connectiontype + """ + ) + op.execute("DROP TYPE connectiontype_old") + + +def downgrade(): + # Remove 'datahub' from ConnectionType enum + op.execute("DELETE FROM connectionconfig WHERE connection_type IN ('datahub')") + op.execute("ALTER TYPE connectiontype RENAME TO connectiontype_old") + op.execute( + """ + CREATE TYPE connectiontype AS ENUM ( + 'mongodb', + 'mysql', + 'https', + 'snowflake', + 'redshift', + 'mssql', + 'mariadb', + 'bigquery', + 'saas', + 'manual', + 'manual_webhook', + 'timescale', + 'fides', + 'sovrn', + 'attentive_email', + 'dynamodb', + 'postgres', + 'generic_consent_email', + 'generic_erasure_email', + 'scylla', + 's3', + 'google_cloud_sql_mysql', + 'google_cloud_sql_postgres', + 'dynamic_erasure_email', + 'rds_mysql', + 'rds_postgres' + ) + """ + ) + op.execute( + """ + ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING + connection_type::text::connectiontype + """ + ) + op.execute("DROP TYPE connectiontype_old") diff --git a/src/fides/api/models/connectionconfig.py b/src/fides/api/models/connectionconfig.py index c259e6f382..3b81895962 100644 --- a/src/fides/api/models/connectionconfig.py +++ b/src/fides/api/models/connectionconfig.py @@ -37,6 +37,7 @@ class ConnectionType(enum.Enum): attentive_email = "attentive_email" bigquery = "bigquery" + datahub = "datahub" dynamodb = "dynamodb" fides = "fides" generic_consent_email = "generic_consent_email" # Run after the traversal @@ -70,6 +71,7 @@ def human_readable(self) -> str: readable_mapping: Dict[str, str] = { ConnectionType.attentive_email.value: "Attentive Email", ConnectionType.bigquery.value: "BigQuery", + ConnectionType.datahub.value: "DataHub", ConnectionType.dynamic_erasure_email.value: "Dynamic Erasure Email", ConnectionType.dynamodb.value: "DynamoDB", ConnectionType.fides.value: "Fides Connector", @@ -141,6 +143,10 @@ class ConnectionConfig(Base): disabled = Column(Boolean, server_default="f", default=False) disabled_at = Column(DateTime(timezone=True)) + # Optional column to store the last time the connection was "ran" + # Each integration can determine the semantics of what "being run" is + last_run_timestamp = Column(DateTime(timezone=True), nullable=True) + # only applicable to ConnectionConfigs of connection type saas saas_config = Column( MutableDict.as_mutable(JSONB), index=False, unique=False, nullable=True diff --git a/src/fides/api/schemas/connection_configuration/__init__.py b/src/fides/api/schemas/connection_configuration/__init__.py index 60bb0e0a43..a8e0f94d67 100644 --- a/src/fides/api/schemas/connection_configuration/__init__.py +++ b/src/fides/api/schemas/connection_configuration/__init__.py @@ -19,6 +19,12 @@ from fides.api.schemas.connection_configuration.connection_secrets_bigquery import ( BigQuerySchema as BigQuerySchema, ) +from fides.api.schemas.connection_configuration.connection_secrets_datahub import ( + DatahubDocsSchema as DatahubDocsSchema, +) +from fides.api.schemas.connection_configuration.connection_secrets_datahub import ( + DatahubSchema as DatahubSchema, +) from fides.api.schemas.connection_configuration.connection_secrets_dynamic_erasure_email import ( DynamicErasureEmailDocsSchema as DynamicErasureEmailDocsSchema, ) @@ -150,6 +156,7 @@ secrets_schemas: Dict[str, Any] = { ConnectionType.attentive_email.value: AttentiveSchema, ConnectionType.bigquery.value: BigQuerySchema, + ConnectionType.datahub.value: DatahubSchema, ConnectionType.dynamic_erasure_email.value: DynamicErasureEmailSchema, ConnectionType.dynamodb.value: DynamoDBSchema, ConnectionType.fides.value: FidesConnectorSchema, @@ -208,6 +215,7 @@ def get_connection_secrets_schema( # what type of schema we should validate against. connection_secrets_schemas = Union[ BigQueryDocsSchema, + DatahubDocsSchema, DynamicErasureEmailDocsSchema, DynamoDBDocsSchema, EmailDocsSchema, diff --git a/src/fides/api/schemas/connection_configuration/connection_secrets_datahub.py b/src/fides/api/schemas/connection_configuration/connection_secrets_datahub.py new file mode 100644 index 0000000000..bfda71ed50 --- /dev/null +++ b/src/fides/api/schemas/connection_configuration/connection_secrets_datahub.py @@ -0,0 +1,43 @@ +from enum import Enum +from typing import ClassVar, List + +from pydantic import Field + +from fides.api.custom_types import AnyHttpUrlStringRemovesSlash +from fides.api.schemas.base_class import NoValidationSchema +from fides.api.schemas.connection_configuration.connection_secrets import ( + ConnectionConfigSecretsSchema, +) + + +class PeriodicIntegrationFrequency(Enum): + """Enum for periodic integration frequency""" + + daily = "daily" + weekly = "weekly" + monthly = "monthly" + + +class DatahubSchema(ConnectionConfigSecretsSchema): + datahub_server_url: AnyHttpUrlStringRemovesSlash = Field( + title="DataHub Server URL", + description="The URL of your DataHub server.", + ) + datahub_token: str = Field( + title="DataHub Token", + description="The token used to authenticate with your DataHub server.", + json_schema_extra={"sensitive": True}, + ) + frequency: PeriodicIntegrationFrequency = Field( + title="Frequency", + description="The frequency at which the integration should run. Defaults to daily.", + default=PeriodicIntegrationFrequency.daily, + ) + + _required_components: ClassVar[List[str]] = ["datahub_server_url", "datahub_token"] + + +class DatahubDocsSchema(DatahubSchema, NoValidationSchema): + """ + Datahub Schema for API docs. + """ diff --git a/src/fides/api/service/connectors/__init__.py b/src/fides/api/service/connectors/__init__.py index 251c0748fa..d81498c005 100644 --- a/src/fides/api/service/connectors/__init__.py +++ b/src/fides/api/service/connectors/__init__.py @@ -12,6 +12,9 @@ from fides.api.service.connectors.consent_email_connector import ( GenericConsentEmailConnector, ) +from fides.api.service.connectors.datahub_connector import ( + DatahubConnector as DatahubConnector, +) from fides.api.service.connectors.dynamic_erasure_email_connector import ( DynamicErasureEmailConnector, ) @@ -76,6 +79,7 @@ supported_connectors: Dict[str, Any] = { ConnectionType.attentive_email.value: AttentiveConnector, ConnectionType.bigquery.value: BigQueryConnector, + ConnectionType.datahub.value: DatahubConnector, ConnectionType.dynamic_erasure_email.value: DynamicErasureEmailConnector, ConnectionType.dynamodb.value: DynamoDBConnector, ConnectionType.fides.value: FidesConnector, diff --git a/src/fides/api/service/connectors/datahub_connector.py b/src/fides/api/service/connectors/datahub_connector.py new file mode 100644 index 0000000000..a714f30e32 --- /dev/null +++ b/src/fides/api/service/connectors/datahub_connector.py @@ -0,0 +1,35 @@ +from typing import Generic, Optional + +from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig +from loguru import logger + +from fides.api.models.connectionconfig import ConnectionConfig, ConnectionTestStatus +from fides.api.schemas.connection_configuration.connection_secrets_datahub import ( + DatahubSchema, +) +from fides.api.service.connectors.base_connector import DB_CONNECTOR_TYPE + + +class DatahubConnector(Generic[DB_CONNECTOR_TYPE]): + + def __init__(self, configuration: ConnectionConfig): + self.configuration = configuration + self.config = DatahubSchema(**configuration.secrets or {}) + # TODO: use token for authentication + self.datahub_client = DataHubGraph( + DataHubGraphConfig(server=str(self.config.datahub_server_url)) + ) + + def test_connection(self) -> Optional[ConnectionTestStatus]: + logger.info(f"Testing DataHub connection for {self.configuration.key}...") + try: + self.datahub_client.test_connection() + logger.info( + f"DataHub connection test for {self.configuration.key} succeeded." + ) + return ConnectionTestStatus.succeeded + except Exception as e: + logger.error( + f"DataHub connection test for {self.configuration.key} failed: {e}" + ) + return ConnectionTestStatus.failed diff --git a/tests/conftest.py b/tests/conftest.py index cfadd9fa3f..bc371ac74f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,6 +47,7 @@ from fides.config.config_proxy import ConfigProxy from tests.fixtures.application_fixtures import * from tests.fixtures.bigquery_fixtures import * +from tests.fixtures.datahub_fixtures import * from tests.fixtures.dynamodb_fixtures import * from tests.fixtures.email_fixtures import * from tests.fixtures.fides_connector_example_fixtures import * diff --git a/tests/fixtures/datahub_fixtures.py b/tests/fixtures/datahub_fixtures.py new file mode 100644 index 0000000000..029d92d5c0 --- /dev/null +++ b/tests/fixtures/datahub_fixtures.py @@ -0,0 +1,28 @@ +from typing import Generator + +import pytest +from datahub.ingestion.graph.client import DataHubGraphConfig +from sqlalchemy.orm import Session + +from fides.api.models.connectionconfig import ( + AccessLevel, + ConnectionConfig, + ConnectionType, +) + + +@pytest.fixture(scope="function") +def datahub_connection_config_no_secrets( + db: Session, +) -> Generator[ConnectionConfig, None, None]: + connection_config = ConnectionConfig.create( + db=db, + data={ + "name": "DataHub Config", + "key": "my_datahub_config", + "connection_type": ConnectionType.datahub, + "access": AccessLevel.write, + }, + ) + yield connection_config + connection_config.delete(db) diff --git a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py index 1998fb7bb2..c04f67139a 100644 --- a/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py +++ b/tests/ops/api/v1/endpoints/test_connection_config_endpoints.py @@ -1981,6 +1981,130 @@ def test_put_http_connection_config_secrets( assert https_connection_config.last_test_timestamp is None assert https_connection_config.last_test_succeeded is None + def test_put_datahub_connection_config_secrets( + self, + api_client: TestClient, + db: Session, + generate_auth_header, + datahub_connection_config_no_secrets, + ): + """ + Note: this test does not call DataHub, via use of verify query param. + """ + url = f"{V1_URL_PREFIX}{CONNECTIONS}/{datahub_connection_config_no_secrets.key}/secret" + auth_header = generate_auth_header(scopes=[CONNECTION_CREATE_OR_UPDATE]) + payload = { + "datahub_server_url": "https://datahub.example.com", + "datahub_token": "test", + "frequency": "weekly", + } + resp = api_client.put( + url + "?verify=False", + headers=auth_header, + json=payload, + ) + assert resp.status_code == 200 + assert ( + json.loads(resp.text)["msg"] + == f"Secrets updated for ConnectionConfig with key: {datahub_connection_config_no_secrets.key}." + ) + + db.refresh(datahub_connection_config_no_secrets) + assert datahub_connection_config_no_secrets.secrets == { + "datahub_server_url": "https://datahub.example.com", + "datahub_token": "test", + "frequency": "weekly", + } + assert datahub_connection_config_no_secrets.last_test_timestamp is None + assert datahub_connection_config_no_secrets.last_test_succeeded is None + + def test_put_datahub_connection_config_secrets_default_frequency( + self, + api_client: TestClient, + db: Session, + generate_auth_header, + datahub_connection_config_no_secrets, + ): + """ + Note: this test does not call DataHub, via use of verify query param. + """ + url = f"{V1_URL_PREFIX}{CONNECTIONS}/{datahub_connection_config_no_secrets.key}/secret" + auth_header = generate_auth_header(scopes=[CONNECTION_CREATE_OR_UPDATE]) + payload = { + "datahub_server_url": "https://datahub.example.com", + "datahub_token": "test", + } + resp = api_client.put( + url + "?verify=False", + headers=auth_header, + json=payload, + ) + assert resp.status_code == 200 + assert ( + json.loads(resp.text)["msg"] + == f"Secrets updated for ConnectionConfig with key: {datahub_connection_config_no_secrets.key}." + ) + + db.refresh(datahub_connection_config_no_secrets) + assert datahub_connection_config_no_secrets.secrets == { + "datahub_server_url": "https://datahub.example.com", + "datahub_token": "test", + "frequency": "daily", + } + assert datahub_connection_config_no_secrets.last_test_timestamp is None + assert datahub_connection_config_no_secrets.last_test_succeeded is None + + def test_put_datahub_connection_config_secrets_missing_url( + self, + api_client: TestClient, + db: Session, + generate_auth_header, + datahub_connection_config_no_secrets, + ): + """ + Note: this test does not call DataHub, via use of verify query param. + """ + url = f"{V1_URL_PREFIX}{CONNECTIONS}/{datahub_connection_config_no_secrets.key}/secret" + auth_header = generate_auth_header(scopes=[CONNECTION_CREATE_OR_UPDATE]) + payload = {"datahub_token": "test", "frequency": "weekly"} + resp = api_client.put( + url + "?verify=False", + headers=auth_header, + json=payload, + ) + assert resp.status_code == 422 + assert ( + resp.json()["detail"][0]["msg"] + == "Value error, DatahubSchema must be supplied all of: ['datahub_server_url', 'datahub_token']." + ) + + def test_put_datahub_connection_config_secrets_missing_token( + self, + api_client: TestClient, + db: Session, + generate_auth_header, + datahub_connection_config_no_secrets, + ): + """ + Note: this test does not call DataHub, via use of verify query param. + """ + url = f"{V1_URL_PREFIX}{CONNECTIONS}/{datahub_connection_config_no_secrets.key}/secret" + auth_header = generate_auth_header(scopes=[CONNECTION_CREATE_OR_UPDATE]) + payload = { + "datahub_server_url": "https://datahub.example.com", + "frequency": "weekly", + } + resp = api_client.put( + url + "?verify=False", + headers=auth_header, + json=payload, + ) + assert resp.status_code == 422 + assert ( + resp.json()["detail"][0]["msg"] + == "Value error, DatahubSchema must be supplied all of: ['datahub_server_url', 'datahub_token']." + ) + @pytest.mark.unit_saas def test_put_saas_example_connection_config_secrets( self, diff --git a/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py b/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py index 9d345ea87e..6c395eb11a 100644 --- a/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py +++ b/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py @@ -355,7 +355,7 @@ def test_search_system_type(self, api_client, generate_auth_header, url): resp = api_client.get(url + "system_type=database", headers=auth_header) assert resp.status_code == 200 data = resp.json()["items"] - assert len(data) == 16 + assert len(data) == 17 def test_search_system_type_and_connection_type( self, diff --git a/tests/ops/service/connectors/test_datahub_connector.py b/tests/ops/service/connectors/test_datahub_connector.py new file mode 100644 index 0000000000..de4978ce66 --- /dev/null +++ b/tests/ops/service/connectors/test_datahub_connector.py @@ -0,0 +1,43 @@ +from unittest.mock import patch + +import pytest + +from fides.api.models.connectionconfig import ConnectionConfig, ConnectionTestStatus +from fides.api.service.connectors.datahub_connector import DatahubConnector + + +class TestDatahubConnector: + @pytest.fixture + def connection_config(self): + return ConnectionConfig( + key="datahub_test_key", + secrets={ + "datahub_server_url": "http://testserver", + "datahub_token": "testtoken", + "frequency": "daily", + }, + ) + + def test_test_connection_success(self, connection_config: ConnectionConfig): + datahub_connector = DatahubConnector(connection_config) + + with patch.object( + datahub_connector.datahub_client, "test_connection", return_value=True + ) as mock_test_connection: + result = datahub_connector.test_connection() + assert result == ConnectionTestStatus.succeeded + + mock_test_connection.assert_called_once() + + def test_test_connection_failure(self, connection_config: ConnectionConfig): + datahub_connector = DatahubConnector(connection_config) + + with patch.object( + datahub_connector.datahub_client, + "test_connection", + side_effect=Exception("Connection failed"), + ) as mock_test_connection: + result = datahub_connector.test_connection() + assert result == ConnectionTestStatus.failed + + mock_test_connection.assert_called_once()