Merge pull request #121 from meaningfy-ws/feature/TDA-209
Feature/tda 209
duprijil authored Jul 20, 2023
2 parents fca353d + fa44933 commit 8421113
Showing 11 changed files with 221 additions and 40 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
@@ -3,4 +3,5 @@ elasticsearch~=8.6.2
currencyconverter~=0.17.6
pycountry~=22.3.5
SQLAlchemy==1.4.46
psycopg2-binary==2.9.6
pymongo~=4.0.2
8 changes: 7 additions & 1 deletion ted_data_eu/__init__.py
@@ -12,6 +12,12 @@
PROJECT_RESOURCES_POSTGRES_TABLES_PATH = PROJECT_RESOURCES_BQ_FOLDER_PATH / "postgres_tables"


class MongoDBConfig:

@env_property()
def MONGO_DB_AUTH_URL(self, config_value: str) -> str:
return config_value

class CommonConfig:

@env_property()
@@ -150,7 +156,7 @@ def GITHUB_TED_DATA_ARTEFACTS_URL(self, config_value: str) -> str:


class TedDataConfigResolver(TedConfigResolver, BQResourcesConfig, GraphDBConfig, ElasticConfig, PostgresTablesConfig,
CommonConfig, MasterDataRegistryAPIConfig, GitHubConfig):
CommonConfig, MasterDataRegistryAPIConfig, GitHubConfig, MongoDBConfig):
"""
This class is used for automatic config discovery.
"""
40 changes: 40 additions & 0 deletions ted_data_eu/adapters/event_logger.py
@@ -0,0 +1,40 @@
import logging

from ted_data_eu import config
from ted_data_eu.adapters.storage import MongoDBStorage


class MongoDBEventLogger(logging.StreamHandler):
"""
Implements logging handler for MongoDB storage.
"""

def __init__(self,
database_name: str,
collection_name: str,
mongo_auth_url: str = None,
):
"""
Implements logging handler for MongoDB storage.
:param database_name: MongoDB database name where the documents will be stored
:param collection_name: MongoDB collection name where the documents will be stored
:param mongo_auth_url: MongoDB authentication URL
:return:
"""
super().__init__()
self.database_name = database_name
self.collection_name = collection_name
self.mongo_auth_url = mongo_auth_url or config.MONGO_DB_AUTH_URL
self.mongo_storage = MongoDBStorage(database_name=self.database_name,
collection_name=self.collection_name,
mongo_auth_url=self.mongo_auth_url)

def emit(self, record: logging.LogRecord):
"""
Add document to storage.
:param record: Log record to be stored
:return:
"""
self.mongo_storage.add_document(record.__dict__)
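
A short usage sketch for the new handler, assuming a reachable MongoDB instance and a configured MONGO_DB_AUTH_URL; the logger, database and collection names below are illustrative only:

import logging

from ted_data_eu.adapters.event_logger import MongoDBEventLogger

logger = logging.getLogger("postgres_etl")                 # illustrative logger name
handler = MongoDBEventLogger(database_name="etl_events",   # illustrative database/collection
                             collection_name="etl_events")
logger.addHandler(handler)

# emit() persists record.__dict__, so fields passed via `extra`
# become top-level keys of the stored document.
logger.error("No data fetched", extra={"table_name": "Notice"})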
72 changes: 71 additions & 1 deletion ted_data_eu/adapters/storage.py
@@ -1,7 +1,10 @@
from typing import Dict, List

from elasticsearch import Elasticsearch, helpers
from pymongo import MongoClient

from ted_data_eu import config
from ted_data_eu.adapters.storage_abc import DocumentStorageABC
from elasticsearch import Elasticsearch, helpers


class ElasticStorageException(Exception):
@@ -11,6 +14,13 @@ class ElasticStorageException(Exception):
pass


class MongoDBStorageException(Exception):
"""
Implements custom exception for MongoDBStorage
"""
pass


class ElasticStorage(DocumentStorageABC):
"""
Implements interaction with ElasticSearch storage by using its API.
@@ -97,3 +107,63 @@ def query(self, query) -> List[dict]:
return [document_hit["_source"] for document_hit in response['hits']['hits']]
else:
raise ElasticStorageException(str(response))


class MongoDBStorage(DocumentStorageABC):
"""
Implements interaction with MongoDB storage by using its API.
"""

def __init__(self,
database_name: str,
collection_name: str,
mongo_auth_url: str = None
):
"""
Implements interaction with MongoDB storage by using its API.
:param database_name: MongoDB database name where the documents will be stored
:param collection_name: MongoDB collection name where the documents will be stored
:param mongo_auth_url: MongoDB authentication URL
:return:
"""

self.mongodb_client = MongoClient(mongo_auth_url or config.MONGO_DB_AUTH_URL, connect=False)
self.database_name = database_name
self.collection_name = collection_name
self.connection = self.mongodb_client[self.database_name]
self.collection = self.connection[self.collection_name]

def add_document(self, document: Dict):
"""
Add document to storage.
:param document: Document to be stored
:return:
"""
self.collection.insert_one(document)

def add_documents(self, documents: List[Dict]):
"""
Add documents to storage.
:param documents: List of documents to be stored
:return:
"""
self.collection.insert_many(documents)

def clear(self):
"""
Delete all documents from the current collection.
:return:
"""
self.collection.delete_many({})

def count(self) -> int:
"""
Return number of documents from current collection.
:return:
"""
return self.collection.count_documents({})
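
For reference, a minimal sketch of the MongoDBStorage API introduced above; the connection URL, database and collection names are placeholders and a running MongoDB instance is assumed:

from ted_data_eu.adapters.storage import MongoDBStorage

storage = MongoDBStorage(database_name="ted_data",                    # placeholder names
                         collection_name="events",
                         mongo_auth_url="mongodb://localhost:27017")  # placeholder URL

storage.add_document({"event": "etl_started"})
storage.add_documents([{"event": "table_loaded"}, {"event": "etl_finished"}])
print(storage.count())  # 3
storage.clear()         # removes every document from the collection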
70 changes: 42 additions & 28 deletions ted_data_eu/services/etl_pipelines/postgres_etl_pipeline.py
@@ -15,6 +15,7 @@
from ted_data_eu import config
from ted_data_eu.adapters.cpv_processor import CellarCPVProcessor
from ted_data_eu.adapters.etl_pipeline_abc import ETLPipelineABC
from ted_data_eu.adapters.event_logger import MongoDBEventLogger
from ted_data_eu.adapters.master_data_registry import UNIQUE_ID_SRC_COLUMN_NAME, UNIQUE_ID_DST_COLUMN_NAME, \
MATCH_PROBABILITY_COLUMN_NAME
from ted_data_eu.adapters.nuts_processor import CellarNUTSProcessor, NUTSProcessor
@@ -152,7 +153,8 @@ def transform_notice_table(data_csv: io.StringIO) -> DataFrame:
# Add notice year column
data_table[NOTICE_YEAR_COLUMN] = data_table.apply(lambda row: generate_notice_year(row[NOTICE_ID_COLUMN]), axis=1)
# Add notice Number column
data_table[NOTICE_NUMBER_COLUMN] = data_table.apply(lambda row: generate_notice_number(row[NOTICE_ID_COLUMN]), axis=1)
data_table[NOTICE_NUMBER_COLUMN] = data_table.apply(lambda row: generate_notice_number(row[NOTICE_ID_COLUMN]),
axis=1)
# Remove duplicates
data_table.drop_duplicates(inplace=True)
return data_table
@@ -332,26 +334,29 @@ def transform_nuts_table(data_csv: io.StringIO) -> DataFrame:
data_table[NUTS_LEVEL_TEMPLATE.format(nuts_lvl=nuts_lvl)] = data_table.apply(
lambda row: cellar_nuts_processor.get_nuts_parent_code_by_level(row[NUTS_ID_COLUMN], nuts_lvl), axis=1)
data_table[NUTS_LABEL_TEMPLATE.format(nuts_lvl=nuts_lvl)] = data_table.apply(
lambda row: cellar_nuts_processor.get_nuts_label_by_code(row[NUTS_LEVEL_TEMPLATE.format(nuts_lvl=nuts_lvl)]),
lambda row: cellar_nuts_processor.get_nuts_label_by_code(
row[NUTS_LEVEL_TEMPLATE.format(nuts_lvl=nuts_lvl)]),
axis=1)
data_table[NUTS_LABEL_ENG_TEMPLATE.format(nuts_lvl=nuts_lvl)] = data_table.apply(
lambda row: static_nuts_processor.get_nuts_label_by_code(row[NUTS_LEVEL_TEMPLATE.format(nuts_lvl=nuts_lvl)]),
lambda row: static_nuts_processor.get_nuts_label_by_code(
row[NUTS_LEVEL_TEMPLATE.format(nuts_lvl=nuts_lvl)]),
axis=1)
data_table.drop_duplicates(inplace=True)
return data_table


ORGANIZATION_DEDUPLICATION_ID_COLUMN_NAME = "OrganizationDeduplicationId"
ORGANIZATION_ADDRESS_ID_COLUMN_NAME = "OrganizationAddressId"
REFERENCE_ORGANIZATION_ADDRESS_ID_COLUMN_NAME = "ReferenceOrganizationAddressId"
REFERENCE_MATCH_PROBABILITY_COLUMN_NAME = "MatchProbability"


def transform_organization_deduplication_table(data_csv: io.StringIO) -> DataFrame:
"""
Transforms the Organization Deduplication table by deduplicating organization records
"""
data_table = pd.read_csv(data_csv)



deduplicated_data_table = get_organization_records_links(data_table, ORGANIZATION_ADDRESS_ID_COLUMN_NAME)

deduplicated_data_table.rename(columns={
@@ -360,7 +365,8 @@ def transform_organization_deduplication_table(data_csv: io.StringIO) -> DataFrame:
MATCH_PROBABILITY_COLUMN_NAME: REFERENCE_MATCH_PROBABILITY_COLUMN_NAME
}, inplace=True)

deduplicated_data_table[ORGANIZATION_DEDUPLICATION_ID_COLUMN_NAME] = deduplicated_data_table[[ORGANIZATION_ADDRESS_ID_COLUMN_NAME, REFERENCE_ORGANIZATION_ADDRESS_ID_COLUMN_NAME]].apply(
deduplicated_data_table[ORGANIZATION_DEDUPLICATION_ID_COLUMN_NAME] = deduplicated_data_table[
[ORGANIZATION_ADDRESS_ID_COLUMN_NAME, REFERENCE_ORGANIZATION_ADDRESS_ID_COLUMN_NAME]].apply(
"".join, axis=1)

return deduplicated_data_table
@@ -458,27 +464,41 @@ class PostgresETLException(Exception):
pass


POSTGRES_ETL_NAME = "PostgresETLPipeline"
EXTRACTED_DAY_FIELD = "extracted_day"


class PostgresETLPipeline(ETLPipelineABC):
"""
ETL Class that gets data from TDA endpoint, transforms and inserts result to document storage
"""

def __init__(self, table_name: str, sparql_query_path: Path, primary_key_column_name: str,
postgres_url: str = None, foreign_key_column_names: List[dict] = None,
triple_store: TripleStoreABC = None, triple_store_endpoint: str = None):
def __init__(self,
table_name: str,
sparql_query_path: Path,
primary_key_column_name: str,
postgres_url: str = None,
foreign_key_column_names: List[dict] = None,
triple_store: TripleStoreABC = None,
triple_store_endpoint: str = None,
etl_name: str = POSTGRES_ETL_NAME):
"""
Constructor
"""
self.etl_metadata = {}
self.table_name = table_name
self.sparql_query_path = sparql_query_path
self.postgres_url = postgres_url if postgres_url else POSTGRES_URL
self.sql_engine = sqlalchemy.create_engine(self.postgres_url, echo=False,
self.sql_engine = sqlalchemy.create_engine(self.postgres_url,
echo=False,
isolation_level=SQLALCHEMY_ISOLATION_LEVEL)
self.primary_key_column_name = primary_key_column_name
self.foreign_key_column_names = foreign_key_column_names if foreign_key_column_names else []
self.foreign_key_column_names = foreign_key_column_names or []
self.triple_store = triple_store or GraphDBAdapter()
self.triple_store_endpoint = triple_store_endpoint or TRIPLE_STORE_ENDPOINT
self.event_logger = logging.Logger(f"{etl_name}-{self.table_name}".lower())
self.mongo_logger = MongoDBEventLogger(database_name=etl_name, collection_name=etl_name)
self.event_logger.addHandler(self.mongo_logger)

def set_metadata(self, etl_metadata: dict):
"""
@@ -508,11 +528,9 @@ def extract(self) -> Dict:
if START_DATE_METADATA_FIELD in etl_metadata_fields and END_DATE_METADATA_FIELD in etl_metadata_fields:
if START_DATE_METADATA_FIELD == END_DATE_METADATA_FIELD:
date_range = datetime.strptime(START_DATE_METADATA_FIELD, "\"%Y%m%d\"")
logging.info("Querying data from one day")
else:
date_range = generate_sparql_filter_by_date_range(etl_metadata[START_DATE_METADATA_FIELD],
etl_metadata[END_DATE_METADATA_FIELD])
logging.info("Querying data from date range")
else:
logging.info("Querying data from yesterday")
date_range = (date.today() - timedelta(days=1)).strftime("\"%Y%m%d\"")
@@ -522,7 +540,7 @@
triple_store_endpoint = self.triple_store.get_sparql_tda_triple_store_endpoint(
repository_name=self.triple_store_endpoint)
result_table = triple_store_endpoint.with_query(sparql_query).fetch_csv()
return {DATA_FIELD: result_table}
return {DATA_FIELD: result_table, EXTRACTED_DAY_FIELD: date_range}

def transform(self, extracted_data: Dict) -> Dict:
"""
Expand All @@ -533,16 +551,21 @@ def transform(self, extracted_data: Dict) -> Dict:
return extracted_data
data_json: io.StringIO = extracted_data.get(DATA_FIELD, None)
if not data_json:
self.event_logger.error(ERROR_NO_DATA_FETCHED,
extra={EXTRACTED_DAY_FIELD: extracted_data.get(EXTRACTED_DAY_FIELD, None), "table_name": self.table_name})
raise PostgresETLException(ERROR_NO_DATA_FETCHED)
extracted_table: DataFrame = pd.read_csv(data_json)
if extracted_table.empty:
self.event_logger.error(ERROR_NO_DATA_FETCHED,
extra={EXTRACTED_DAY_FIELD: extracted_data.get(EXTRACTED_DAY_FIELD, None), "table_name": self.table_name})
raise PostgresETLException(ERROR_NO_DATA_FETCHED)
data_json.seek(0)
if self.table_name in TRANSFORMED_TABLES.keys():
data_table: DataFrame = TRANSFORMED_TABLES[self.table_name](data_json)
else:
data_table: DataFrame = pd.read_csv(data_json)
extracted_data[DATA_FIELD] = data_table

return extracted_data

def load(self, transformed_data: Dict):
Expand All @@ -559,18 +582,9 @@ def load(self, transformed_data: Dict):
data_table.to_sql(self.table_name, con=sql_connection, if_exists='append',
chunksize=SEND_CHUNK_SIZE,
index=False)
except IntegrityError:
logging.error("Duplicate primary key found")
logging.error("Table name: %s", self.table_name)
etl_metadata_fields = self.etl_metadata.keys()
if START_DATE_METADATA_FIELD in etl_metadata_fields and END_DATE_METADATA_FIELD in etl_metadata_fields:
logging.error("Date: START: %s END: %s", self.etl_metadata[START_DATE_METADATA_FIELD],
self.etl_metadata[END_DATE_METADATA_FIELD])
else:
logging.error("Date: %s", (date.today() - timedelta(days=1)).strftime("\"%Y%m%d\""))
logging.error("Primary key column name: %s", self.primary_key_column_name)
logging.error("Foreign key column names: %s", self.foreign_key_column_names)
logging.error("Data: %s", data_table.to_string())
except IntegrityError as integrity_error:
self.event_logger.error("Integrity error: %s", integrity_error,
extra={EXTRACTED_DAY_FIELD: transformed_data.get(EXTRACTED_DAY_FIELD, None), "table_name": self.table_name})
raise PostgresETLException()

sql_connection.execute(DROP_DUPLICATES_QUERY.format(table_name=self.table_name,
@@ -588,7 +602,8 @@ def load(self, transformed_data: Dict):
# sql_connection.execute(ADD_FOREIGN_KEY_IF_NOT_EXISTS_QUERY.format(table_name=self.table_name,
# foreign_key_column_name=foreign_key_column_name,
# foreign_key_table_name=foreign_key_table_name))

self.event_logger.info("Data loaded to postgres",
extra={EXTRACTED_DAY_FIELD: transformed_data.get(EXTRACTED_DAY_FIELD, None), "table_name": self.table_name})
return {DATA_FIELD: transformed_data[DATA_FIELD]}


@@ -601,7 +616,6 @@ def extract(self) -> Dict:
table_exists = sql_connection.execute(TABLE_EXISTS_QUERY.format(table_name=self.table_name)).fetchone()[0]
if table_exists:
return {DATA_FIELD: None, SKIP_NEXT_STEP_FIELD: True}

cellar_endpoint = TDATripleStoreEndpoint(CELLAR_ENDPOINT_URL)
data_table = cellar_endpoint.with_query(
self.sparql_query_path.read_text(encoding='utf-8')).fetch_csv()
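
With these changes the pipeline also wires up a MongoDB-backed event logger (named after etl_name and the table) and passes the extracted day through the steps so that errors and completions are logged with their context. A rough driving sketch, assuming Postgres, GraphDB and MongoDB settings come from the environment; the table name, SPARQL query path and key column below are hypothetical:

from pathlib import Path

from ted_data_eu.services.etl_pipelines.postgres_etl_pipeline import PostgresETLPipeline

pipeline = PostgresETLPipeline(table_name="Notice",                          # hypothetical table
                               sparql_query_path=Path("queries/notice.rq"),  # hypothetical query file
                               primary_key_column_name="NoticeId")           # hypothetical key column

# Without start/end date metadata, extract() defaults to querying yesterday's data.
extracted = pipeline.extract()      # {DATA_FIELD: fetched CSV, EXTRACTED_DAY_FIELD: queried date}
transformed = pipeline.transform(extracted)
pipeline.load(transformed)          # failures and the final load are also logged to MongoDB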