-
Notifications
You must be signed in to change notification settings - Fork 72
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HJ-24 Add DataHub integration config (#5401)
- Loading branch information
Showing
18 changed files
with
440 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
acryl-datahub==0.14.1 | ||
alembic==1.8.1 | ||
anyascii==0.3.2 | ||
anyio==3.7.1 | ||
|
31 changes: 31 additions & 0 deletions
31
...pi/alembic/migrations/versions/5a4859f74832_add_last_run_timestamp_to_connectionconfig.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
"""Add last_run_timestamp to ConnectionConfig | ||
Revision ID: 5a4859f74832 | ||
Revises: c9a22b284afa | ||
Create Date: 2024-10-22 14:58:09.174708 | ||
""" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "5a4859f74832" | ||
down_revision = "c9a22b284afa" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
op.add_column( | ||
"connectionconfig", | ||
sa.Column("last_run_timestamp", sa.DateTime(timezone=True), nullable=True), | ||
) | ||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
op.drop_column("connectionconfig", "last_run_timestamp") | ||
# ### end Alembic commands ### |
106 changes: 106 additions & 0 deletions
106
src/fides/api/alembic/migrations/versions/c9a22b284afa_add_datahub_connection_type.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
"""Add Datahub connection type | ||
Revision ID: c9a22b284afa | ||
Revises: 4ebe0766021b | ||
Create Date: 2024-10-21 13:40:31.313754 | ||
""" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "c9a22b284afa" | ||
down_revision = "1c8dfd6a1bc6" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# Add 'datahub' to ConnectionType enum | ||
op.execute("ALTER TYPE connectiontype RENAME TO connectiontype_old") | ||
op.execute( | ||
""" | ||
CREATE TYPE connectiontype AS ENUM ( | ||
'mongodb', | ||
'mysql', | ||
'https', | ||
'snowflake', | ||
'redshift', | ||
'mssql', | ||
'mariadb', | ||
'bigquery', | ||
'saas', | ||
'manual', | ||
'manual_webhook', | ||
'timescale', | ||
'fides', | ||
'sovrn', | ||
'attentive_email', | ||
'dynamodb', | ||
'postgres', | ||
'generic_consent_email', | ||
'generic_erasure_email', | ||
'scylla', | ||
's3', | ||
'google_cloud_sql_mysql', | ||
'google_cloud_sql_postgres', | ||
'dynamic_erasure_email', | ||
'rds_mysql', | ||
'rds_postgres', | ||
'datahub' | ||
) | ||
""" | ||
) | ||
op.execute( | ||
""" | ||
ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING | ||
connection_type::text::connectiontype | ||
""" | ||
) | ||
op.execute("DROP TYPE connectiontype_old") | ||
|
||
|
||
def downgrade(): | ||
# Remove 'datahub' from ConnectionType enum | ||
op.execute("DELETE FROM connectionconfig WHERE connection_type IN ('datahub')") | ||
op.execute("ALTER TYPE connectiontype RENAME TO connectiontype_old") | ||
op.execute( | ||
""" | ||
CREATE TYPE connectiontype AS ENUM ( | ||
'mongodb', | ||
'mysql', | ||
'https', | ||
'snowflake', | ||
'redshift', | ||
'mssql', | ||
'mariadb', | ||
'bigquery', | ||
'saas', | ||
'manual', | ||
'manual_webhook', | ||
'timescale', | ||
'fides', | ||
'sovrn', | ||
'attentive_email', | ||
'dynamodb', | ||
'postgres', | ||
'generic_consent_email', | ||
'generic_erasure_email', | ||
'scylla', | ||
's3', | ||
'google_cloud_sql_mysql', | ||
'google_cloud_sql_postgres', | ||
'dynamic_erasure_email', | ||
'rds_mysql', | ||
'rds_postgres' | ||
) | ||
""" | ||
) | ||
op.execute( | ||
""" | ||
ALTER TABLE connectionconfig ALTER COLUMN connection_type TYPE connectiontype USING | ||
connection_type::text::connectiontype | ||
""" | ||
) | ||
op.execute("DROP TYPE connectiontype_old") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 43 additions & 0 deletions
43
src/fides/api/schemas/connection_configuration/connection_secrets_datahub.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from enum import Enum | ||
from typing import ClassVar, List | ||
|
||
from pydantic import Field | ||
|
||
from fides.api.custom_types import AnyHttpUrlStringRemovesSlash | ||
from fides.api.schemas.base_class import NoValidationSchema | ||
from fides.api.schemas.connection_configuration.connection_secrets import ( | ||
ConnectionConfigSecretsSchema, | ||
) | ||
|
||
|
||
class PeriodicIntegrationFrequency(Enum): | ||
"""Enum for periodic integration frequency""" | ||
|
||
daily = "daily" | ||
weekly = "weekly" | ||
monthly = "monthly" | ||
|
||
|
||
class DatahubSchema(ConnectionConfigSecretsSchema): | ||
datahub_server_url: AnyHttpUrlStringRemovesSlash = Field( | ||
title="DataHub Server URL", | ||
description="The URL of your DataHub server.", | ||
) | ||
datahub_token: str = Field( | ||
title="DataHub Token", | ||
description="The token used to authenticate with your DataHub server.", | ||
json_schema_extra={"sensitive": True}, | ||
) | ||
frequency: PeriodicIntegrationFrequency = Field( | ||
title="Frequency", | ||
description="The frequency at which the integration should run. Defaults to daily.", | ||
default=PeriodicIntegrationFrequency.daily, | ||
) | ||
|
||
_required_components: ClassVar[List[str]] = ["datahub_server_url", "datahub_token"] | ||
|
||
|
||
class DatahubDocsSchema(DatahubSchema, NoValidationSchema): | ||
""" | ||
Datahub Schema for API docs. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from typing import Generic, Optional | ||
|
||
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig | ||
from loguru import logger | ||
|
||
from fides.api.models.connectionconfig import ConnectionConfig, ConnectionTestStatus | ||
from fides.api.schemas.connection_configuration.connection_secrets_datahub import ( | ||
DatahubSchema, | ||
) | ||
from fides.api.service.connectors.base_connector import DB_CONNECTOR_TYPE | ||
|
||
|
||
class DatahubConnector(Generic[DB_CONNECTOR_TYPE]): | ||
|
||
def __init__(self, configuration: ConnectionConfig): | ||
self.configuration = configuration | ||
self.config = DatahubSchema(**configuration.secrets or {}) | ||
# TODO: use token for authentication | ||
self.datahub_client = DataHubGraph( | ||
DataHubGraphConfig(server=str(self.config.datahub_server_url)) | ||
) | ||
|
||
def test_connection(self) -> Optional[ConnectionTestStatus]: | ||
logger.info(f"Testing DataHub connection for {self.configuration.key}...") | ||
try: | ||
self.datahub_client.test_connection() | ||
logger.info( | ||
f"DataHub connection test for {self.configuration.key} succeeded." | ||
) | ||
return ConnectionTestStatus.succeeded | ||
except Exception as e: | ||
logger.error( | ||
f"DataHub connection test for {self.configuration.key} failed: {e}" | ||
) | ||
return ConnectionTestStatus.failed |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from typing import Generator | ||
|
||
import pytest | ||
from datahub.ingestion.graph.client import DataHubGraphConfig | ||
from sqlalchemy.orm import Session | ||
|
||
from fides.api.models.connectionconfig import ( | ||
AccessLevel, | ||
ConnectionConfig, | ||
ConnectionType, | ||
) | ||
|
||
|
||
@pytest.fixture(scope="function") | ||
def datahub_connection_config_no_secrets( | ||
db: Session, | ||
) -> Generator[ConnectionConfig, None, None]: | ||
connection_config = ConnectionConfig.create( | ||
db=db, | ||
data={ | ||
"name": "DataHub Config", | ||
"key": "my_datahub_config", | ||
"connection_type": ConnectionType.datahub, | ||
"access": AccessLevel.write, | ||
}, | ||
) | ||
yield connection_config | ||
connection_config.delete(db) |
Oops, something went wrong.