From f804705b787ccf7385f3c1a2808dba248e73b43c Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Mon, 29 Nov 2021 17:27:14 +0000 Subject: [PATCH 1/8] update to newer amundsen-databuilder and requests. Also add connect_args to presto driver --- pipelines/requirements.txt | 6 ++++-- pipelines/whale/engine/sql_alchemy_engine.py | 13 +++++++++++-- .../extractor/presto_table_metadata_extractor.py | 8 +++++++- pipelines/whale/models/connection_config.py | 2 ++ pipelines/whale/utils/extractor_wrappers.py | 5 +++++ 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/pipelines/requirements.txt b/pipelines/requirements.txt index 5b1ea198..9f5b2123 100644 --- a/pipelines/requirements.txt +++ b/pipelines/requirements.txt @@ -1,5 +1,6 @@ aadict==0.2.3 -amundsen-databuilder==3.3.2 +# amundsen-databuilder==3.3.2 +amundsen-databuilder==6.4.0 asn1crypto==1.4.0 asset==0.6.13 azure-common==1.1.25 @@ -64,7 +65,8 @@ pyparsing==2.4.7 python-dateutil==2.8.1 pytz==2020.1 PyYAML==5.3.1 -requests==2.23.0 +# requests==2.23.0 +requests==2.25.0 requests-oauthlib==1.3.0 retrying==1.3.3 rsa==4.6 diff --git a/pipelines/whale/engine/sql_alchemy_engine.py b/pipelines/whale/engine/sql_alchemy_engine.py index 95db9017..cda5fe2b 100644 --- a/pipelines/whale/engine/sql_alchemy_engine.py +++ b/pipelines/whale/engine/sql_alchemy_engine.py @@ -2,6 +2,7 @@ from pyhocon import ConfigFactory, ConfigTree from sqlalchemy import create_engine from typing import Iterator +from ast import literal_eval from whale.engine.base_engine import Engine @@ -13,6 +14,7 @@ class SQLAlchemyEngine(Engine): """ CONN_STRING_KEY = "conn_string" + CONNECT_ARGS = 'connect_args' MODEL_CLASS_KEY = "model_class" CREDENTIALS_PATH_KEY = "credentials_path" @@ -23,6 +25,7 @@ def init(self, conf: ConfigTree): :param conf: configuration file. 
""" self.conn_string = conf.get_string(SQLAlchemyEngine.CONN_STRING_KEY) + self.connect_args = conf.get_string(SQLAlchemyEngine.CONNECT_ARGS) self.credentials_path = conf.get(SQLAlchemyEngine.CREDENTIALS_PATH_KEY, None) self.connection = self._get_connection() @@ -38,10 +41,16 @@ def _get_connection(self): """ if self.credentials_path is not None: engine = create_engine( - self.conn_string, credentials_path=self.credentials_path + self.conn_string, credentials_path=self.credentials_path, + # connect_args={'protocol': 'https','requests_kwargs': {'verify': False}} + connect_args=literal_eval(self.connect_args) ) else: - engine = create_engine(self.conn_string) + engine = create_engine( + self.conn_string, + # connect_args={'protocol': 'https','requests_kwargs': {'verify': False}} + connect_args=literal_eval(self.connect_args) + ) conn = engine.connect() return conn diff --git a/pipelines/whale/extractor/presto_table_metadata_extractor.py b/pipelines/whale/extractor/presto_table_metadata_extractor.py index 0156b002..661b254e 100644 --- a/pipelines/whale/extractor/presto_table_metadata_extractor.py +++ b/pipelines/whale/extractor/presto_table_metadata_extractor.py @@ -25,6 +25,7 @@ class PrestoTableMetadataExtractor(Extractor): WHERE_CLAUSE_SUFFIX_KEY = "where_clause_suffix" CLUSTER_KEY = "cluster" DATABASE_KEY = "database" + CONNECT_ARGS = "connect_args" SQL_STATEMENT = """ SELECT @@ -81,7 +82,12 @@ def init(self, conf): sql_alch_conf = Scoped.get_scoped_conf( self.conf, self._alchemy_extractor.get_scope() ).with_fallback( - ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}) + ConfigFactory.from_dict( + { + SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt, + PrestoTableMetadataExtractor.CONNECT_ARGS: self.conf.get(PrestoTableMetadataExtractor.CONNECT_ARGS) + } + ) ) self._alchemy_extractor.init(sql_alch_conf) diff --git a/pipelines/whale/models/connection_config.py b/pipelines/whale/models/connection_config.py index 7db2ad66..2aae03d5 100644 --- 
a/pipelines/whale/models/connection_config.py +++ b/pipelines/whale/models/connection_config.py @@ -7,6 +7,7 @@ def __init__( metadata_source: str, dialect: Optional[str] = None, uri: Optional[str] = None, + connect_args: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, @@ -36,6 +37,7 @@ def __init__( ): self.uri = uri + self.connect_args = connect_args self.port = port if metadata_source is not None: metadata_source = metadata_source.lower() diff --git a/pipelines/whale/utils/extractor_wrappers.py b/pipelines/whale/utils/extractor_wrappers.py index 3b95c97d..c4619740 100644 --- a/pipelines/whale/utils/extractor_wrappers.py +++ b/pipelines/whale/utils/extractor_wrappers.py @@ -51,6 +51,10 @@ def add_ugc_runner(extractors: list, conf: ConfigTree, connection): f"{METRIC_RUNNER_SCOPE}.{SQL_ALCHEMY_ENGINE_SCOPE}.{SQLAlchemyEngine.CREDENTIALS_PATH_KEY}", connection.key_path, ) + conf.put( + f"{METRIC_RUNNER_SCOPE}.{SQL_ALCHEMY_ENGINE_SCOPE}.{SQLAlchemyEngine.CONNECT_ARGS}", + connection.connect_args, + ) extractors.append(UGCRunner()) return extractors, conf @@ -183,6 +187,7 @@ def configure_presto_extractors(connection: ConnectionConfigSchema): f"{loop_scope}.{LoopExtractor.CLUSTER_KEY}": connection.cluster, f"{scope}.{Extractor.DATABASE_KEY}": connection.name, f"{scope}.{Extractor.CLUSTER_KEY}": connection.cluster, + f"{scope}.{Extractor.CONNECT_ARGS}": connection.connect_args, f"{scope}.{Extractor.WHERE_CLAUSE_SUFFIX_KEY}": connection.where_clause_suffix, } ) From 9d7daadd8ba2792bda08486dc2891167d60e51fd Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Sat, 11 Dec 2021 00:29:57 +0000 Subject: [PATCH 2/8] fix sql_alchemy_engine.py to use connect_args as json connect_args could be set like this: ``` connect_args: {'protocol':'https'} ``` --- pipelines/whale/engine/sql_alchemy_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/whale/engine/sql_alchemy_engine.py 
b/pipelines/whale/engine/sql_alchemy_engine.py index cda5fe2b..a7f60385 100644 --- a/pipelines/whale/engine/sql_alchemy_engine.py +++ b/pipelines/whale/engine/sql_alchemy_engine.py @@ -25,7 +25,7 @@ def init(self, conf: ConfigTree): :param conf: configuration file. """ self.conn_string = conf.get_string(SQLAlchemyEngine.CONN_STRING_KEY) - self.connect_args = conf.get_string(SQLAlchemyEngine.CONNECT_ARGS) + self.connect_args = HOCONConverter.to_json(conf.get_config(SQLAlchemyEngine.CONNECT_ARGS)) self.credentials_path = conf.get(SQLAlchemyEngine.CREDENTIALS_PATH_KEY, None) self.connection = self._get_connection() From f5a5781cdb5d9a3cf8fdb48115a8ac179c6308e5 Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Sat, 11 Dec 2021 01:10:49 +0000 Subject: [PATCH 3/8] add missing packages --- pipelines/whale/engine/sql_alchemy_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/whale/engine/sql_alchemy_engine.py b/pipelines/whale/engine/sql_alchemy_engine.py index a7f60385..e5dfdf3a 100644 --- a/pipelines/whale/engine/sql_alchemy_engine.py +++ b/pipelines/whale/engine/sql_alchemy_engine.py @@ -1,5 +1,5 @@ import importlib -from pyhocon import ConfigFactory, ConfigTree +from pyhocon import ConfigFactory, ConfigTree, HOCONConverter from sqlalchemy import create_engine from typing import Iterator from ast import literal_eval From fdef97d01a3ba3e6e8af912e7376f2c4c16005ca Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Sat, 11 Dec 2021 01:33:51 +0000 Subject: [PATCH 4/8] fix some missing to support sqlalchemy connect_args --- pipelines/whale/utils/extractor_wrappers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/whale/utils/extractor_wrappers.py b/pipelines/whale/utils/extractor_wrappers.py index c4619740..73abe254 100644 --- a/pipelines/whale/utils/extractor_wrappers.py +++ b/pipelines/whale/utils/extractor_wrappers.py @@ -309,6 +309,7 @@ def configure_unscoped_sqlalchemy_engine(connection: ConnectionConfigSchema): conf =
ConfigFactory.from_dict( { f"{Engine.CONN_STRING_KEY}": connection.conn_string, + f"{Engine.CONNECT_ARGS}": connection.connect_args, f"{Engine.CREDENTIALS_PATH_KEY}": connection.key_path, } ) From 5e5f7edd8b5fa1b30403cf1383f4090e26eb5ef5 Mon Sep 17 00:00:00 2001 From: Robert Yi Date: Tue, 4 Jan 2022 08:58:19 -0800 Subject: [PATCH 5/8] chore: advance skim version (#185) * chore: advance skim version * fix: clippy error --- cli/Cargo.toml | 2 +- cli/src/warehouse/hive.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 76b2b731..92fc6d64 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -15,5 +15,5 @@ regex = "1" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" shellexpand = "2.0.0" -skim = "0.8.2" +skim = "0.9.4" yaml-rust = "0.4" diff --git a/cli/src/warehouse/hive.rs b/cli/src/warehouse/hive.rs index 2fe472fa..f83d6dec 100644 --- a/cli/src/warehouse/hive.rs +++ b/cli/src/warehouse/hive.rs @@ -54,7 +54,7 @@ impl HiveMetastore { } let dialect_msg = "What is the dialect of this metastore? E.g. postgres. This is the dialect type that is used in the SQLAlchemy connection string. See https://docs.sqlalchemy.org/en/13/core/engines.html for more details."; - let dialect = utils::get_input_with_message(&dialect_msg); + let dialect = utils::get_input_with_message(dialect_msg); let database: Option; let database_msg = "Is there a database within this connection that contains the metastore? This usually is the case for hive metastores, and it is usually called 'hive'. 
If in doubt, enter 'hive'."; From e7459e9f63e23435f589775a5ff6b32384220b36 Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Mon, 29 Nov 2021 17:27:14 +0000 Subject: [PATCH 6/8] enhance fix for connect_args (#184) --- pipelines/requirements.txt | 6 +++-- pipelines/whale/__init__.py | 2 ++ pipelines/whale/engine/sql_alchemy_engine.py | 25 ++++++++++++++++--- .../presto_table_metadata_extractor.py | 8 +++++- pipelines/whale/models/connection_config.py | 2 ++ pipelines/whale/utils/extractor_wrappers.py | 6 +++++ 6 files changed, 42 insertions(+), 7 deletions(-) diff --git a/pipelines/requirements.txt b/pipelines/requirements.txt index 5b1ea198..9f5b2123 100644 --- a/pipelines/requirements.txt +++ b/pipelines/requirements.txt @@ -1,5 +1,6 @@ aadict==0.2.3 -amundsen-databuilder==3.3.2 +# amundsen-databuilder==3.3.2 +amundsen-databuilder==6.4.0 asn1crypto==1.4.0 asset==0.6.13 azure-common==1.1.25 @@ -64,7 +65,8 @@ pyparsing==2.4.7 python-dateutil==2.8.1 pytz==2020.1 PyYAML==5.3.1 -requests==2.23.0 +# requests==2.23.0 +requests==2.25.0 requests-oauthlib==1.3.0 retrying==1.3.3 rsa==4.6 diff --git a/pipelines/whale/__init__.py b/pipelines/whale/__init__.py index 8ea856a2..e7b0c536 100644 --- a/pipelines/whale/__init__.py +++ b/pipelines/whale/__init__.py @@ -1,3 +1,4 @@ +import traceback import datetime import logging import os @@ -181,6 +182,7 @@ def pull(): except Exception as e: LOGGER.warning(e) LOGGER.warning(f"Skipping {type(extractor)}.") + LOGGER.warning(traceback.format_exc()) transfer_manifest(tmp_manifest_path) diff --git a/pipelines/whale/engine/sql_alchemy_engine.py b/pipelines/whale/engine/sql_alchemy_engine.py index 95db9017..19be608a 100644 --- a/pipelines/whale/engine/sql_alchemy_engine.py +++ b/pipelines/whale/engine/sql_alchemy_engine.py @@ -1,7 +1,8 @@ import importlib -from pyhocon import ConfigFactory, ConfigTree +from pyhocon import ConfigFactory, ConfigTree, HOCONConverter from sqlalchemy import create_engine from typing import Iterator +from ast 
import literal_eval from whale.engine.base_engine import Engine @@ -13,6 +14,7 @@ class SQLAlchemyEngine(Engine): """ CONN_STRING_KEY = "conn_string" + CONNECT_ARGS = 'connect_args' MODEL_CLASS_KEY = "model_class" CREDENTIALS_PATH_KEY = "credentials_path" @@ -22,8 +24,18 @@ def init(self, conf: ConfigTree): :param conf: configuration file. """ + connect_args = { + k: v + for k, v in conf.get_config( + SQLAlchemyEngine.CONNECT_ARGS, default=ConfigTree() + ).items() + } + self.conn_string = conf.get_string(SQLAlchemyEngine.CONN_STRING_KEY) +<<<<<<< HEAD + self.connect_args = connect_args self.credentials_path = conf.get(SQLAlchemyEngine.CREDENTIALS_PATH_KEY, None) + self.connect_args = connect_args self.connection = self._get_connection() model_class = conf.get(SQLAlchemyEngine.MODEL_CLASS_KEY, None) @@ -36,12 +48,17 @@ def _get_connection(self): """ Create a SQLAlchemy connection to `conn_string`. """ - if self.credentials_path is not None: + if self.credentials_path: engine = create_engine( - self.conn_string, credentials_path=self.credentials_path + self.conn_string, + credentials_path=self.credentials_path, + connect_args=self.connect_args ) else: - engine = create_engine(self.conn_string) + engine = create_engine( + self.conn_string, + connect_args=self.connect_args + ) conn = engine.connect() return conn diff --git a/pipelines/whale/extractor/presto_table_metadata_extractor.py b/pipelines/whale/extractor/presto_table_metadata_extractor.py index 0156b002..661b254e 100644 --- a/pipelines/whale/extractor/presto_table_metadata_extractor.py +++ b/pipelines/whale/extractor/presto_table_metadata_extractor.py @@ -25,6 +25,7 @@ class PrestoTableMetadataExtractor(Extractor): WHERE_CLAUSE_SUFFIX_KEY = "where_clause_suffix" CLUSTER_KEY = "cluster" DATABASE_KEY = "database" + CONNECT_ARGS = "connect_args" SQL_STATEMENT = """ SELECT @@ -81,7 +82,12 @@ def init(self, conf): sql_alch_conf = Scoped.get_scoped_conf( self.conf, self._alchemy_extractor.get_scope() ).with_fallback( 
- ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}) + ConfigFactory.from_dict( + { + SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt, + PrestoTableMetadataExtractor.CONNECT_ARGS: self.conf.get(PrestoTableMetadataExtractor.CONNECT_ARGS) + } + ) ) self._alchemy_extractor.init(sql_alch_conf) diff --git a/pipelines/whale/models/connection_config.py b/pipelines/whale/models/connection_config.py index 7db2ad66..b957444c 100644 --- a/pipelines/whale/models/connection_config.py +++ b/pipelines/whale/models/connection_config.py @@ -7,6 +7,7 @@ def __init__( metadata_source: str, dialect: Optional[str] = None, uri: Optional[str] = None, + connect_args: Optional[dict] = {}, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, @@ -36,6 +37,7 @@ def __init__( ): self.uri = uri + self.connect_args = connect_args self.port = port if metadata_source is not None: metadata_source = metadata_source.lower() diff --git a/pipelines/whale/utils/extractor_wrappers.py b/pipelines/whale/utils/extractor_wrappers.py index 3b95c97d..73abe254 100644 --- a/pipelines/whale/utils/extractor_wrappers.py +++ b/pipelines/whale/utils/extractor_wrappers.py @@ -51,6 +51,10 @@ def add_ugc_runner(extractors: list, conf: ConfigTree, connection): f"{METRIC_RUNNER_SCOPE}.{SQL_ALCHEMY_ENGINE_SCOPE}.{SQLAlchemyEngine.CREDENTIALS_PATH_KEY}", connection.key_path, ) + conf.put( + f"{METRIC_RUNNER_SCOPE}.{SQL_ALCHEMY_ENGINE_SCOPE}.{SQLAlchemyEngine.CONNECT_ARGS}", + connection.connect_args, + ) extractors.append(UGCRunner()) return extractors, conf @@ -183,6 +187,7 @@ def configure_presto_extractors(connection: ConnectionConfigSchema): f"{loop_scope}.{LoopExtractor.CLUSTER_KEY}": connection.cluster, f"{scope}.{Extractor.DATABASE_KEY}": connection.name, f"{scope}.{Extractor.CLUSTER_KEY}": connection.cluster, + f"{scope}.{Extractor.CONNECT_ARGS}": connection.connect_args, f"{scope}.{Extractor.WHERE_CLAUSE_SUFFIX_KEY}": connection.where_clause_suffix, } ) 
@@ -304,6 +309,7 @@ def configure_unscoped_sqlalchemy_engine(connection: ConnectionConfigSchema): conf = ConfigFactory.from_dict( { f"{Engine.CONN_STRING_KEY}": connection.conn_string, + f"{Engine.CONNECT_ARGS}": connection.connect_args, f"{Engine.CREDENTIALS_PATH_KEY}": connection.key_path, } ) From 3499ac106eb65413081d9bd9b12d177e7067aa35 Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Thu, 9 Jun 2022 16:26:38 +0000 Subject: [PATCH 7/8] fix metric error described in #177 --- pipelines/whale/extractor/ugc_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/whale/extractor/ugc_runner.py b/pipelines/whale/extractor/ugc_runner.py index f6572d7e..4d8af7b3 100644 --- a/pipelines/whale/extractor/ugc_runner.py +++ b/pipelines/whale/extractor/ugc_runner.py @@ -16,6 +16,7 @@ DEFINED_METRICS_SECTION, ) from whale.utils.markdown_delimiters import METRICS_DELIMITER +from whale.utils.markdown_delimiters import DEFINED_METRICS_DELIMITER from whale.models.metric_value import MetricValue, SlackAlert SQLALCHEMY_ENGINE_SCOPE = SQLAlchemyEngine().get_scope() @@ -61,7 +62,8 @@ def extract(self): def _find_all_table_stub_paths(self) -> list: try: results = subprocess.check_output( - f"grep -l '{METRICS_DELIMITER}' ~/.whale/metadata/{self.database}/* -d skip", + # f"grep -l '{METRICS_DELIMITER}' ~/.whale/metadata/{self.database}/* -d skip", + f"grep -l '{DEFINED_METRICS_DELIMITER}' ~/.whale/metadata/{self.database}/* -d skip", shell=True, ) results = results.decode("utf-8") From 52e000c32710c4ef0ed0cd69acc44577f7a97041 Mon Sep 17 00:00:00 2001 From: NGUYEN HUU BACH Date: Thu, 9 Jun 2022 22:53:20 +0000 Subject: [PATCH 8/8] fix duplicate item in manifest --- pipelines/whale/engine/sql_alchemy_engine.py | 2 -- pipelines/whale/loader/whale_loader.py | 18 +++++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pipelines/whale/engine/sql_alchemy_engine.py b/pipelines/whale/engine/sql_alchemy_engine.py index 
19be608a..2735ec7a 100644 --- a/pipelines/whale/engine/sql_alchemy_engine.py +++ b/pipelines/whale/engine/sql_alchemy_engine.py @@ -32,10 +32,8 @@ def init(self, conf: ConfigTree): } self.conn_string = conf.get_string(SQLAlchemyEngine.CONN_STRING_KEY) -<<<<<<< HEAD self.connect_args = connect_args self.credentials_path = conf.get(SQLAlchemyEngine.CREDENTIALS_PATH_KEY, None) - self.connect_args = connect_args self.connection = self._get_connection() model_class = conf.get(SQLAlchemyEngine.MODEL_CLASS_KEY, None) diff --git a/pipelines/whale/loader/whale_loader.py b/pipelines/whale/loader/whale_loader.py index 2b623023..a93eb355 100644 --- a/pipelines/whale/loader/whale_loader.py +++ b/pipelines/whale/loader/whale_loader.py @@ -1,3 +1,4 @@ +import logging import os import yaml @@ -45,6 +46,7 @@ METRICS_SECTION, ) +LOGGER = logging.getLogger(__name__) class WhaleLoader(Loader): """ @@ -66,6 +68,7 @@ def init(self, conf: ConfigTree): Path(self.base_directory).mkdir(parents=True, exist_ok=True) Path(paths.MANIFEST_DIR).mkdir(parents=True, exist_ok=True) + def load(self, record) -> None: """ Creates a table stub if it does not exist, updates this template with @@ -116,6 +119,7 @@ def load(self, record) -> None: update_markdown(file_path, record) + # add path if it does not exist yet if self.tmp_manifest_path is not None: _append_to_temp_manifest( database=database, @@ -260,6 +264,18 @@ def _get_section_from_metrics(metrics): def _append_to_temp_manifest( database, cluster, schema, table, tmp_manifest_path=paths.TMP_MANIFEST_PATH ): + # read current manifest + tmp = [] + if os.path.exists(tmp_manifest_path): + with open(tmp_manifest_path, "r") as f: + tmp = list(map(lambda s: s.strip(), f.readlines())) + + + # only add new entries relative_file_path = get_table_file_path_relative(database, cluster, schema, table) + # LOGGER.warning(f"###: {relative_file_path}") + with open(tmp_manifest_path, "a") as f: - f.write(relative_file_path + "\n") + if not relative_file_path in 
tmp: + tmp.append(relative_file_path) + f.write(relative_file_path + "\n")