diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index eb972a34..631606de 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -1,32 +1,45 @@ name: github-actions on: [push, pull_request] jobs: - test: - runs-on: ubuntu-latest + test: + runs-on: ubuntu-latest + strategy: + matrix: + db_url: ["http://localhost:8000", "postgres://postgres:postgres@localhost:5432/gene_normalizer_test"] + services: + postgres: + image: postgres:14 env: - AWS_ACCESS_KEY_ID: ${{ secrets.DUMMY_AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.DUMMY_AWS_SECRET_ACCESS_KEY }} - AWS_DEFAULT_REGION: us-east-2 - AWS_DEFAULT_OUTPUT: text - GENE_NORM_DB_URL: http://localhost:8000 - GENE_TEST: true - steps: - - uses: actions/checkout@v3 + POSTGRES_USER: 'postgres' + POSTGRES_DB: 'gene_normalizer_test' + POSTGRES_PASSWORD: 'postgres' + ports: + - 5432:5432 + env: + AWS_ACCESS_KEY_ID: ${{ secrets.DUMMY_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DUMMY_AWS_SECRET_ACCESS_KEY }} + AWS_DEFAULT_REGION: us-east-2 + AWS_DEFAULT_OUTPUT: text + GENE_NORM_DB_URL: ${{ matrix.db_url }} + GENE_TEST: true + steps: + - uses: actions/checkout@v3 - - name: Setup Python - uses: actions/setup-python@v4 - with: - python-version: 3.8 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: 3.8 - - name: Install dependencies - run: | - python3 -m pip install pipenv - pipenv install --dev + - name: Install dependencies + run: | + python3 -m pip install pipenv + pipenv install --dev - - name: Build local DynamoDB - run: | - chmod +x ./tests/unit/dynamodb_build.bash - ./tests/unit/dynamodb_build.bash + - name: Build local DynamoDB + if: ${{ env.GENE_NORM_DB_URL == 'http://localhost:8000' }} + run: | + chmod +x ./tests/unit/dynamodb_build.bash + ./tests/unit/dynamodb_build.bash - - run: pipenv run flake8 - - run: pipenv run pytest tests/ + - run: pipenv run flake8 + - run: pipenv run pytest tests/ diff --git a/.gitignore b/.gitignore index 62bc12ee..00a2c462 100644 --- a/.gitignore +++ b/.gitignore @@ -118,6 +118,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.python-version # Spyder project settings .spyderproject @@ -160,4 +161,4 @@ dynamodb_local_latest/* Pipfile.lock *.toml -*.zip \ No newline at end of file +*.zip diff --git a/Pipfile b/Pipfile index b15047a2..d9d2682c 100644 --- a/Pipfile +++ b/Pipfile @@ -16,7 +16,7 @@ boto3 = "*" gene = {editable = true, path = "."} gffutils = "*" "biocommons.seqrepo" = "*" -psycopg2-binary = "*" +psycopg = {version = "*", extras=["binary"]} pytest = "*" pre-commit = "*" flake8 = "*" diff --git a/README.md b/README.md index 17c32e6f..dfc0efb2 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ [![DOI](https://zenodo.org/badge/309797998.svg)](https://zenodo.org/badge/latestdoi/309797998) -# Gene Normalization +# Gene Normalizer Services and guidelines for normalizing gene terms -Installing with pip: +## Installation + +The Normalizer is available via PyPI: ```commandline pip install gene[dev] @@ -11,21 +13,7 @@ pip install gene[dev] The `[dev]` argument tells pip to install packages to fulfill the dependencies of the `gene.etl` package. -## Developer instructions -Following are sections include instructions specifically for developers. - -### Installation -For a development install, we recommend using Pipenv. See the -[pipenv docs](https://pipenv-fork.readthedocs.io/en/latest/#install-pipenv-today) -for direction on installing pipenv in your compute environment. 
- -Once installed, from the project root dir, just run: - -```commandline -pipenv shell -pipenv lock && pipenv sync -pipenv install --dev -``` +### External requirements Gene Normalization relies on [SeqRepo](https://github.com/biocommons/biocommons.seqrepo) data, which you must download yourself. @@ -44,103 +32,122 @@ PermissionError: [Error 13] Permission denied: '/usr/local/share/seqrepo/2021-01 You will want to do the following:\ (*Might not be ._fkuefgd, so replace with your error message path*) + ```console sudo mv /usr/local/share/seqrepo/2021-01-29._fkuefgd /usr/local/share/seqrepo/2021-01-29 -exit ``` -### Deploying DynamoDB Locally +### Database Initialization -We use Amazon DynamoDB for our database. To deploy locally, follow [these instructions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html). +The Normalizer supports two data storage options: -### Init coding style tests +* [DynamoDB](https://aws.amazon.com/dynamodb), a NoSQL service provided by AWS. This is our preferred storage solution. In addition to cloud deployment, Amazon also provides a tool for local service, which can be installed [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html). Once downloaded, you can start service by running `java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb` in a terminal (add a `-port ` option to use a different port) +* [PostgreSQL](https://www.postgresql.org/), a well-known relational database technology. Once starting the Postgres server process, [ensure that a database is created](https://www.postgresql.org/docs/current/sql-createdatabase.html) (we typically name ours `gene_normalizer`). -Code style is managed by [flake8](https://github.com/PyCQA/flake8) and checked prior to commit. +By default, the Gene Normalizer expects to find a DynamoDB instance listening at `http://localhost:8000`. Alternative locations can be specified in two ways: -We use [pre-commit](https://pre-commit.com/#usage) to run conformance tests. +The first way is to set the `--db_url` option to the URL endpoint. -This ensures: +```commandline +gene_update --update_all --db_url="http://localhost:8001" +``` -* Check code style -* Check for added large files -* Detect AWS Credentials -* Detect Private Key +The second way is to set the `GENE_NORM_DB_URL` environment variable to the URL endpoint. +```commandline +export GENE_NORM_DB_URL="http://localhost:8001" +``` -Before first commit run: +To use a PostgreSQL instance instead of DynamoDB, provide a PostgreSQL connection URL instead, e.g. ```commandline -pre-commit install +export GENE_NORM_DB_URL="postgresql://postgres@localhost:5432/gene_normalizer" ``` +### Adding and refreshing data -### Running unit tests +Use the `gene_update` command in a shell to update the database. -By default, tests will employ an existing DynamoDB database. For test environments where this is unavailable (e.g. in CI), the `GENE_TEST` environment variable can be set to initialize a local DynamoDB instance with miniature versions of input data files before tests are executed. +#### Update source(s) + +The normalizer currently pulls data from [HGNC](https://www.genenames.org/), [Ensembl](https://useast.ensembl.org/index.html), and [NCBI](https://www.ncbi.nlm.nih.gov/gene/). + +To update one source, simply set `--normalizer` to the source you wish to update. 
The normalizer will check to see if local source data is up-to-date, acquire the most recent data if not, and use it to populate the database. + +For example, run the following to acquire the latest HGNC data if necessary, and update the HGNC gene records in the normalizer database: ```commandline -export GENE_TEST=true +gene_update --normalizer="hgnc" ``` -Running unit tests is as easy as pytest. +To update multiple sources, you can use the `--normalizer` option with the source names separated by spaces. + +#### Update all sources + +To update all sources, use the `--update_all` flag: ```commandline -pipenv run pytest +gene_update --update_all ``` -### Updating the gene normalization database +### Starting the gene normalization service -Before you use the CLI to update the database, run the following in a separate terminal to start a local DynamoDB service on `port 8000`: +Once the Gene Normalizer database has been loaded, from the project root, run the following: -``` -java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb +```commandline +uvicorn gene.main:app --reload ``` -To change the port, simply add `-port value`. +Next, view the OpenAPI docs on your local machine: -#### Update source(s) -The sources we currently use are: HGNC, Ensembl, and NCBI. +http://127.0.0.1:8000/gene + +## Developer instructions +The following sections include instructions specifically for developers. -To update one source, simply set `--normalizer` to the source you wish to update. +### Installation +For a development install, we recommend using Pipenv. See the +[pipenv docs](https://pipenv-fork.readthedocs.io/en/latest/#install-pipenv-today) +for direction on installing pipenv in your compute environment. -From the project root, run the following to update the HGNC source: +Once installed, clone the repo and initialize the environment: ```commandline -python3 -m gene.cli --normalizer="hgnc" +git clone https://github.com/cancervariants/gene-normalization +cd gene-normalization +pipenv shell +pipenv update +pipenv install --dev ``` -To update multiple sources, you can use the `--normalizer` flag with the source names separated by spaces. +### Init coding style tests -#### Update all sources +Code style is managed by [flake8](https://github.com/PyCQA/flake8) and checked prior to commit. -To update all sources, use the `--update_all` flag. +We use [pre-commit](https://pre-commit.com/#usage) to run conformance tests. -From the project root, run the following to update all sources: +This ensures: -```commandline -python3 -m gene.cli --update_all -``` +* Check code style +* Check for added large files +* Detect AWS Credentials +* Detect Private Key -#### Specifying the database URL endpoint -The default URL endpoint is `http://localhost:8000`. -There are two different ways to specify the database URL endpoint. +Before first commit run: -The first way is to set the `--db_url` flag to the URL endpoint. ```commandline -python3 -m gene.cli --update_all --db_url="http://localhost:8001" +pre-commit install ``` -The second way is to set the `GENE_NORM_DB_URL` to the URL endpoint. -```commandline -export GENE_NORM_DB_URL="http://localhost:8001" -python3 -m gene.cli --update_all -``` +### Running unit tests + +By default, tests will employ an existing database. For test environments where this is unavailable (e.g. in CI), the `GENE_TEST` environment variable can be set to initialize a local DynamoDB instance with miniature versions of input data files before tests are executed. 
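+The CI workflow runs the test suite against both backends: a local DynamoDB instance and a PostgreSQL service container. A rough local equivalent of its PostgreSQL job (a sketch, assuming a `gene_normalizer_test` database reachable with the same credentials CI uses) would be:
+
+```commandline
+export GENE_TEST=true
+export GENE_NORM_DB_URL="postgres://postgres:postgres@localhost:5432/gene_normalizer_test"
+pipenv run pytest
+```
+
+To test against a local DynamoDB instance instead, leave `GENE_NORM_DB_URL` at its default and set the flag on its own: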
-### Starting the gene normalization service -From the project root, run the following: ```commandline - uvicorn gene.main:app --reload +export GENE_TEST=true ``` -Next, view the OpenAPI docs on your local machine: +Running unit tests is as easy as pytest. -http://127.0.0.1:8000/gene +```commandline +pipenv run pytest +``` diff --git a/gene/__init__.py b/gene/__init__.py index 825b02fb..d314fb58 100644 --- a/gene/__init__.py +++ b/gene/__init__.py @@ -34,8 +34,8 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -from gene.schemas import SourceName, NamespacePrefix, SourceIDAfterNamespace, ItemTypes # noqa: E402, E501 -ITEM_TYPES = {k.lower(): v.value for k, v in ItemTypes.__members__.items()} +from gene.schemas import SourceName, NamespacePrefix, SourceIDAfterNamespace, RefType # noqa: E402, E501 +ITEM_TYPES = {k.lower(): v.value for k, v in RefType.__members__.items()} # Sources we import directly (HGNC, Ensembl, NCBI) SOURCES = {source.value.lower(): source.value diff --git a/gene/cli.py b/gene/cli.py index ff077981..26f15976 100644 --- a/gene/cli.py +++ b/gene/cli.py @@ -1,15 +1,18 @@ """This module provides a CLI util to make updates to normalizer database.""" +from collections.abc import Collection import logging -from os import environ +import os +from pathlib import Path from timeit import default_timer as timer +from typing import List, Optional import click -from botocore.exceptions import ClientError -from boto3.dynamodb.conditions import Key from gene import SOURCES -from gene.database import Database, confirm_aws_db_use, SKIP_AWS_DB_ENV_NAME, \ - VALID_AWS_ENV_NAMES, AWS_ENV_VAR_NAME +from gene.database import AbstractDatabase, DatabaseReadException, \ + DatabaseWriteException +from gene.database import create_db +from gene.database.database import DatabaseException from gene.etl import NCBI, HGNC, Ensembl # noqa: F401 from gene.etl.merge import Merge from gene.schemas import SourceName @@ -19,225 +22,193 @@ logger.setLevel(logging.DEBUG) -class CLI: - """Class for updating the normalizer database via Click""" - - @staticmethod - @click.command() - @click.option( - '--normalizer', - help="The normalizer(s) you wish to update separated by spaces." - ) - @click.option( - '--aws_instance', - is_flag=True, - help="Using AWS DynamodDB instance." - ) - @click.option( - '--db_url', - help="URL endpoint for the application database." - ) - @click.option( - '--update_all', - is_flag=True, - help='Update all normalizer sources.' - ) - @click.option( - '--update_merged', - is_flag=True, - help='Update concepts for normalize endpoint from accepted sources.' 
- ) - def update_normalizer_db(normalizer, aws_instance, db_url, update_all, - update_merged): - """Update selected normalizer source(s) in the gene database.""" - # If SKIP_AWS_CONFIRMATION is accidentally set, we should verify that the - # aws instance should actually be used - invalid_aws_msg = f"{AWS_ENV_VAR_NAME} must be set to one of {VALID_AWS_ENV_NAMES}" # noqa: E501 - aws_env_var_set = False - if AWS_ENV_VAR_NAME in environ: - aws_env_var_set = True - assert environ[AWS_ENV_VAR_NAME] in VALID_AWS_ENV_NAMES, invalid_aws_msg - confirm_aws_db_use(environ[AWS_ENV_VAR_NAME].upper()) - - if aws_env_var_set or aws_instance: - assert AWS_ENV_VAR_NAME in environ, invalid_aws_msg - environ[SKIP_AWS_DB_ENV_NAME] = "true" # this is already checked above - db: Database = Database() - else: - if db_url: - endpoint_url = db_url - elif 'GENE_NORM_DB_URL' in environ.keys(): - endpoint_url = environ['GENE_NORM_DB_URL'] - else: - endpoint_url = 'http://localhost:8000' - db: Database = Database(db_url=endpoint_url) - - if update_all: - normalizers = [src for src in SOURCES] - CLI()._update_normalizers(normalizers, db, update_merged) - elif not normalizer: - if update_merged: - CLI()._load_merge(db, []) - else: - CLI()._help_msg() +@click.command() +@click.option("--data_url", help="URL to data dump") +@click.option("--db_url", help="URL endpoint for the application database.") +def update_from_remote(data_url: Optional[str], db_url: str) -> None: + """Update data from remotely-hosted DB dump. By default, fetches from latest + available dump on VICC S3 bucket; specific URLs can be provided instead by + command line option or GENE_NORM_REMOTE_DB_URL environment variable. + + \f + :param data_url: user-specified location to pull DB dump from + :param db_url: URL to normalizer database + """ # noqa: D301 + if not click.confirm("Are you sure you want to overwrite existing data?"): + click.get_current_context().exit() + if not data_url: + data_url = os.environ.get("GENE_NORM_REMOTE_DB_URL") + db = create_db(db_url, False) + try: + db.load_from_remote(data_url) + except NotImplementedError: + click.echo(f"Error: Fetching remote data dump not supported for {db.__class__.__name__}") # noqa: E501 + click.get_current_context().exit(1) + except DatabaseException as e: + click.echo(f"Encountered exception during update: {str(e)}") + click.get_current_context().exit(1) + + +@click.command() +@click.option( + "--output_directory", "-o", + help="Output location to write to", + type=click.Path(exists=True, path_type=Path) +) +@click.option("--db_url", help="URL endpoint for the application database.") +def dump_database(output_directory: Path, db_url: str): + """Dump data from database into file. 
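+    If no output directory is provided, the dump is written to the current working
+    directory.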
+ + \f + :param output_directory: path to existing directory + :param db_url: URL to normalizer database + """ # noqa: D301 + if not output_directory: + output_directory = Path(".") + + db = create_db(db_url, False) + try: + db.export_db(output_directory) + except NotImplementedError: + click.echo(f"Error: Dumping data to file not supported for {db.__class__.__name__}") # noqa: E501 + click.get_current_context().exit(1) + except DatabaseException as e: + click.echo(f"Encountered exception during update: {str(e)}") + click.get_current_context().exit(1) + + +def _update_normalizers( + normalizers: Collection[SourceName], db: AbstractDatabase, update_merged: bool +) -> None: + """Update selected normalizer sources.""" + processed_ids = list() + for n in normalizers: + delete_time = _delete_source(n, db) + _load_source(n, db, delete_time, processed_ids) + + if update_merged: + _load_merge(db, processed_ids) + + +def _delete_source(n: SourceName, db: AbstractDatabase) -> float: + """Delete individual source data.""" + msg = f"Deleting {n.value}..." + click.echo(f"\n{msg}") + logger.info(msg) + start_delete = timer() + db.delete_source(n) + end_delete = timer() + delete_time = end_delete - start_delete + msg = f"Deleted {n.value} in {delete_time:.5f} seconds." + click.echo(f"{msg}\n") + logger.info(msg) + return delete_time + + +def _load_source( + n: SourceName, db: AbstractDatabase, delete_time: float, processed_ids: List[str] +) -> None: + """Load individual source data.""" + msg = f"Loading {n.value}..." + click.echo(msg) + logger.info(msg) + start_load = timer() + + # used to get source class name from string + SourceClass = eval(n.value) + + source = SourceClass(database=db) + processed_ids += source.perform_etl() + end_load = timer() + load_time = end_load - start_load + msg = f"Loaded {n.value} in {load_time:.5f} seconds." + click.echo(msg) + logger.info(msg) + msg = f"Total time for {n.value}: {(delete_time + load_time):.5f} seconds." + click.echo(msg) + logger.info(msg) + + +def _delete_normalized_data(database): + """Delete normalized concepts""" + click.echo("\nDeleting normalized records...") + start_delete = timer() + try: + database.delete_normalized_concepts() + except (DatabaseReadException, DatabaseWriteException) as e: + click.echo(f"Encountered exception during normalized data deletion: {e}") + end_delete = timer() + delete_time = end_delete - start_delete + click.echo(f"Deleted normalized records in {delete_time:.5f} seconds.") + + +def _load_merge(db, processed_ids): + """Load merged concepts""" + start = timer() + _delete_normalized_data(db) + if not processed_ids: + processed_ids = db.get_all_concept_ids() + merge = Merge(database=db) + click.echo("Constructing normalized records...") + merge.create_merged_concepts(processed_ids) + end = timer() + click.echo(f"Merged concept generation completed in " + f"{(end - start):.5f} seconds") + + +@click.command() +@click.option( + '--normalizer', + help="The normalizer(s) you wish to update separated by spaces." +) +@click.option( + '--aws_instance', + is_flag=True, + help="Using AWS DynamodDB instance." +) +@click.option( + '--db_url', + help="URL endpoint for the application database." +) +@click.option( + '--update_all', + is_flag=True, + help='Update all normalizer sources.' +) +@click.option( + '--update_merged', + is_flag=True, + help='Update concepts for normalize endpoint from accepted sources.' 
+) +def update_normalizer_db(normalizer, aws_instance, db_url, update_all, + update_merged): + """Update selected normalizer source(s) in the gene database.""" + db = create_db(db_url, aws_instance) + + if update_all: + _update_normalizers(list(SourceName), db, update_merged) + elif not normalizer: + if update_merged: + _load_merge(db, []) else: - normalizers = normalizer.lower().split() - - if len(normalizers) == 0: - raise Exception("Must enter a normalizer") + ctx = click.get_current_context() + click.echo("Must either enter 1 or more sources, or use `--update_all` parameter") # noqa: E501 + click.echo(ctx.get_help()) + ctx.exit() + else: + normalizers = normalizer.lower().split() - non_sources = set(normalizers) - {src for src in SOURCES} + if len(normalizers) == 0: + raise Exception("Must enter a normalizer") - if len(non_sources) != 0: - raise Exception(f"Not valid source(s): {non_sources}") + non_sources = set(normalizers) - set(SOURCES) - CLI()._update_normalizers(normalizers, db, update_merged) + if len(non_sources) != 0: + raise Exception(f"Not valid source(s): {non_sources}") - @staticmethod - def _help_msg(): - """Display help message.""" - ctx = click.get_current_context() - click.echo("Must either enter 1 or more sources, or use `--update_all` parameter") # noqa: E501 - click.echo(ctx.get_help()) - ctx.exit() - - @staticmethod - def _update_normalizers(normalizers, db, update_merged): - """Update selected normalizer sources.""" - processed_ids = list() - for n in normalizers: - delete_time = CLI()._delete_source(n, db) - CLI()._load_source(n, db, delete_time, processed_ids) - - if update_merged: - CLI()._load_merge(db, processed_ids) - - @staticmethod - def _delete_source(n, db): - """Delete individual source data.""" - msg = f"Deleting {n}..." - click.echo(f"\n{msg}") - logger.info(msg) - start_delete = timer() - CLI()._delete_data(n, db) - end_delete = timer() - delete_time = end_delete - start_delete - msg = f"Deleted {n} in {delete_time:.5f} seconds." - click.echo(f"{msg}\n") - logger.info(msg) - return delete_time - - @staticmethod - def _load_source(n, db, delete_time, processed_ids): - """Load individual source data.""" - msg = f"Loading {n}..." - click.echo(msg) - logger.info(msg) - start_load = timer() - - # used to get source class name from string - SOURCES_CLASS = \ - {s.value.lower(): eval(s.value) for s in SourceName.__members__.values()} - - source = SOURCES_CLASS[n](database=db) - processed_ids += source.perform_etl() - end_load = timer() - load_time = end_load - start_load - msg = f"Loaded {n} in {load_time:.5f} seconds." - click.echo(msg) - logger.info(msg) - msg = f"Total time for {n}: {(delete_time + load_time):.5f} seconds." 
- click.echo(msg) - logger.info(msg) - - @staticmethod - def _load_merge(db, processed_ids): - """Load merged concepts""" - start = timer() - if not processed_ids: - CLI()._delete_normalized_data(db) - processed_ids = db.get_ids_for_merge() - merge = Merge(database=db) - click.echo("Constructing normalized records...") - merge.create_merged_concepts(processed_ids) - end = timer() - click.echo(f"Merged concept generation completed in " - f"{(end - start):.5f} seconds") - - @staticmethod - def _delete_normalized_data(database): - """Delete normalized concepts""" - click.echo("\nDeleting normalized records...") - start_delete = timer() - try: - while True: - with database.genes.batch_writer( - overwrite_by_pkeys=['label_and_type', 'concept_id']) \ - as batch: - response = database.genes.query( - IndexName='item_type_index', - KeyConditionExpression=Key('item_type').eq('merger'), - ) - records = response['Items'] - if not records: - break - for record in records: - batch.delete_item(Key={ - 'label_and_type': record['label_and_type'], - 'concept_id': record['concept_id'] - }) - except ClientError as e: - click.echo(e.response['Error']['Message']) - end_delete = timer() - delete_time = end_delete - start_delete - click.echo(f"Deleted normalized records in {delete_time:.5f} seconds.") - - @staticmethod - def _delete_data(source, database): - """Delete a source's data from dynamodb table.""" - # Delete source's metadata - try: - metadata = database.metadata.query( - KeyConditionExpression=Key( - 'src_name').eq(SourceName[f"{source.upper()}"].value) - ) - if metadata['Items']: - database.metadata.delete_item( - Key={'src_name': metadata['Items'][0]['src_name']}, - ConditionExpression="src_name = :src", - ExpressionAttributeValues={ - ':src': SourceName[f"{source.upper()}"].value} - ) - except ClientError as e: - click.echo(e.response['Error']['Message']) - - # Delete source's data from genes table - try: - while True: - response = database.genes.query( - IndexName='src_index', - KeyConditionExpression=Key('src_name').eq( - SourceName[f"{source.upper()}"].value) - ) - - records = response['Items'] - if not records: - break - - with database.genes.batch_writer( - overwrite_by_pkeys=['label_and_type', 'concept_id']) \ - as batch: - - for record in records: - batch.delete_item( - Key={ - 'label_and_type': record['label_and_type'], - 'concept_id': record['concept_id'] - } - ) - except ClientError as e: - click.echo(e.response['Error']['Message']) + sources_to_update = {SourceName(SOURCES[s]) for s in normalizers} + _update_normalizers(sources_to_update, db, update_merged) if __name__ == '__main__': - CLI().update_normalizer_db() + update_normalizer_db() diff --git a/gene/database.py b/gene/database.py deleted file mode 100644 index 8a72d104..00000000 --- a/gene/database.py +++ /dev/null @@ -1,382 +0,0 @@ -"""This module creates the database.""" -from enum import Enum -import sys -import logging -from os import environ -from typing import List, Optional, Dict, Any, Set - -import boto3 -import click -from boto3.dynamodb.conditions import Key -from botocore.exceptions import ClientError - -from gene import PREFIX_LOOKUP - - -logger = logging.getLogger() -logger.setLevel(logging.DEBUG) - -# can be set to either `Dev`, `Staging`, or `Prod` -# ONLY set when wanting to access aws instance -AWS_ENV_VAR_NAME = "GENE_NORM_ENV" - -# Set to "true" if want to skip db confirmation check. 
Should ONLY be used for -# deployment needs -SKIP_AWS_DB_ENV_NAME = "SKIP_AWS_CONFIRMATION" - - -class AwsEnvName(str, Enum): - """AWS environment name that is being used""" - - DEVELOPMENT = "Dev" - STAGING = "Staging" - PRODUCTION = "Prod" - - -VALID_AWS_ENV_NAMES = {v.value for v in AwsEnvName.__members__.values()} - - -def confirm_aws_db_use(env_name: str) -> None: - """Check to ensure that AWS instance should actually be used.""" - if click.confirm(f"Are you sure you want to use the AWS {env_name} database?", - default=False): - click.echo(f"***GENE AWS {env_name.upper()} DATABASE IN USE***") - else: - click.echo("Exiting.") - sys.exit() - - -class DatabaseException(Exception): - """Create custom class for handling database exceptions""" - - pass - - -class Database: - """The database class.""" - - def __init__(self, db_url: str = '', region_name: str = 'us-east-2'): - """Initialize Database class. - - :param str db_url: URL endpoint for DynamoDB source - :param str region_name: default AWS region - """ - gene_concepts_table = "gene_concepts" # default - gene_metadata_table = "gene_metadata" # default - if AWS_ENV_VAR_NAME in environ: - if "GENE_TEST" in environ: - raise DatabaseException(f"Cannot have both GENE_TEST and {AWS_ENV_VAR_NAME} set.") # noqa: E501 - - aws_env = environ[AWS_ENV_VAR_NAME] - if aws_env not in VALID_AWS_ENV_NAMES: - raise DatabaseException(f"{AWS_ENV_VAR_NAME} must be one of {VALID_AWS_ENV_NAMES}") # noqa: E501 - - skip_confirmation = environ.get(SKIP_AWS_DB_ENV_NAME) - if (not skip_confirmation) or (skip_confirmation and skip_confirmation != "true"): # noqa: E501 - confirm_aws_db_use(environ[AWS_ENV_VAR_NAME]) - - boto_params = { - "region_name": region_name - } - - if aws_env == AwsEnvName.DEVELOPMENT: - gene_concepts_table = "gene_concepts_nonprod" - gene_metadata_table = "gene_metadata_nonprod" - else: - if db_url: - endpoint_url = db_url - elif 'GENE_NORM_DB_URL' in environ: - endpoint_url = environ['GENE_NORM_DB_URL'] - else: - endpoint_url = 'http://localhost:8000' - click.echo(f"***Using Gene Database Endpoint: {endpoint_url}***") - boto_params = { - 'region_name': region_name, - 'endpoint_url': endpoint_url - } - - self.dynamodb = boto3.resource('dynamodb', **boto_params) - self.dynamodb_client = boto3.client('dynamodb', **boto_params) - - # Only create tables for local instance - envs_do_not_create_tables = {AWS_ENV_VAR_NAME, "GENE_TEST"} - if not set(envs_do_not_create_tables) & set(environ): - self.create_db_tables() - - self.genes = self.dynamodb.Table(gene_concepts_table) - self.metadata = self.dynamodb.Table(gene_metadata_table) - self.batch = self.genes.batch_writer() - self.cached_sources = {} - - def _get_table_names(self) -> List[str]: - """Return names of tables in database. - - :return: Table names in DynamoDB - """ - return self.dynamodb_client.list_tables()['TableNames'] - - def delete_all_db_tables(self) -> None: - """Delete all tables from database.""" - existing_tables = self._get_table_names() - for table_name in existing_tables: - self.dynamodb.Table(table_name).delete() - - def create_db_tables(self) -> None: - """Create gene_concepts and gene_metadata tables.""" - existing_tables = self._get_table_names() - self.create_genes_table(existing_tables) - self.create_meta_data_table(existing_tables) - - def create_genes_table(self, existing_tables: List[str]): - """Create Genes table if non-existent. 
- - :param List[str] existing_tables: table names already in DB - """ - table_name = 'gene_concepts' - if table_name not in existing_tables: - self.dynamodb.create_table( - TableName=table_name, - KeySchema=[ - { - 'AttributeName': 'label_and_type', - 'KeyType': 'HASH' # Partition key - }, - { - 'AttributeName': 'concept_id', - 'KeyType': 'RANGE' # Sort key - } - ], - AttributeDefinitions=[ - { - 'AttributeName': 'label_and_type', - 'AttributeType': 'S' - }, - { - 'AttributeName': 'concept_id', - 'AttributeType': 'S' - }, - { - 'AttributeName': 'src_name', - 'AttributeType': 'S' - }, - { - 'AttributeName': 'item_type', - 'AttributeType': 'S' - } - - ], - GlobalSecondaryIndexes=[ - { - 'IndexName': 'src_index', - 'KeySchema': [ - { - 'AttributeName': 'src_name', - 'KeyType': 'HASH' - } - ], - 'Projection': { - 'ProjectionType': 'KEYS_ONLY' - }, - 'ProvisionedThroughput': { - 'ReadCapacityUnits': 10, - 'WriteCapacityUnits': 10 - } - }, - { - 'IndexName': 'item_type_index', - 'KeySchema': [ - { - 'AttributeName': 'item_type', - 'KeyType': 'HASH' - } - ], - 'Projection': { - 'ProjectionType': 'KEYS_ONLY' - }, - 'ProvisionedThroughput': { - 'ReadCapacityUnits': 10, - 'WriteCapacityUnits': 10 - } - } - ], - ProvisionedThroughput={ - 'ReadCapacityUnits': 10, - 'WriteCapacityUnits': 10 - } - ) - - def create_meta_data_table(self, existing_tables: List[str]): - """Create MetaData table if non-existent. - - :param List[str] existing_tables: table names already in DB - """ - table_name = 'gene_metadata' - if table_name not in existing_tables: - self.dynamodb.create_table( - TableName=table_name, - KeySchema=[ - { - 'AttributeName': 'src_name', - 'KeyType': 'HASH' # Partition key - } - ], - AttributeDefinitions=[ - { - 'AttributeName': 'src_name', - 'AttributeType': 'S' - }, - ], - ProvisionedThroughput={ - 'ReadCapacityUnits': 10, - 'WriteCapacityUnits': 10 - } - ) - - def get_record_by_id(self, concept_id: str, - case_sensitive: bool = True, - merge: bool = False) -> Optional[Dict]: - """Fetch record corresponding to provided concept ID - :param str concept_id: concept ID for gene record - :param bool case_sensitive: if true, performs exact lookup, which is - more efficient. Otherwise, performs filter operation, which - doesn't require correct casing. - :param bool merge: if true, look for merged record; look for identity - record otherwise. - :return: complete gene record, if match is found; None otherwise - """ - try: - if merge: - pk = f'{concept_id.lower()}##merger' - else: - pk = f'{concept_id.lower()}##identity' - if case_sensitive: - match = self.genes.get_item(Key={ - 'label_and_type': pk, - 'concept_id': concept_id - }) - return match['Item'] - else: - exp = Key('label_and_type').eq(pk) - response = self.genes.query(KeyConditionExpression=exp) - return response['Items'][0] - except ClientError as e: - logger.error(f"boto3 client error on get_records_by_id for " - f"search term {concept_id}: " - f"{e.response['Error']['Message']}") - return None - except (KeyError, IndexError): # record doesn't exist - return None - - def get_records_by_type(self, query: str, - match_type: str) -> List[Dict]: - """Retrieve records for given query and match type. - :param query: string to match against - :param str match_type: type of match to look for. Should be one - of {"symbol", "prev_symbol", "alias", "xref", "associated_with"} - (use `get_record_by_id` for concept ID lookup) - :return: list of matching records. Empty if lookup fails. 
- """ - pk = f'{query}##{match_type.lower()}' - filter_exp = Key('label_and_type').eq(pk) - try: - matches = self.genes.query(KeyConditionExpression=filter_exp) - return matches.get('Items', None) - except ClientError as e: - logger.error(f"boto3 client error on get_records_by_type for " - f"search term {query}: " - f"{e.response['Error']['Message']}") - return [] - - def get_ids_for_merge(self) -> Set[str]: - """Retrieve concept IDs for use in generating normalized records. - - :return: List of concept IDs as strings. - """ - last_evaluated_key = None - concept_ids = [] - params = { - 'ProjectionExpression': 'concept_id', - } - while True: - if last_evaluated_key: - response = self.genes.scan( - ExclusiveStartKey=last_evaluated_key, **params - ) - else: - response = self.genes.scan(**params) - records = response['Items'] - for record in records: - concept_ids.append(record['concept_id']) - last_evaluated_key = response.get('LastEvaluatedKey') - if not last_evaluated_key: - break - return set(concept_ids) - - def add_record(self, record: Dict, record_type: str = "identity"): - """Add new record to database. - :param Dict record: record to upload - :param str record_type: type of record (either 'identity' or 'merger') - """ - id_prefix = record['concept_id'].split(':')[0].lower() - record['src_name'] = PREFIX_LOOKUP[id_prefix] - label_and_type = f'{record["concept_id"].lower()}##{record_type}' - record['label_and_type'] = label_and_type - record['item_type'] = record_type - try: - self.genes.put_item( - Item=record, - ConditionExpression='attribute_not_exists(concept_id) AND attribute_not_exists(label_and_type)' # noqa: E501 - ) - except ClientError as e: - logger.error("boto3 client error on add_record for " - f"{record['concept_id']}: " - f"{e.response['Error']['Message']}") - - def add_ref_record(self, term: str, concept_id: str, ref_type: str): - """Add auxiliary/reference record to database. - :param str term: referent term - :param str concept_id: concept ID to refer to - :param str ref_type: one of {'alias', 'label', 'xref', - 'associated_with'} - """ - label_and_type = f'{term.lower()}##{ref_type}' - src_name = PREFIX_LOOKUP[concept_id.split(':')[0].lower()] - record = { - 'label_and_type': label_and_type, - 'concept_id': concept_id.lower(), - 'src_name': src_name, - 'item_type': ref_type, - } - try: - self.batch.put_item(Item=record) - except ClientError as e: - logger.error(f"boto3 client error adding reference {term} for " - f"{concept_id} with match type {ref_type}: " - f"{e.response['Error']['Message']}") - - def update_record(self, concept_id: str, field: str, new_value: Any, - item_type: str = 'identity'): - """Update the field of an individual record to a new value. 
- :param str concept_id: record to update - :param str field: name of field to update - :param str new_value: new value - :param str item_type: record type, one of {'identity', 'merger'} - """ - key = { - 'label_and_type': f'{concept_id.lower()}##{item_type}', - 'concept_id': concept_id - } - update_expression = f"set {field}=:r" - update_values = {':r': new_value} - try: - self.genes.update_item(Key=key, - UpdateExpression=update_expression, - ExpressionAttributeValues=update_values) - except ClientError as e: - logger.error(f"boto3 client error in `database.update_record()`: " - f"{e.response['Error']['Message']}") - - def flush_batch(self): - """Flush internal batch_writer.""" - self.batch.__exit__(*sys.exc_info()) - self.batch = self.genes.batch_writer() diff --git a/gene/database/__init__.py b/gene/database/__init__.py new file mode 100644 index 00000000..9ec66d98 --- /dev/null +++ b/gene/database/__init__.py @@ -0,0 +1,4 @@ +"""Provide database clients.""" +from .database import AbstractDatabase, DatabaseException, DatabaseReadException, \ + DatabaseWriteException, DatabaseInitializationException, create_db, \ + AWS_ENV_VAR_NAME # noqa: F401 diff --git a/gene/database/database.py b/gene/database/database.py new file mode 100644 index 00000000..56ea518f --- /dev/null +++ b/gene/database/database.py @@ -0,0 +1,275 @@ +"""Provide core database classes and parameters.""" +import abc +from enum import Enum +from os import environ +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Union +import sys + +import click + +from gene.schemas import RefType, SourceMeta, SourceName + + +class DatabaseException(Exception): + """Create custom class for handling database exceptions""" + + +class DatabaseInitializationException(DatabaseException): + """Create custom exception for errors during DB connection initialization.""" + + +class DatabaseReadException(DatabaseException): + """Create custom exception for lookup/read errors""" + + +class DatabaseWriteException(DatabaseException): + """Create custom exception for write errors""" + + +class AbstractDatabase(abc.ABC): + """Define a database interface.""" + + @abc.abstractmethod + def __init__(self, db_url: Optional[str] = None, **db_args): + """Initialize database instance. + + Generally, implementing classes should be able to construct a connection by + something like a libpq URL. Any additional arguments or DB-specific parameters + can be passed as keywords. + + :param db_url: address/connection description for database + :param db_args: any DB implementation-specific parameters + :raise DatabaseInitializationException: if initial setup fails + """ + + @abc.abstractmethod + def list_tables(self) -> List[str]: + """Return names of tables in database. + + :return: Table names in database + """ + + @staticmethod + def _check_delete_okay() -> bool: + """Check that environmental conditions permit DB deletion, and require + confirmation. + + :raise DatabaseWriteException: if skip confirmation variable is set -- manual + approval is required. + """ + if environ.get(AWS_ENV_VAR_NAME, "") == AwsEnvName.PRODUCTION: + if environ.get(SKIP_AWS_DB_ENV_NAME, "") == "true": + raise DatabaseWriteException( + f"Must unset {SKIP_AWS_DB_ENV_NAME} env variable to enable drop_db()" # noqa: E501 + ) + return click.confirm("Are you sure you want to delete existing data?") + else: + return True + + @abc.abstractmethod + def drop_db(self) -> None: + """Initiate total teardown of DB. Useful for quickly resetting the entirety of + the data. 
Requires manual confirmation. + + :raise DatabaseWriteException: if called in a protected setting with + confirmation silenced. + """ + + @abc.abstractmethod + def initialize_db(self) -> None: + """Perform all necessary parts of database setup. Should be tolerant of + existing content -- ie, this method is also responsible for checking whether + the DB is already set up. + """ + + @abc.abstractmethod + def get_source_metadata(self, src_name: Union[str, SourceName]) -> Dict: + """Get license, versioning, data lookup, etc information for a source. + + :param src_name: name of the source to get data for + """ + + @abc.abstractmethod + def get_record_by_id(self, concept_id: str, case_sensitive: bool = True, + merge: bool = False) -> Optional[Dict]: + """Fetch record corresponding to provided concept ID + + :param concept_id: concept ID for gene record + :param case_sensitive: if true, performs exact lookup, which may be quicker. + Otherwise, performs filter operation, which doesn't require correct casing. + :param merge: if true, look for merged record; look for identity + record otherwise. + :return: complete gene record, if match is found; None otherwise + """ + + @abc.abstractmethod + def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]: + """Retrieve concept IDs for records matching the user's query. Other methods + are responsible for actually retrieving full records. + + :param search_term: string to match against + :param ref_type: type of match to look for. + :return: list of associated concept IDs. Empty if lookup fails. + """ + + @abc.abstractmethod + def get_all_concept_ids(self) -> Set[str]: + """Retrieve all available concept IDs for use in generating normalized records. + + :return: List of concept IDs as strings. + """ + + @abc.abstractmethod + def add_source_metadata(self, src_name: SourceName, data: SourceMeta) -> None: + """Add new source metadata entry. + + :param src_name: name of source + :param data: known source attributes + :raise DatabaseWriteException: if write fails + """ + + @abc.abstractmethod + def add_record(self, record: Dict, src_name: SourceName) -> None: + """Add new record to database. + + :param record: record to upload + :param src_name: name of source for record. + """ + + @abc.abstractmethod + def add_merged_record(self, record: Dict) -> None: + """Add merged record to database. + + :param record: merged record to add + """ + + @abc.abstractmethod + def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: + """Update the merged record reference of an individual record to a new value. + + :param concept_id: record to update + :param merge_ref: new ref value + :raise DatabaseWriteException: if attempting to update non-existent record + """ + + @abc.abstractmethod + def delete_normalized_concepts(self) -> None: + """Remove merged records from the database. Use when performing a new update + of normalized data. + + :raise DatabaseReadException: if DB client requires separate read calls and + encounters a failure in the process + :raise DatabaseWriteException: if deletion call fails + """ + + @abc.abstractmethod + def delete_source(self, src_name: SourceName) -> None: + """Delete all data for a source. Use when updating source data. 
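+        For example (illustrative): ``db.delete_source(SourceName.HGNC)`` drops all
+        HGNC data ahead of a fresh load.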
+ + :param src_name: name of source to delete + :raise DatabaseReadException: if DB client requires separate read calls and + encounters a failure in the process + :raise DatabaseWriteException: if deletion call fails + """ + + @abc.abstractmethod + def complete_write_transaction(self) -> None: + """Conclude transaction or batch writing if relevant.""" + + @abc.abstractmethod + def close_connection(self) -> None: + """Perform any manual connection closure procedures if necessary.""" + + @abc.abstractmethod + def load_from_remote(self, url: Optional[str] = None) -> None: + """Load DB from remote dump. Warning: Deletes all existing data. + + :param url: remote location to retrieve gzipped dump file from + :raise: NotImplementedError if not supported by DB + """ + + @abc.abstractmethod + def export_db(self, export_location: Path) -> None: + """Dump DB to specified location. + + :param export_location: path to save DB dump at + :raise: NotImplementedError if not supported by DB + """ + + +# can be set to either `Dev`, `Staging`, or `Prod` +# ONLY set when wanting to access aws instance +AWS_ENV_VAR_NAME = "GENE_NORM_ENV" + +# Set to "true" if want to skip db confirmation check. Should ONLY be used for +# deployment needs +SKIP_AWS_DB_ENV_NAME = "SKIP_AWS_CONFIRMATION" + + +class AwsEnvName(str, Enum): + """AWS environment name that is being used""" + + DEVELOPMENT = "Dev" + STAGING = "Staging" + PRODUCTION = "Prod" + + +VALID_AWS_ENV_NAMES = {v.value for v in AwsEnvName.__members__.values()} + + +def confirm_aws_db_use(env_name: str) -> None: + """Check to ensure that AWS instance should actually be used.""" + if click.confirm(f"Are you sure you want to use the AWS {env_name} database?", + default=False): + click.echo(f"***GENE AWS {env_name.upper()} DATABASE IN USE***") + else: + click.echo("Exiting.") + sys.exit() + + +def create_db( + db_url: Optional[str] = None, aws_instance: bool = False +) -> AbstractDatabase: + """Database factory method. Checks environment variables and provided parameters + and creates a DB instance. + + Generally prefers to return a DynamoDB instance, unless all DDB-relevant + environment variables are unset and a libpq-compliant URI is passed to + `db_url`. 
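+    For example (illustrative), ``create_db("postgresql://postgres@localhost:5432/gene_normalizer")``
+    returns a ``PostgresDatabase``, while calling ``create_db()`` with no URL and no
+    relevant environment variables set falls back to a ``DynamoDbDatabase`` at
+    ``http://localhost:8000``.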
+ + :param db_url: address to database instance + :param aws_instance: use hosted DynamoDB instance, not local DB + :return: constructed Database instance + """ + # If SKIP_AWS_CONFIRMATION is accidentally set, we should verify that the + # aws instance should actually be used + invalid_aws_msg = f"{AWS_ENV_VAR_NAME} must be set to one of {VALID_AWS_ENV_NAMES}" + aws_env_var_set = False + if AWS_ENV_VAR_NAME in environ: + aws_env_var_set = True + assert environ[AWS_ENV_VAR_NAME] in VALID_AWS_ENV_NAMES, invalid_aws_msg + confirm_aws_db_use(environ[AWS_ENV_VAR_NAME].upper()) + + if aws_env_var_set or aws_instance: + assert AWS_ENV_VAR_NAME in environ, invalid_aws_msg + environ[SKIP_AWS_DB_ENV_NAME] = "true" # this is already checked above + + from gene.database.dynamodb import DynamoDbDatabase + db = DynamoDbDatabase() + else: + if db_url: + endpoint_url = db_url + elif 'GENE_NORM_DB_URL' in environ.keys(): + endpoint_url = environ['GENE_NORM_DB_URL'] + else: + endpoint_url = 'http://localhost:8000' + + # prefer DynamoDB unless connection explicitly reads like a libpq URI + if endpoint_url.startswith("postgres"): + from gene.database.postgresql import PostgresDatabase + db = PostgresDatabase(endpoint_url) + else: + from gene.database.dynamodb import DynamoDbDatabase + db = DynamoDbDatabase(endpoint_url) + return db diff --git a/gene/database/dynamodb.py b/gene/database/dynamodb.py new file mode 100644 index 00000000..0014419f --- /dev/null +++ b/gene/database/dynamodb.py @@ -0,0 +1,517 @@ +"""Provide DynamoDB client.""" +import atexit +import logging +from os import environ +from pathlib import Path +import sys +from typing import Any, Dict, List, Optional, Set, Union +import boto3 +from boto3.dynamodb.conditions import Key +from botocore.exceptions import ClientError + +import click +from gene import ITEM_TYPES, PREFIX_LOOKUP +from gene.database.database import AWS_ENV_VAR_NAME, SKIP_AWS_DB_ENV_NAME, \ + VALID_AWS_ENV_NAMES, AbstractDatabase, AwsEnvName, DatabaseException, \ + DatabaseReadException, DatabaseWriteException, confirm_aws_db_use +from gene.schemas import RefType, SourceMeta, SourceName + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + + +class DynamoDbDatabase(AbstractDatabase): + """Database class employing DynamoDB.""" + + def __init__(self, db_url: Optional[str] = None, **db_args): + """Initialize Database class. 
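+        A typical local instantiation (illustrative) is
+        ``DynamoDbDatabase("http://localhost:8000")``; when ``GENE_NORM_ENV`` is set,
+        the hosted AWS tables are used and the URL argument is ignored.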
+ + :param str db_url: URL endpoint for DynamoDB source + :Keyword Arguments: + * region_name: AWS region (defaults to "us-east-2") + """ + gene_concepts_table = "gene_concepts" # default + gene_metadata_table = "gene_metadata" # default + + region_name = db_args.get("region_name", "us-east-2") + + if AWS_ENV_VAR_NAME in environ: + if "GENE_TEST" in environ: + raise DatabaseException(f"Cannot have both GENE_TEST and {AWS_ENV_VAR_NAME} set.") # noqa: E501 + + aws_env = environ[AWS_ENV_VAR_NAME] + if aws_env not in VALID_AWS_ENV_NAMES: + raise DatabaseException(f"{AWS_ENV_VAR_NAME} must be one of {VALID_AWS_ENV_NAMES}") # noqa: E501 + + skip_confirmation = environ.get(SKIP_AWS_DB_ENV_NAME) + if (not skip_confirmation) or (skip_confirmation and skip_confirmation != "true"): # noqa: E501 + confirm_aws_db_use(environ[AWS_ENV_VAR_NAME]) + + boto_params = { + "region_name": region_name + } + + if aws_env == AwsEnvName.DEVELOPMENT: + gene_concepts_table = "gene_concepts_nonprod" + gene_metadata_table = "gene_metadata_nonprod" + else: + if db_url: + endpoint_url = db_url + elif 'GENE_NORM_DB_URL' in environ: + endpoint_url = environ['GENE_NORM_DB_URL'] + else: + endpoint_url = 'http://localhost:8000' + click.echo(f"***Using Gene Database Endpoint: {endpoint_url}***") + boto_params = { + 'region_name': region_name, + 'endpoint_url': endpoint_url + } + + self.dynamodb = boto3.resource('dynamodb', **boto_params) + self.dynamodb_client = boto3.client('dynamodb', **boto_params) + + # Only create tables for local instance + envs_do_not_create_tables = {AWS_ENV_VAR_NAME, "GENE_TEST"} + if not set(envs_do_not_create_tables) & set(environ): + self.initialize_db() + + self.genes = self.dynamodb.Table(gene_concepts_table) + self.metadata = self.dynamodb.Table(gene_metadata_table) + self.batch = self.genes.batch_writer() + self._cached_sources = {} + atexit.register(self.close_connection) + + def list_tables(self) -> List[str]: + """Return names of tables in database. + + :return: Table names in DynamoDB + """ + return self.dynamodb_client.list_tables()['TableNames'] + + def drop_db(self) -> None: + """Delete all tables from database. Requires manual confirmation. + + :raise DatabaseWriteException: if called in a protected setting with + confirmation silenced. + """ + try: + if not self._check_delete_okay(): + return + except DatabaseWriteException as e: + raise e + + existing_tables = self.list_tables() + for table_name in existing_tables: + self.dynamodb.Table(table_name).delete() + + def _create_genes_table(self, existing_tables: List[str]): + """Create Genes table if non-existent. 
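+        The table keys records on ``label_and_type`` (partition) and ``concept_id``
+        (sort), with ``src_index`` and ``item_type_index`` global secondary indexes.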
+ + :param List[str] existing_tables: table names already in DB + """ + table_name = 'gene_concepts' + if table_name not in existing_tables: + self.dynamodb.create_table( + TableName=table_name, + KeySchema=[ + { + 'AttributeName': 'label_and_type', + 'KeyType': 'HASH' # Partition key + }, + { + 'AttributeName': 'concept_id', + 'KeyType': 'RANGE' # Sort key + } + ], + AttributeDefinitions=[ + { + 'AttributeName': 'label_and_type', + 'AttributeType': 'S' + }, + { + 'AttributeName': 'concept_id', + 'AttributeType': 'S' + }, + { + 'AttributeName': 'src_name', + 'AttributeType': 'S' + }, + { + 'AttributeName': 'item_type', + 'AttributeType': 'S' + } + + ], + GlobalSecondaryIndexes=[ + { + 'IndexName': 'src_index', + 'KeySchema': [ + { + 'AttributeName': 'src_name', + 'KeyType': 'HASH' + } + ], + 'Projection': { + 'ProjectionType': 'KEYS_ONLY' + }, + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 10, + 'WriteCapacityUnits': 10 + } + }, + { + 'IndexName': 'item_type_index', + 'KeySchema': [ + { + 'AttributeName': 'item_type', + 'KeyType': 'HASH' + } + ], + 'Projection': { + 'ProjectionType': 'KEYS_ONLY' + }, + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 10, + 'WriteCapacityUnits': 10 + } + } + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': 10, + 'WriteCapacityUnits': 10 + } + ) + + def _create_meta_data_table(self, existing_tables: List[str]) -> None: + """Create MetaData table if non-existent. + + :param List[str] existing_tables: table names already in DB + """ + table_name = 'gene_metadata' + if table_name not in existing_tables: + self.dynamodb.create_table( + TableName=table_name, + KeySchema=[ + { + 'AttributeName': 'src_name', + 'KeyType': 'HASH' # Partition key + } + ], + AttributeDefinitions=[ + { + 'AttributeName': 'src_name', + 'AttributeType': 'S' + }, + ], + ProvisionedThroughput={ + 'ReadCapacityUnits': 10, + 'WriteCapacityUnits': 10 + } + ) + + def initialize_db(self) -> None: + """Create gene_concepts and gene_metadata tables.""" + existing_tables = self.list_tables() + self._create_genes_table(existing_tables) + self._create_meta_data_table(existing_tables) + + def get_source_metadata(self, src_name: Union[str, SourceName]) -> Dict: + """Get license, versioning, data lookup, etc information for a source. + + :param src_name: name of the source to get data for + """ + if isinstance(src_name, SourceName): + src_name = src_name.value + if src_name in self._cached_sources: + return self._cached_sources[src_name] + else: + metadata = self.metadata.get_item(Key={"src_name": src_name}).get("Item") + self._cached_sources[src_name] = metadata + return metadata + + def get_record_by_id(self, concept_id: str, + case_sensitive: bool = True, + merge: bool = False) -> Optional[Dict]: + """Fetch record corresponding to provided concept ID + :param str concept_id: concept ID for gene record + :param bool case_sensitive: if true, performs exact lookup, which is more + efficient. Otherwise, performs filter operation, which doesn't require correct + casing. + :param bool merge: if true, look for merged record; look for identity record + otherwise. 
+ :return: complete gene record, if match is found; None otherwise + """ + try: + if merge: + pk = f'{concept_id.lower()}##merger' + else: + pk = f'{concept_id.lower()}##identity' + if case_sensitive: + match = self.genes.get_item(Key={ + 'label_and_type': pk, + 'concept_id': concept_id + }) + return match['Item'] + else: + exp = Key('label_and_type').eq(pk) + response = self.genes.query(KeyConditionExpression=exp) + record = response['Items'][0] + del record["label_and_type"] + return record + except ClientError as e: + logger.error(f"boto3 client error on get_records_by_id for " + f"search term {concept_id}: " + f"{e.response['Error']['Message']}") + return None + except (KeyError, IndexError): # record doesn't exist + return None + + def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]: + """Retrieve concept IDs for records matching the user's query. Other methods + are responsible for actually retrieving full records. + + :param search_term: string to match against + :param ref_type: type of match to look for. + :return: list of associated concept IDs. Empty if lookup fails. + """ + pk = f"{search_term}##{ref_type.value.lower()}" + filter_exp = Key("label_and_type").eq(pk) + try: + matches = self.genes.query(KeyConditionExpression=filter_exp) + return [m["concept_id"] for m in matches.get("Items", None)] + except ClientError as e: + logger.error(f"boto3 client error on get_refs_by_type for " + f"search term {search_term}: " + f"{e.response['Error']['Message']}") + return [] + + def get_all_concept_ids(self) -> Set[str]: + """Retrieve concept IDs for use in generating normalized records. + + :return: List of concept IDs as strings. + """ + last_evaluated_key = None + concept_ids = [] + params = { + 'ProjectionExpression': 'concept_id', + } + while True: + if last_evaluated_key: + response = self.genes.scan( + ExclusiveStartKey=last_evaluated_key, **params + ) + else: + response = self.genes.scan(**params) + records = response['Items'] + for record in records: + concept_ids.append(record['concept_id']) + last_evaluated_key = response.get('LastEvaluatedKey') + if not last_evaluated_key: + break + return set(concept_ids) + + def add_source_metadata(self, src_name: SourceName, metadata: SourceMeta) -> None: + """Add new source metadata entry. + + :param src_name: name of source + :param data: known source attributes + :raise DatabaseWriteException: if write fails + """ + metadata_item = metadata.dict() + metadata_item["src_name"] = src_name.value + try: + self.metadata.put_item(Item=metadata_item) + except ClientError as e: + raise DatabaseWriteException(e) + + def add_record(self, record: Dict, src_name: SourceName) -> None: + """Add new record to database. 
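+        Reference items (symbols, aliases, xrefs, etc.) for each attribute listed in
+        ``ITEM_TYPES`` are written alongside the identity item via ``_add_ref_record``.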
+ + :param Dict record: record to upload + :param SourceName src_name: name of source for record + """ + concept_id = record["concept_id"] + record["src_name"] = src_name.value + label_and_type = f"{concept_id.lower()}##identity" + record["label_and_type"] = label_and_type + record["item_type"] = "identity" + try: + self.batch.put_item(Item=record) + except ClientError as e: + logger.error("boto3 client error on add_record for " + f"{concept_id}: {e.response['Error']['Message']}") + for attr_type, item_type in ITEM_TYPES.items(): + if attr_type in record: + value = record.get(attr_type) + if not value: + continue + if isinstance(value, str): + items = [value.lower()] + else: + items = {item.lower() for item in value} + for item in items: + self._add_ref_record( + item, record["concept_id"], item_type, src_name + ) + + def add_merged_record(self, record: Dict) -> None: + """Add merged record to database. + + :param record: merged record to add + """ + concept_id = record["concept_id"] + id_prefix = concept_id.split(":")[0].lower() + record["src_name"] = PREFIX_LOOKUP[id_prefix] + label_and_type = f"{concept_id.lower()}##merger" + record["label_and_type"] = label_and_type + record["item_type"] = "merger" + try: + self.batch.put_item(Item=record) + except ClientError as e: + logger.error("boto3 client error on add_record for " + f"{concept_id}: {e.response['Error']['Message']}") + + def _add_ref_record(self, term: str, concept_id: str, ref_type: str, + src_name: SourceName) -> None: + """Add auxiliary/reference record to database. + + :param str term: referent term + :param str concept_id: concept ID to refer to + :param str ref_type: one of {'alias', 'label', 'xref', + 'associated_with'} + :param src_name: name of source for record + """ + label_and_type = f"{term.lower()}##{ref_type}" + record = { + "label_and_type": label_and_type, + "concept_id": concept_id.lower(), + "src_name": src_name.value, + "item_type": ref_type, + } + try: + self.batch.put_item(Item=record) + except ClientError as e: + logger.error(f"boto3 client error adding reference {term} for " + f"{concept_id} with match type {ref_type}: " + f"{e.response['Error']['Message']}") + + def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: + """Update the merged record reference of an individual record to a new value. + + :param concept_id: record to update + :param merge_ref: new ref value + :raise DatabaseWriteException: if attempting to update non-existent record + """ + label_and_type = f"{concept_id.lower()}##identity" + key = { + "label_and_type": label_and_type, + "concept_id": concept_id + } + update_expression = "set merge_ref=:r" + update_values = {':r': merge_ref.lower()} + condition_expression = "attribute_exists(label_and_type)" + try: + self.genes.update_item(Key=key, + UpdateExpression=update_expression, + ExpressionAttributeValues=update_values, + ConditionExpression=condition_expression) + except ClientError as e: + code = e.response.get("Error", {}).get("Code") + if code == "ConditionalCheckFailedException": + raise DatabaseWriteException( + f"No such record exists for keys {label_and_type}, {concept_id}" + ) + else: + logger.error(f"boto3 client error in `database.update_record()`: " + f"{e.response['Error']['Message']}") + + def delete_normalized_concepts(self) -> None: + """Remove merged records from the database. Use when performing a new update + of normalized data. 
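+        Merged records are located via the ``item_type_index`` and removed in batches
+        until none remain.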
+ + :raise DatabaseReadException: if DB client requires separate read calls and + encounters a failure in the process + :raise DatabaseWriteException: if deletion call fails + """ + while True: + with self.genes.batch_writer( + overwrite_by_pkeys=["label_and_type", "concept_id"] + ) as batch: + try: + response = self.genes.query( + IndexName="item_type_index", + KeyConditionExpression=Key("item_type").eq("merger"), + ) + except ClientError as e: + raise DatabaseReadException(e) + records = response["Items"] + if not records: + break + for record in records: + batch.delete_item(Key={ + "label_and_type": record["label_and_type"], + "concept_id": record["concept_id"] + }) + + def delete_source(self, src_name: SourceName) -> None: + """Delete all data for a source. Use when updating source data. + + :param src_name: name of source to delete + :raise DatabaseReadException: if DB client requires separate read calls and + encounters a failure in the process + :raise DatabaseWriteException: if deletion call fails + """ + while True: + try: + response = self.genes.query( + IndexName="src_index", + KeyConditionExpression=Key("src_name").eq( + src_name.value + ) + ) + except ClientError as e: + raise DatabaseReadException(e) + records = response["Items"] + if not records: + break + with self.genes.batch_writer( + overwrite_by_pkeys=["label_and_type", "concept_id"] + ) as batch: + for record in records: + try: + batch.delete_item(Key={ + "label_and_type": record["label_and_type"], + "concept_id": record["concept_id"] + }) + except ClientError as e: + raise DatabaseWriteException(e) + + try: + self.metadata.delete_item(Key={ + "src_name": src_name.value + }) + except ClientError as e: + raise DatabaseWriteException(e) + + def complete_write_transaction(self) -> None: + """Conclude transaction or batch writing if relevant.""" + self.batch.__exit__(*sys.exc_info()) + self.batch = self.genes.batch_writer() + + def close_connection(self) -> None: + """Perform any manual connection closure procedures if necessary.""" + self.batch.__exit__(*sys.exc_info()) + + def load_from_remote(self, url: Optional[str] = None) -> None: + """Load DB from remote dump. Not available for DynamoDB database backend. + + :param url: remote location to retrieve gzipped dump file from + """ + raise NotImplementedError + + def export_db(self, export_location: Path) -> None: + """Dump DB to specified location. Not available for DynamoDB database backend. + + :param export_location: path to save DB dump at + """ + raise NotImplementedError diff --git a/gene/database/postgresql.py b/gene/database/postgresql.py new file mode 100644 index 00000000..32c94274 --- /dev/null +++ b/gene/database/postgresql.py @@ -0,0 +1,667 @@ +"""Provide PostgreSQL client.""" +import tarfile +import atexit +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Set +import tempfile +from datetime import datetime + +import psycopg +from psycopg.errors import UndefinedTable, UniqueViolation +import requests + +from gene.database import AbstractDatabase, DatabaseException, DatabaseReadException, \ + DatabaseWriteException +from gene.schemas import RefType, SourceMeta, SourceName + + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + + +SCRIPTS_DIR = Path(__file__).parent / "postgresql" + + +class PostgresDatabase(AbstractDatabase): + """Database class employing PostgreSQL.""" + + def __init__(self, db_url: Optional[str] = None, **db_args) -> None: + """Initialize Postgres connection. 
+ + :param db_url: libpq compliant database connection URI + :param **db_args: see below + + :Keyword Arguments: + * user: Postgres username + * password: Postgres password (optional or blank if unneeded) + * db_name: name of database to connect to + + :raise DatabaseInitializationException: if initial setup fails + """ + if db_url: + conninfo = db_url + elif "GENE_NORM_DB_URL" in os.environ: + conninfo = os.environ["GENE_NORM_DB_URL"] + else: + user = db_args.get("user", "postgres") + password = db_args.get("password", "") + db_name = db_args.get("db_name", "gene_normalizer") + if password: + conninfo = f"dbname={db_name} user={user} password={password}" + else: + conninfo = f"dbname={db_name} user={user}" + + self.conn = psycopg.connect(conninfo) + self.initialize_db() + self._cached_sources = {} + + atexit.register(self.close_connection) + + def list_tables(self) -> List[str]: + """Return names of tables in database. + + :return: Table names in database + """ + with self.conn.cursor() as cur: + cur.execute( + """SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' + AND table_type = 'BASE TABLE'; + """ + ) + tables = cur.fetchall() + return [t[0] for t in tables] + + def drop_db(self) -> None: + """Perform complete teardown of DB. Useful for quickly resetting all data or + reconstructing after apparent schema error. If in a protected environment, + require confirmation. + + :raise DatabaseWriteException: if called in a protected setting with + confirmation silenced. + """ + try: + if not self._check_delete_okay(): + return + except DatabaseWriteException as e: + raise e + + drop_query = """ + DROP MATERIALIZED VIEW IF EXISTS record_lookup_view; + DROP TABLE IF EXISTS + gene_associations, + gene_symbols, + gene_previous_symbols, + gene_aliases, + gene_xrefs, + gene_concepts, + gene_merged, + gene_sources; + """ + with self.conn.cursor() as cur: + cur.execute(drop_query) + self.conn.commit() + logger.info("Dropped all existing gene normalizer tables.") + + def initialize_db(self) -> None: + """Check if DB is set up. If not, create tables/indexes/views.""" + tables = self.list_tables() + expected_tables = ["gene_associations", "gene_sources", "gene_concepts", + "gene_symbols", "gene_previous_symbols", "gene_aliases", + "gene_merged"] + for table in expected_tables: + if table not in tables: + logger.info(f"{table} was missing -- resetting gene normalizer tables") + self.drop_db() + self._create_tables() + self._create_views() + self._add_indexes() + break + + def _create_views(self) -> None: + """Create materialized views.""" + create_view_query = (SCRIPTS_DIR / "create_record_lookup_view.sql").read_bytes() + with self.conn.cursor() as cur: + cur.execute(create_view_query) + self.conn.commit() + + def _refresh_views(self) -> None: + """Update materialized views. + + Not responsible for ensuring existence of views. Calling functions should + either check beforehand or catch psycopg.UndefinedTable. 
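A short usage sketch of the connection fallback just described; the URL below is only an example value, and any libpq-style conninfo string works:

```python
import os
from gene.database.postgresql import PostgresDatabase

# Example connection URI, not a required setting.
os.environ["GENE_NORM_DB_URL"] = "postgresql://postgres@localhost:5432/gene_normalizer"

db = PostgresDatabase()      # falls back to GENE_NORM_DB_URL when db_url is omitted
print(db.list_tables())      # tables exist after __init__, which calls initialize_db()
```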
+ """ + with self.conn.cursor() as cur: + cur.execute("REFRESH MATERIALIZED VIEW record_lookup_view;") + self.conn.commit() + + def _add_fkeys(self) -> None: + """Add fkey relationships.""" + add_fkey_query = (SCRIPTS_DIR / "add_fkeys.sql").read_bytes() + with self.conn.cursor() as cur: + cur.execute(add_fkey_query) + self.conn.commit() + + def _drop_fkeys(self) -> None: + """Drop fkey relationships.""" + drop_fkey_query = (SCRIPTS_DIR / "drop_fkeys.sql").read_bytes() + with self.conn.cursor() as cur: + cur.execute(drop_fkey_query) + self.conn.commit() + + def _add_indexes(self) -> None: + """Create core search indexes.""" + add_indexes_query = (SCRIPTS_DIR / "add_indexes.sql").read_bytes() + with self.conn.cursor() as cur: + cur.execute(add_indexes_query) + self.conn.commit() + + def _drop_indexes(self) -> None: + """Drop all custom indexes.""" + drop_indexes_query = (SCRIPTS_DIR / "drop_indexes.sql").read_bytes() + with self.conn.cursor() as cur: + cur.execute(drop_indexes_query) + self.conn.commit() + + def _create_tables(self) -> None: + """Create all tables, indexes, and views.""" + logger.debug("Creating new gene normalizer tables.") + tables_query = (SCRIPTS_DIR / "create_tables_query.sql").read_bytes() + + with self.conn.cursor() as cur: + cur.execute(tables_query) + self.conn.commit() + + def get_source_metadata(self, src_name: SourceName) -> Dict: + """Get license, versioning, data lookup, etc information for a source. + + :param src_name: name of the source to get data for + """ + if isinstance(src_name, SourceName): + src_name = src_name.value + + if src_name in self._cached_sources: + return self._cached_sources[src_name] + + metadata_query = "SELECT * FROM gene_sources WHERE name = %s;" + with self.conn.cursor() as cur: + cur.execute(metadata_query, [src_name]) + metadata_result = cur.fetchone() + if not metadata_result: + raise DatabaseReadException(f"{src_name} metadata lookup failed") + metadata = { + "data_license": metadata_result[1], + "data_license_url": metadata_result[2], + "version": metadata_result[3], + "data_url": metadata_result[4], + "rdp_url": metadata_result[5], + "data_license_attributes": { + "non_commercial": metadata_result[6], + "attribution": metadata_result[7], + "share_alike": metadata_result[8], + }, + "genome_assemblies": metadata_result[9] + } + self._cached_sources[src_name] = metadata + return metadata + + def _get_record(self, concept_id: str, case_sensitive: bool) -> Optional[Dict]: + """Retrieve non-merged record. The query is pretty different, so this method + is broken out for PostgreSQL. 
+ + :param concept_id: ID of concept to get + :param case_sensitive: record lookups are performed using a case-insensitive + index, so this parameter isn't used by Postgres + :return: complete record object if successful + """ + query = "SELECT * FROM record_lookup_view WHERE lower(concept_id) = %s;" + concept_id_param = concept_id.lower() + + with self.conn.cursor() as cur: + cur.execute(query, [concept_id_param]) + result = cur.fetchone() + if not result: + return None + + gene_record = { + "concept_id": result[0], + "symbol_status": result[1], + "label": result[2], + "strand": result[3], + "location_annotations": result[4], + "locations": result[5], + "gene_type": result[6], + "aliases": result[7], + "associated_with": result[8], + "previous_symbols": result[9], + "symbol": result[10], + "xrefs": result[11], + "src_name": result[12], + "merge_ref": result[13], + "item_type": "identity", + } + return {k: v for k, v in gene_record.items() if v} + + def _get_merged_record( + self, concept_id: str, case_sensitive: bool + ) -> Optional[Dict]: + """Retrieve normalized record from DB. + + :param concept_id: normalized ID for the merged record + :param case_sensitive: record lookups are performed using a case-insensitive + index, so this parameter isn't used by Postgres + :return: normalized record if successful + """ + concept_id = concept_id.lower() + query = "SELECT * FROM gene_merged WHERE lower(concept_id) = %s;" + with self.conn.cursor() as cur: + cur.execute(query, [concept_id]) + result = cur.fetchone() + if not result: + return None + + merged_record = { + "concept_id": result[0], + "symbol": result[1], + "symbol_status": result[2], + "previous_symbols": result[3], + "label": result[4], + "strand": result[5], + "ensembl_locations": result[6], + "hgnc_locations": result[7], + "ncbi_locations": result[8], + "location_annotations": result[9], + "ensembl_biotype": result[10], + "hgnc_locus_type": result[11], + "ncbi_gene_type": result[12], + "aliases": result[13], + "associated_with": result[14], + "xrefs": result[15], + "item_type": "merger", + } + return {k: v for k, v in merged_record.items() if v} + + def get_record_by_id(self, concept_id: str, case_sensitive: bool = True, + merge: bool = False) -> Optional[Dict]: + """Fetch record corresponding to provided concept ID + :param str concept_id: concept ID for gene record + :param bool case_sensitive: + :param bool merge: if true, look for merged record; look for identity record + otherwise. + :return: complete gene record, if match is found; None otherwise + """ + if merge: + return self._get_merged_record(concept_id, case_sensitive) + else: + return self._get_record(concept_id, case_sensitive) + + def get_refs_by_type(self, search_term: str, ref_type: RefType) -> List[str]: + """Retrieve concept IDs for records matching the user's query. Other methods + are responsible for actually retrieving full records. + + :param search_term: string to match against + :param ref_type: type of match to look for. + :return: list of associated concept IDs. Empty if lookup fails. 
+ """ + if ref_type == RefType.SYMBOL: + query = "SELECT concept_id FROM gene_symbols WHERE lower(symbol) = %s;" + elif ref_type == RefType.PREVIOUS_SYMBOLS: + query = "SELECT concept_id FROM gene_previous_symbols WHERE lower(prev_symbol) = %s;" # noqa: E501 + elif ref_type == RefType.ALIASES: + query = "SELECT concept_id FROM gene_aliases WHERE lower(alias) = %s;" + elif ref_type == RefType.XREFS: + query = "SELECT concept_id FROM gene_xrefs WHERE lower(xref) = %s;" + elif ref_type == RefType.ASSOCIATED_WITH: + query = "SELECT concept_id FROM gene_associations WHERE lower(associated_with) = %s;" # noqa: E501 + else: + raise ValueError("invalid reference type") + + with self.conn.cursor() as cur: + cur.execute(query, (search_term.lower(), )) + concept_ids = cur.fetchall() + if concept_ids: + return [i[0] for i in concept_ids] + else: + return [] + + def get_all_concept_ids(self) -> Set[str]: + """Retrieve concept IDs for use in generating normalized records. + + :return: Set of concept IDs as strings. + """ + ids_query = "SELECT concept_id FROM gene_concepts;" + with self.conn.cursor() as cur: + cur.execute(ids_query) + ids_tuple = cur.fetchall() + return {i[0] for i in ids_tuple} + + def add_source_metadata(self, src_name: SourceName, meta: SourceMeta) -> None: + """Add new source metadata entry. + + :param src_name: name of source + :param meta: known source attributes + :raise DatabaseWriteException: if write fails + """ + with self.conn.cursor() as cur: + cur.execute( + """ + INSERT INTO gene_sources( + name, data_license, data_license_url, version, data_url, rdp_url, + data_license_nc, data_license_attr, data_license_sa, + genome_assemblies + ) + VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ); + """, + [ + src_name.value, + meta.data_license, meta.data_license_url, meta.version, + meta.data_url, meta.rdp_url, + meta.data_license_attributes["non_commercial"], + meta.data_license_attributes["attribution"], + meta.data_license_attributes["share_alike"], + meta.genome_assemblies + ] + ) + self.conn.commit() + + def add_record(self, record: Dict, src_name: SourceName) -> None: + """Add new record to database. + + :param record: record to upload + :param src_name: name of source for record. Not used by PostgreSQL instance. 
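In this backend, reference lookups resolve to plain `SELECT concept_id` queries against the per-type tables. A brief calling sketch, with the search term as an example value:

```python
from gene.database import create_db
from gene.schemas import RefType

db = create_db()  # returns whichever AbstractDatabase implementation is configured
concept_ids = db.get_refs_by_type("braf", RefType.SYMBOL)
records = [db.get_record_by_id(cid, case_sensitive=False) for cid in concept_ids]
```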
+ """ + record_query = """ + INSERT INTO gene_concepts ( + concept_id, source, symbol_status, label, + strand, location_annotations, locations, gene_type + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s); + """ + + insert_symbol = "INSERT INTO gene_symbols (symbol, concept_id) VALUES (%s, %s)" + insert_prev_symbol = "INSERT INTO gene_previous_symbols (prev_symbol, concept_id) VALUES (%s, %s)" # noqa: E501 + insert_alias = "INSERT INTO gene_aliases (alias, concept_id) VALUES (%s, %s)" + insert_xref = "INSERT INTO gene_xrefs (xref, concept_id) VALUES (%s, %s)" + insert_assoc = "INSERT INTO gene_associations (associated_with, concept_id) VALUES (%s, %s)" # noqa: E501 + + concept_id = record["concept_id"] + locations = [json.dumps(loc) for loc in record.get("locations", [])] + if not locations: + locations = None + with self.conn.cursor() as cur: + try: + cur.execute(record_query, [ + concept_id, record["src_name"], record.get("symbol_status"), + record.get("label"), record.get("strand"), + record.get("location_annotations"), + locations, + record.get("gene_type") + ]) + for a in record.get("aliases", []): + cur.execute(insert_alias, [a, concept_id]) + for x in record.get("xrefs", []): + cur.execute(insert_xref, [x, concept_id]) + for a in record.get("associated_with", []): + cur.execute(insert_assoc, [a, concept_id]) + for p in record.get("previous_symbols", []): + cur.execute(insert_prev_symbol, [p, concept_id]) + if record.get("symbol"): + cur.execute(insert_symbol, [record["symbol"], concept_id]) + self.conn.commit() + except UniqueViolation: + logger.error( + f"Record with ID {concept_id} already exists" + ) + self.conn.rollback() + + def add_merged_record(self, record: Dict) -> None: + """Add merged record to database. + + :param record: merged record to add + """ + record_query = """ + INSERT INTO gene_merged ( + concept_id, symbol, symbol_status, previous_symbols, label, strand, + location_annotations, ensembl_locations, hgnc_locations, ncbi_locations, + hgnc_locus_type, ensembl_biotype, ncbi_gene_type, aliases, associated_with, + xrefs + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + """ + ensembl_locations = record.get("ensembl_locations") + if ensembl_locations: + ensembl_locations = [json.dumps(i) for i in ensembl_locations] + ncbi_locations = record.get("ncbi_locations") + if ncbi_locations: + ncbi_locations = [json.dumps(i) for i in ncbi_locations] + hgnc_locations = record.get("hgnc_locations") + if hgnc_locations: + hgnc_locations = [json.dumps(i) for i in hgnc_locations] + with self.conn.cursor() as cur: + cur.execute(record_query, [ + record["concept_id"], + record.get("symbol"), + record.get("symbol_status"), + record.get("previous_symbols"), + record.get("label"), + record.get("strand"), + record.get("location_annotations"), + ensembl_locations, + hgnc_locations, + ncbi_locations, + record.get("hgnc_locus_type"), + record.get("ensembl_biotype"), + record.get("ncbi_gene_type"), + record.get("aliases"), + record.get("associated_with"), + record.get("xrefs"), + ]) + self.conn.commit() + + def update_merge_ref(self, concept_id: str, merge_ref: Any) -> None: + """Update the merged record reference of an individual record to a new value. 
+ + :param concept_id: record to update + :param merge_ref: new ref value + :raise DatabaseWriteException: if attempting to update non-existent record + """ + update_query = """ + UPDATE gene_concepts + SET merge_ref = %(merge_ref)s + WHERE concept_id = %(concept_id)s; + """ + with self.conn.cursor() as cur: + cur.execute( + update_query, + {"merge_ref": merge_ref, "concept_id": concept_id} + ) + row_count = cur.rowcount + self.conn.commit() + + # UPDATE will fail silently unless we check the # of affected rows + if row_count < 1: + raise DatabaseWriteException( + f"No such record exists for primary key {concept_id}" + ) + + def delete_normalized_concepts(self) -> None: + """Remove merged records from the database. Use when performing a new update + of normalized data. + + It would be faster to drop the entire table and do a cascading delete onto the + merge_ref column in gene_concepts, but that requires an exclusive access lock + on the DB, which can be annoying (ie you couldn't have multiple processes + accessing it, or PgAdmin, etc...) + + :raise DatabaseReadException: if DB client requires separate read calls and + encounters a failure in the process + :raise DatabaseWriteException: if deletion call fails + """ + query = """ + UPDATE gene_concepts SET merge_ref = NULL; + DELETE FROM gene_merged; + """ + with self.conn.cursor() as cur: + cur.execute(query) + self.conn.commit() + self._create_tables() + + def delete_source(self, src_name: SourceName) -> None: + """Delete all data for a source. Use when updating source data. + + All of the foreign key relations make deletes *extremely* slow, so this method + drops and then re-adds them once deletes are finished. This makes it a little + brittle, and it'd be nice to revisit in the future to perform as a single + atomic transaction. + + Refreshing the materialized view at the end might be redundant, because + this method will almost always be called right before more data is written, + but it's probably necessary just in case that doesn't happen. 
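Because `UPDATE` happily affects zero rows, the row count is the only signal that the concept ID was unknown; the guard described above looks like this in isolation (connection parameters are placeholders):

```python
import psycopg

with psycopg.connect("dbname=gene_normalizer user=postgres") as conn:
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE gene_concepts SET merge_ref = %(merge_ref)s "
            "WHERE concept_id = %(concept_id)s;",
            {"merge_ref": "hgnc:1097", "concept_id": "hgnc:does-not-exist"},
        )
        if cur.rowcount < 1:
            # The statement succeeded but wrote nothing -- treat as an error.
            raise LookupError("no such record for concept_id")
    conn.commit()
```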
+ + :param src_name: name of source to delete + :raise DatabaseWriteException: if deletion call fails + """ + drop_aliases_query = """ + DELETE FROM gene_aliases WHERE id IN ( + SELECT ga.id FROM gene_aliases ga LEFT JOIN gene_concepts gc + ON gc.concept_id = ga.concept_id + WHERE gc.source = %s + ); + """ + drop_associations_query = """ + DELETE FROM gene_associations WHERE id IN ( + SELECT ga.id FROM gene_associations ga LEFT JOIN gene_concepts gc + ON gc.concept_id = ga.concept_id + WHERE gc.source = %s + ); + """ + drop_prev_symbols_query = """ + DELETE FROM gene_previous_symbols WHERE id IN ( + SELECT gps.id FROM gene_previous_symbols gps LEFT JOIN gene_concepts gc + ON gc.concept_id = gps.concept_id + WHERE gc.source = %s + ); + """ + drop_symbols_query = """ + DELETE FROM gene_symbols WHERE id IN ( + SELECT gs.id FROM gene_symbols gs LEFT JOIN gene_concepts gc + ON gc.concept_id = gs.concept_id + WHERE gc.source = %s + ); + """ + drop_xrefs_query = """ + DELETE FROM gene_xrefs WHERE id IN ( + SELECT gx.id FROM gene_xrefs gx LEFT JOIN gene_concepts gc + ON gc.concept_id = gx.concept_id + WHERE gc.source = %s + ); + """ + with self.conn.cursor() as cur: + cur.execute(drop_aliases_query, [src_name.value]) + cur.execute(drop_associations_query, [src_name.value]) + cur.execute(drop_prev_symbols_query, [src_name.value]) + cur.execute(drop_symbols_query, [src_name.value]) + cur.execute(drop_xrefs_query, [src_name.value]) + self._drop_fkeys() + self._drop_indexes() + + drop_concepts_query = "DELETE FROM gene_concepts WHERE source = %s;" + drop_source_query = "DELETE FROM gene_sources gs WHERE gs.name = %s;" + with self.conn.cursor() as cur: + cur.execute(drop_concepts_query, [src_name.value]) + cur.execute(drop_source_query, [src_name.value]) + self.conn.commit() + + self._add_fkeys() + self._add_indexes() + self._refresh_views() + + def complete_write_transaction(self) -> None: + """Conclude transaction or batch writing if relevant.""" + if not self.conn.closed: + try: + self._refresh_views() + except UndefinedTable: + self.conn.rollback() + + def close_connection(self) -> None: + """Perform any manual connection closure procedures if necessary.""" + if not self.conn.closed: + self.conn.commit() + self.conn.close() + + def load_from_remote(self, url: Optional[str]) -> None: + """Load DB from remote dump. Warning: Deletes all existing data. If not + passed as an argument, will try to grab latest release from VICC S3 bucket. 
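The class also exposes a dump/restore pair that wraps `pg_dump` and `psql` behind the database interface. A hypothetical round trip, where the output path and connection URI are examples:

```python
from pathlib import Path
from gene.database.postgresql import PostgresDatabase

db = PostgresDatabase(db_url="postgresql://postgres@localhost:5432/gene_normalizer")

db.export_db(Path("/tmp"))   # writes /tmp/gene_norm_<timestamp>.sql via pg_dump
db.load_from_remote(None)    # None -> fetch and restore the latest published dump
```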
+ + :param url: location of .tar.gz file created from output of pg_dump + :raise DatabaseException: if unable to retrieve file from URL or if psql + command fails + """ + if not url: + url = "https://vicc-normalizers.s3.us-east-2.amazonaws.com/gene_normalization/postgresql/gene_norm_latest.sql.tar.gz" # noqa: E501 + with tempfile.TemporaryDirectory() as tempdir: + tempdir_path = Path(tempdir) + temp_tarfile = tempdir_path / "gene_norm_latest.tar.gz" + with requests.get(url, stream=True) as r: + try: + r.raise_for_status() + except requests.HTTPError: + raise DatabaseException( + f"Unable to retrieve PostgreSQL dump file from {url}" + ) + with open(temp_tarfile, "wb") as h: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + h.write(chunk) + tar = tarfile.open(temp_tarfile, "r:gz") + tar_dump_file = [ + f for f in tar.getmembers() if f.name.startswith("gene_norm_") + ][0] + tar.extractall(path=tempdir_path, members=[tar_dump_file]) + dump_file = tempdir_path / tar_dump_file.name + + if self.conn.info.password: + pw_param = f"-W {self.conn.info.password}" + else: + pw_param = "-w" + + self.drop_db() + system_call = f"psql -d {self.conn.info.dbname} -U {self.conn.info.user} {pw_param} -f {dump_file.absolute()}" # noqa: E501 + result = os.system(system_call) + if result != 0: + raise DatabaseException( + f"System call '{result}' returned failing exit code." + ) + + def export_db(self, output_directory: Path) -> None: + """Dump DB to specified location. + + :param export_location: path to directory to save DB dump in + :return: Nothing, but saves results of pg_dump to file named + `gene_norm_.sql` + :raise ValueError: if output directory isn't a directory or doesn't exist + :raise DatabaseException: if psql call fails + """ + if not output_directory.is_dir() or not output_directory.exists(): + raise ValueError(f"Output location {output_directory} isn't a directory or doesn't exist") # noqa: E501 + now = datetime.now().strftime("%Y%m%d%H%M%S") + output_location = output_directory / f"gene_norm_{now}.sql" + user = self.conn.info.user + host = self.conn.info.host + port = self.conn.info.port + database_name = self.conn.info.dbname + if self.conn.info.password: + pw_param = f"-W {self.conn.info.password}" + else: + pw_param = "-w" + + system_call = f"pg_dump -E UTF8 -f {output_location} -U {user} {pw_param} -h {host} -p {port} {database_name}" # noqa: E501 + result = os.system(system_call) + if result != 0: + raise DatabaseException( + f"System call '{system_call}' returned failing exit code." 
+ ) diff --git a/gene/database/postgresql/add_fkeys.sql b/gene/database/postgresql/add_fkeys.sql new file mode 100644 index 00000000..f93459b3 --- /dev/null +++ b/gene/database/postgresql/add_fkeys.sql @@ -0,0 +1,11 @@ +ALTER TABLE gene_aliases ADD CONSTRAINT gene_aliases_concept_id_fkey + FOREIGN KEY (concept_id) REFERENCES gene_concepts (concept_id); +ALTER TABLE gene_associations ADD CONSTRAINT gene_associations_concept_id_fkey + FOREIGN KEY (concept_id) REFERENCES gene_concepts (concept_id); +ALTER TABLE gene_previous_symbols + ADD CONSTRAINT gene_previous_symbols_concept_id_fkey + FOREIGN KEY (concept_id) REFERENCES gene_concepts (concept_id); +ALTER TABLE gene_symbols ADD CONSTRAINT gene_symbols_concept_id_fkey + FOREIGN KEY (concept_id) REFERENCES gene_concepts (concept_id); +ALTER TABLE gene_xrefs ADD CONSTRAINT gene_xrefs_concept_id_fkey + FOREIGN KEY (concept_id) REFERENCES gene_concepts (concept_id); diff --git a/gene/database/postgresql/add_indexes.sql b/gene/database/postgresql/add_indexes.sql new file mode 100644 index 00000000..22b09552 --- /dev/null +++ b/gene/database/postgresql/add_indexes.sql @@ -0,0 +1,13 @@ +CREATE INDEX IF NOT EXISTS idx_g_concept_id_low + ON gene_concepts (lower(concept_id)); +CREATE INDEX IF NOT EXISTS idx_gm_concept_id_low + ON gene_merged (lower(concept_id)); +CREATE INDEX IF NOT EXISTS idx_gs_symbol_low ON gene_symbols (lower(symbol)); +CREATE INDEX IF NOT EXISTS idx_gps_symbol_low + ON gene_previous_symbols (lower(prev_symbol)); +CREATE INDEX IF NOT EXISTS idx_ga_alias_low ON gene_aliases (lower(alias)); +CREATE INDEX IF NOT EXISTS idx_gx_xref_low ON gene_xrefs (lower(xref)); +CREATE INDEX IF NOT EXISTS idx_g_as_association_low + ON gene_associations (lower(associated_with)); +CREATE INDEX IF NOT EXISTS idx_rlv_concept_id_low + ON record_lookup_view (lower(concept_id)); diff --git a/gene/database/postgresql/create_record_lookup_view.sql b/gene/database/postgresql/create_record_lookup_view.sql new file mode 100644 index 00000000..b3bceddb --- /dev/null +++ b/gene/database/postgresql/create_record_lookup_view.sql @@ -0,0 +1,38 @@ +CREATE MATERIALIZED VIEW IF NOT EXISTS record_lookup_view AS +SELECT gc.concept_id, + gc.symbol_status, + gc.label, + gc.strand, + gc.location_annotations, + gc.locations, + gc.gene_type, + ga.aliases, + gas.associated_with, + gps.previous_symbols, + gs.symbol, + gx.xrefs, + gc.source, + gc.merge_ref, + lower(gc.concept_id) AS concept_id_lowercase +FROM gene_concepts gc +FULL JOIN ( + SELECT ga_1.concept_id, array_agg(ga_1.alias) AS aliases + FROM gene_aliases ga_1 + GROUP BY ga_1.concept_id +) ga ON gc.concept_id::text = ga.concept_id::text +FULL JOIN ( + SELECT gas_1.concept_id, array_agg(gas_1.associated_with) AS associated_with + FROM gene_associations gas_1 + GROUP BY gas_1.concept_id +) gas ON gc.concept_id::text = gas.concept_id::text +FULL JOIN ( + SELECT gps_1.concept_id, array_agg(gps_1.prev_symbol) AS previous_symbols + FROM gene_previous_symbols gps_1 + GROUP BY gps_1.concept_id +) gps ON gc.concept_id::text = gps.concept_id::text +FULL JOIN gene_symbols gs ON gc.concept_id::text = gs.concept_id::text +FULL JOIN ( + SELECT gx_1.concept_id, array_agg(gx_1.xref) AS xrefs + FROM gene_xrefs gx_1 + GROUP BY gx_1.concept_id +) gx ON gc.concept_id::text = gx.concept_id::text; diff --git a/gene/database/postgresql/create_tables_query.sql b/gene/database/postgresql/create_tables_query.sql new file mode 100644 index 00000000..bc9d758b --- /dev/null +++ b/gene/database/postgresql/create_tables_query.sql @@ -0,0 +1,66 @@ 
+CREATE TABLE IF NOT EXISTS gene_sources ( + name VARCHAR(127) PRIMARY KEY, + data_license TEXT NOT NULL, + data_license_url TEXT NOT NULL, + version TEXT NOT NULL, + data_url TEXT NOT NULL, + rdp_url TEXT, + data_license_nc BOOLEAN NOT NULL, + data_license_attr BOOLEAN NOT NULL, + data_license_sa BOOLEAN NOT NULL, + genome_assemblies TEXT [] NOT NULL +); +CREATE TABLE IF NOT EXISTS gene_merged ( + concept_id VARCHAR(127) PRIMARY KEY, + symbol TEXT, + symbol_status VARCHAR(127), + previous_symbols TEXT [], + label TEXT, + strand VARCHAR(1), + ensembl_locations JSON [], + hgnc_locations JSON [], + ncbi_locations JSON [], + location_annotations TEXT [], + ensembl_biotype TEXT [], + hgnc_locus_type TEXT [], + ncbi_gene_type TEXT [], + aliases TEXT [], + associated_with TEXT [], + xrefs TEXT [] +); +CREATE TABLE IF NOT EXISTS gene_concepts ( + concept_id VARCHAR(127) PRIMARY KEY, + source VARCHAR(127) NOT NULL REFERENCES gene_sources (name), + symbol_status VARCHAR(127), + label TEXT, + strand VARCHAR(1), + location_annotations TEXT [], + locations JSON [], + gene_type TEXT, + merge_ref VARCHAR(127) REFERENCES gene_merged (concept_id) +); +CREATE TABLE IF NOT EXISTS gene_symbols ( + id SERIAL PRIMARY KEY, + symbol TEXT NOT NULL, + concept_id VARCHAR(127) REFERENCES gene_concepts (concept_id) +); +CREATE TABLE IF NOT EXISTS gene_previous_symbols ( + id SERIAL PRIMARY KEY, + prev_symbol TEXT NOT NULL, + concept_id VARCHAR(127) NOT NULL REFERENCES gene_concepts (concept_id) +); +CREATE TABLE IF NOT EXISTS gene_aliases ( + id SERIAL PRIMARY KEY, + alias TEXT NOT NULL, + concept_id VARCHAR(127) NOT NULL REFERENCES gene_concepts (concept_id) +); +CREATE TABLE IF NOT EXISTS gene_xrefs ( + id SERIAL PRIMARY KEY, + xref TEXT NOT NULL, + concept_id VARCHAR(127) NOT NULL REFERENCES gene_concepts (concept_id) +); +CREATE TABLE IF NOT EXISTS gene_associations ( + id SERIAL PRIMARY KEY, + associated_with TEXT NOT NULL, + concept_ID VARCHAR(127) NOT NULL REFERENCES gene_concepts (concept_id) +); diff --git a/gene/database/postgresql/drop_fkeys.sql b/gene/database/postgresql/drop_fkeys.sql new file mode 100644 index 00000000..f804ca1e --- /dev/null +++ b/gene/database/postgresql/drop_fkeys.sql @@ -0,0 +1,6 @@ +ALTER TABLE gene_aliases DROP CONSTRAINT gene_aliases_concept_id_fkey; +ALTER TABLE gene_associations DROP CONSTRAINT gene_associations_concept_id_fkey; +ALTER TABLE gene_previous_symbols + DROP CONSTRAINT gene_previous_symbols_concept_id_fkey; +ALTER TABLE gene_symbols DROP CONSTRAINT gene_symbols_concept_id_fkey; +ALTER TABLE gene_xrefs DROP CONSTRAINT gene_xrefs_concept_id_fkey; diff --git a/gene/database/postgresql/drop_indexes.sql b/gene/database/postgresql/drop_indexes.sql new file mode 100644 index 00000000..f1465b79 --- /dev/null +++ b/gene/database/postgresql/drop_indexes.sql @@ -0,0 +1,8 @@ +DROP INDEX IF EXISTS idx_g_concept_id_low; +DROP INDEX IF EXISTS idx_gm_concept_id_low; +DROP INDEX IF EXISTS idx_gs_symbol_low; +DROP INDEX IF EXISTS idx_gps_symbol_low; +DROP INDEX IF EXISTS idx_gx_xref_low; +DROP INDEX IF EXISTS idx_ga_alias_low; +DROP INDEX IF EXISTS idx_g_as_association_low; +DROP INDEX IF EXISTS idx_rlv_concept_id_low; diff --git a/gene/etl/base.py b/gene/etl/base.py index c0ecc8cc..b01b9aaa 100644 --- a/gene/etl/base.py +++ b/gene/etl/base.py @@ -1,8 +1,8 @@ """A base class for extraction, transformation, and loading of data.""" from abc import ABC, abstractmethod from typing import Optional, List -from gene.database import Database -from gene import PREFIX_LOOKUP, ITEM_TYPES, 
SEQREPO_DATA_PATH +from gene.database import AbstractDatabase +from gene import ITEM_TYPES, SEQREPO_DATA_PATH from biocommons.seqrepo import SeqRepo from pathlib import Path from ftplib import FTP @@ -13,7 +13,7 @@ import datetime import logging import pydantic -from gene.schemas import Gene, MatchType +from gene.schemas import Gene, MatchType, SourceName logger = logging.getLogger('gene') logger.setLevel(logging.DEBUG) @@ -22,18 +22,19 @@ class Base(ABC): """The ETL base class.""" - def __init__(self, database: Database, host: str, data_dir: str, + def __init__(self, database: AbstractDatabase, host: str, data_dir: str, src_data_dir: Path, seqrepo_dir=SEQREPO_DATA_PATH, *args, **kwargs) -> None: """Instantiate Base class. - :param Database database: DynamoDB database + :param AbstractDatabase database: database instance :param str host: Hostname of FTP site :param str data_dir: Data directory of FTP site to look at :param Path src_data_dir: Data directory for source :param Path seqrepo_dir: Path to seqrepo directory """ + self._src_name = SourceName(self.__class__.__name__) self._database = database self._host = host self._data_dir = data_dir @@ -43,7 +44,7 @@ def __init__(self, database: Database, host: str, data_dir: str, @abstractmethod def perform_etl(self) -> List[str]: - """Extract, Transform, and Load data into DynamoDB database. + """Extract, Transform, and Load data into database. :return: Concept IDs of concepts successfully loaded """ @@ -61,36 +62,20 @@ def _transform_data(self, *args, **kwargs) -> None: @abstractmethod def _add_meta(self, *args, **kwargs) -> None: - """Add source meta to DynamoDB table.""" + """Add source meta to database source info.""" raise NotImplementedError def _create_data_directory(self): """Create data directory for source.""" self.src_data_dir.mkdir(exist_ok=True, parents=True) - def _load_meta(self, db, metadata, source_name) -> None: - """Load source metadata into database. - - :param Database db: DynamoDB Database - :param SourceMeta metadata: Source's metadata - :param str source_name: Source to load metadata for - """ - db.metadata.put_item(Item={ - 'src_name': source_name, - 'data_license': metadata.data_license, - 'data_license_url': metadata.data_license_url, - 'version': metadata.version, - 'data_url': metadata.data_url, - 'rdp_url': metadata.rdp_url, - 'data_license_attributes': metadata.data_license_attributes, - 'genome_assemblies': metadata.genome_assemblies - }) - - def _load_gene(self, gene, batch) -> None: - """Load a gene record into database. + def _load_gene(self, gene) -> None: + """Load a gene record into database. 
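For reference, a sketch of the kind of source record `_load_gene` now receives; the field values are illustrative, but the cleanup steps match the code below: schema validation, de-duplication of list-valued fields, and removal of empty ones before handing off to `add_record`.

```python
# Illustrative input only -- field names mirror the records built by the
# HGNC/NCBI/Ensembl transforms in this module's subclasses.
gene = {
    "concept_id": "hgnc:1097",
    "symbol": "BRAF",
    "aliases": ["BRAF1", "BRAF1", "B-RAF1"],  # duplicates collapse via set(), then back to a list
    "xrefs": [],                              # empty list-valued fields are dropped
}
```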
This method takes responsibility for: + * validating structure correctness + * removing duplicates from list-like fields + * removing empty fields :param dict gene: Gene record - :param BatchWriter batch: Object to write data to DynamoDB """ try: assert Gene(match_type=MatchType.NO_MATCH, **gene) @@ -100,29 +85,19 @@ def _load_gene(self, gene, batch) -> None: else: concept_id = gene['concept_id'].lower() gene['label_and_type'] = f"{concept_id}##identity" - gene['src_name'] = \ - PREFIX_LOOKUP[gene['concept_id'].split(':')[0].lower()] + gene["src_name"] = self._src_name.value gene['item_type'] = 'identity' - for attr_type, item_type in ITEM_TYPES.items(): + for attr_type in ITEM_TYPES: if attr_type in gene: value = gene[attr_type] - if value is not None and value != []: - if isinstance(value, str): - items = [value.lower()] - else: - gene[attr_type] = list(set(value)) - items = {item.lower() for item in value} - for item in items: - batch.put_item(Item={ - 'label_and_type': f"{item}##{item_type}", - 'concept_id': concept_id, - 'src_name': gene['src_name'], - 'item_type': item_type - }) - else: + if value is None or value == []: del gene[attr_type] - batch.put_item(Item=gene) + elif isinstance(value, str): + continue + gene[attr_type] = list(set(value)) + + self._database.add_record(gene, self._src_name) self._processed_ids.append(concept_id) def _ftp_download(self, host: str, data_dir: str, fn: str, diff --git a/gene/etl/ensembl.py b/gene/etl/ensembl.py index 13720d5a..8ba7d2bc 100644 --- a/gene/etl/ensembl.py +++ b/gene/etl/ensembl.py @@ -8,7 +8,7 @@ from .base import Base from gene import APP_ROOT from gene.schemas import SourceName, NamespacePrefix, Strand, SourceMeta -from gene.database import Database +from gene.database import AbstractDatabase from gene.etl.vrs_locations import SequenceLocation @@ -19,20 +19,18 @@ class Ensembl(Base): """ETL the Ensembl source into the normalized database.""" - def __init__(self, database: Database, host="ftp.ensembl.org", + def __init__(self, database: AbstractDatabase, host="ftp.ensembl.org", data_dir="pub/current_gff3/homo_sapiens/", src_data_dir=APP_ROOT / "data" / "ensembl") -> None: """Initialize Ensembl ETL class. 
- :param Database database: DynamoDB database + :param AbstractDatabase database: DynamoDB database :param str host: FTP host name :param str data_dir: FTP data directory to use :param Path src_data_dir: Data directory for Ensembl """ super().__init__(database, host, data_dir, src_data_dir) self._sequence_location = SequenceLocation() - self._host = host - self._data_dir = data_dir self._version = None self._fn = None self._data_url = None @@ -55,7 +53,7 @@ def _download_data(self) -> None: self._assembly = resp["assembly"] self._version = resp["version"] self._fn = f - self._data_url = f"ftp://{self._host}/{self._data_dir}{self._fn}" + self._data_url = f"ftp://{self._host}/{self._data_dir}{self._fn}" # noqa: E501 new_fn = f"ensembl_{self._version}.gff3" if not (self.src_data_dir / new_fn).exists(): self._ftp_download_file(ftp, self._fn, self.src_data_dir, @@ -89,15 +87,14 @@ def _transform_data(self, *args, **kwargs): for item in db.features_of_type("chromosome"): accession_numbers[item[0]] = item[8]["Alias"][-1] - with self._database.genes.batch_writer() as batch: - for f in db.all_features(): - if f.attributes.get("ID"): - f_id = f.attributes.get("ID")[0].split(":")[0] - if f_id == "gene": - gene = \ - self._add_gene(f, self.seqrepo, accession_numbers) - if gene: - self._load_gene(gene, batch) + for f in db.all_features(): + if f.attributes.get("ID"): + f_id = f.attributes.get("ID")[0].split(":")[0] + if f_id == "gene": + gene = \ + self._add_gene(f, self.seqrepo, accession_numbers) + if gene: + self._load_gene(gene) logger.info("Successfully transformed Ensembl.") def _add_gene(self, f, sr, accession_numbers): @@ -215,7 +212,7 @@ def perform_etl(self, *args, **kwargs): self._extract_data() self._add_meta() self._transform_data() - self._database.flush_batch() + self._database.complete_write_transaction() return self._processed_ids def _add_meta(self, *args, **kwargs): @@ -235,17 +232,4 @@ def _add_meta(self, *args, **kwargs): genome_assemblies=[self._assembly] ) - self._database.metadata.put_item( - Item={ - "src_name": SourceName.ENSEMBL.value, - "data_license": metadata.data_license, - "data_license_url": metadata.data_license_url, - "version": metadata.version, - "data_url": metadata.data_url, - "rdp_url": metadata.rdp_url, - "data_license_attributes": metadata.data_license_attributes, - "genome_assemblies": metadata.genome_assemblies - } - ) - - self._load_meta(self._database, metadata, SourceName.ENSEMBL.value) + self._database.add_source_metadata(self._src_name, metadata) diff --git a/gene/etl/hgnc.py b/gene/etl/hgnc.py index 205c0f85..cfea14dc 100644 --- a/gene/etl/hgnc.py +++ b/gene/etl/hgnc.py @@ -5,7 +5,7 @@ import re from gene import APP_ROOT, PREFIX_LOOKUP -from gene.database import Database +from gene.database import AbstractDatabase from gene.schemas import SourceName, SymbolStatus, NamespacePrefix, \ SourceMeta, Annotation, Chromosome from gene.etl.base import Base @@ -19,7 +19,7 @@ class HGNC(Base): """ETL the HGNC source into the normalized database.""" def __init__(self, - database: Database, + database: AbstractDatabase, host='ftp.ebi.ac.uk', data_dir='pub/databases/genenames/hgnc/json/', src_data_dir=APP_ROOT / 'data' / 'hgnc', @@ -27,7 +27,7 @@ def __init__(self, ): """Initialize HGNC ETL class. 
- :param Database database: DynamoDB database + :param AbstractDatabase database: DynamoDB database :param str host: FTP host name :param str data_dir: FTP data directory to use :param Path src_data_dir: Data directory for HGNC @@ -66,36 +66,34 @@ def _transform_data(self, *args, **kwargs): records = data['response']['docs'] - with self._database.genes.batch_writer() as batch: - for r in records: - gene = dict() - gene['concept_id'] = r['hgnc_id'].lower() - gene['label_and_type'] = \ - f"{gene['concept_id']}##identity" - gene['item_type'] = 'identity' - gene['symbol'] = r['symbol'] - gene['label'] = r['name'] - gene['src_name'] = SourceName.HGNC.value - if r['status']: - if r['status'] == 'Approved': - gene['symbol_status'] = \ - SymbolStatus.APPROVED.value - elif r['status'] == 'Entry Withdrawn': - gene['symbol_status'] =\ - SymbolStatus.WITHDRAWN.value - gene['src_name'] = SourceName.HGNC.value - - # store alias, xref, associated_with, prev_symbols, location - self._get_aliases(r, gene) - self._get_xrefs_associated_with(r, gene) - if 'prev_symbol' in r: - self._get_previous_symbols(r, gene) - if 'location' in r: - self._get_location(r, gene) - if "locus_type" in r: - gene["gene_type"] = r["locus_type"] - self._load_gene(gene, batch) - logger.info('Successfully transformed HGNC.') + for r in records: + gene = dict() + gene["concept_id"] = r["hgnc_id"].lower() + gene["label_and_type"] = f"{gene['concept_id']}##identity" + gene["item_type"] = "identity" + gene["symbol"] = r["symbol"] + gene["label"] = r["name"] + gene["src_name"] = SourceName.HGNC.value + if r["status"]: + if r["status"] == "Approved": + gene["symbol_status"] = \ + SymbolStatus.APPROVED.value + elif r["status"] == "Entry Withdrawn": + gene["symbol_status"] =\ + SymbolStatus.WITHDRAWN.value + gene["src_name"] = SourceName.HGNC.value + + # store alias, xref, associated_with, prev_symbols, location + self._get_aliases(r, gene) + self._get_xrefs_associated_with(r, gene) + if "prev_symbol" in r: + self._get_previous_symbols(r, gene) + if "location" in r: + self._get_location(r, gene) + if "locus_type" in r: + gene["gene_type"] = r["locus_type"] + self._load_gene(gene) + logger.info("Successfully transformed HGNC.") def _get_aliases(self, r, gene): """Store aliases in a gene record. 
@@ -256,8 +254,7 @@ def _set_location(self, loc, location, gene): if '-' in loc: # Location gives both start and end - self._chromosome_location.set_interval_range(loc, - arm_ix, location) + self._chromosome_location.set_interval_range(loc, arm_ix, location) else: # Location only gives start start = loc[arm_ix:] @@ -276,10 +273,10 @@ def perform_etl(self, *args, **kwargs): self._extract_data() self._add_meta() self._transform_data() - self._database.flush_batch() + self._database.complete_write_transaction() return self._processed_ids - def _add_meta(self, *args, **kwargs): + def _add_meta(self) -> None: """Add HGNC metadata to the gene_metadata table.""" metadata = SourceMeta( data_license='custom', @@ -294,5 +291,4 @@ def _add_meta(self, *args, **kwargs): }, genome_assemblies=[] ) - - self._load_meta(self._database, metadata, SourceName.HGNC.value) + self._database.add_source_metadata(SourceName.HGNC, metadata) diff --git a/gene/etl/merge.py b/gene/etl/merge.py index df360e10..17c8d1ff 100644 --- a/gene/etl/merge.py +++ b/gene/etl/merge.py @@ -1,7 +1,8 @@ """Create concept groups and merged records.""" -from gene.database import Database +from gene.database import AbstractDatabase +from gene.database.database import DatabaseWriteException from gene.schemas import SourcePriority, GeneTypeFieldName -from typing import Set, Dict +from typing import Optional, Set, Dict from timeit import default_timer as timer import logging @@ -13,7 +14,7 @@ class Merge: """Handles record merging.""" - def __init__(self, database: Database): + def __init__(self, database: AbstractDatabase): """Initialize Merge instance. :param Database database: db instance to use for record retrieval and creation. @@ -49,28 +50,34 @@ def create_merged_concepts(self, record_ids: Set[str]): merged_record = self._generate_merged_record(group) # add group merger item to DB - self._database.add_record(merged_record, 'merger') + self._database.add_merged_record(merged_record) # add updated references for concept_id in group: - if not self._database.get_record_by_id(concept_id, False): - logger.error(f"Updating nonexistent record: {concept_id} " - f"for {merged_record['label_and_type']}") - else: - merge_ref = merged_record['concept_id'].lower() - self._database.update_record(concept_id, 'merge_ref', - merge_ref) + merge_ref = merged_record["concept_id"] + try: + self._database.update_merge_ref(concept_id, merge_ref) + except DatabaseWriteException as dw: + if str(dw).startswith("No such record exists"): + logger.error(f"Updating nonexistent record: {concept_id} " + f"for merge ref to {merge_ref}") + else: + logger.error(str(dw)) uploaded_ids |= group + self._database.complete_write_transaction() logger.info('Merged concept generation successful.') end = timer() logger.debug(f'Generated and added concepts in {end - start} seconds') def _create_record_id_set(self, record_id: str, - observed_id_set: Set = set()) -> Set[str]: + observed_id_set: Optional[Set] = None) -> Set[str]: """Create concept ID group for an individual record ID. :param str record_id: concept ID for record to build group from :return: set of related identifiers pertaining to a common concept. 
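The signature change to `observed_id_set: Optional[Set] = None` avoids Python's shared-mutable-default pitfall, which the recursive group builder was previously exposed to. A self-contained illustration of the difference (not project code):

```python
def buggy(record_id, seen=set()):
    # The default set is created once and shared by every call.
    seen.add(record_id)
    return seen

def fixed(record_id, seen=None):
    if seen is None:
        seen = set()
    seen.add(record_id)
    return seen

buggy("a")
print(buggy("b"))  # {'a', 'b'} -- state leaked from the first call
fixed("a")
print(fixed("b"))  # {'b'}
```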
""" + if observed_id_set is None: + observed_id_set = set() + if record_id in self._groups: return self._groups[record_id] else: @@ -81,9 +88,11 @@ def _create_record_id_set(self, record_id: str, f"{observed_id_set}") return observed_id_set - {record_id} - local_id_set = set(db_record.get('xrefs', [])) - if not local_id_set: - return observed_id_set | {db_record['concept_id']} + record_xrefs = db_record.get("xrefs") + if not record_xrefs: + return observed_id_set | {db_record["concept_id"]} + else: + local_id_set = set(record_xrefs) merged_id_set = {record_id} | observed_id_set for local_record_id in local_id_set - observed_id_set: merged_id_set |= self._create_record_id_set(local_record_id, @@ -161,7 +170,5 @@ def record_order(record): else: del merged_attrs[field] - merged_attrs['label_and_type'] = \ - f'{merged_attrs["concept_id"].lower()}##merger' merged_attrs['item_type'] = 'merger' return merged_attrs diff --git a/gene/etl/ncbi.py b/gene/etl/ncbi.py index e1239f2f..e44b7d00 100644 --- a/gene/etl/ncbi.py +++ b/gene/etl/ncbi.py @@ -9,7 +9,7 @@ import gffutils from gene import APP_ROOT, PREFIX_LOOKUP -from gene.database import Database +from gene.database import AbstractDatabase from gene.schemas import SourceMeta, SourceName, NamespacePrefix, Annotation, \ Chromosome, SymbolStatus from gene.etl.base import Base @@ -24,13 +24,13 @@ class NCBI(Base): """ETL class for NCBI source""" def __init__(self, - database: Database, + database: AbstractDatabase, host='ftp.ncbi.nlm.nih.gov', data_dir='gene/DATA/', src_data_dir=APP_ROOT / 'data' / 'ncbi'): """Construct the NCBI ETL instance. - :param Database database: gene database for adding new data + :param AbstractDatabase database: gene database for adding new data :param str host: FTP host name :param str data_dir: FTP data directory to use :param Path src_data_dir: Data directory for NCBI @@ -48,8 +48,9 @@ def perform_etl(self): :return: Concept IDs of concepts successfully loaded """ self._extract_data() + self._add_meta() self._transform_data() - self._database.flush_batch() + self._database.complete_write_transaction() return self._processed_ids def _download_data(self): @@ -69,7 +70,8 @@ def _download_data(self): fn = f'ncbi_history_{self._date_today}.tsv' data_fn = 'gene_history.gz' logger.info('Downloading NCBI gene_history...') - self._ftp_download(self._host, self._data_dir, fn, self.src_data_dir, data_fn) + self._ftp_download(self._host, self._data_dir, fn, self.src_data_dir, + data_fn) logger.info('Successfully downloaded NCBI gene_history.') # Download gff @@ -148,26 +150,23 @@ def _get_prev_symbols(self): history = csv.reader(history_file, delimiter='\t') next(history) prev_symbols = {} - with self._database.genes.batch_writer() as batch: - for row in history: - # Only interested in rows that have homo sapiens tax id - if row[0] == '9606': - if row[1] != '-': - gene_id = row[1] - if gene_id in prev_symbols.keys(): - prev_symbols[gene_id].append(row[3]) - else: - prev_symbols[gene_id] = [row[3]] + for row in history: + # Only interested in rows that have homo sapiens tax id + if row[0] == "9606": + if row[1] != "-": + gene_id = row[1] + if gene_id in prev_symbols.keys(): + prev_symbols[gene_id].append(row[3]) else: - # Load discontinued genes - params = { - 'concept_id': - f'{NamespacePrefix.NCBI.value.lower()}:' - f'{row[2]}', - 'symbol': row[3], - 'symbol_status': SymbolStatus.DISCONTINUED.value - } - self._load_gene(params, batch) + prev_symbols[gene_id] = [row[3]] + else: + # Load discontinued genes + params = { + "concept_id": 
f"{NamespacePrefix.NCBI.value}:{row[2]}", + "symbol": row[3], + "symbol_status": SymbolStatus.DISCONTINUED.value + } + self._load_gene(params) history_file.close() return prev_symbols @@ -529,7 +528,6 @@ def _set_centromere_location(self, loc, location): def _transform_data(self): """Modify data and pass to loading functions.""" logger.info('Transforming NCBI...') - self._add_meta() prev_symbols = self._get_prev_symbols() info_genes = self._get_gene_info(prev_symbols) @@ -542,9 +540,8 @@ def _transform_data(self): self._get_gene_gff(db, info_genes, self.seqrepo) - with self._database.genes.batch_writer() as batch: - for gene in info_genes.keys(): - self._load_gene(info_genes[gene], batch) + for gene in info_genes.keys(): + self._load_gene(info_genes[gene]) logger.info('Successfully transformed NCBI.') def _add_meta(self): @@ -564,4 +561,4 @@ def _add_meta(self): genome_assemblies=[self._assembly] ) - self._load_meta(self._database, metadata, SourceName.NCBI.value) + self._database.add_source_metadata(SourceName.NCBI, metadata) diff --git a/gene/main.py b/gene/main.py index 7c4e9f54..77fc07c1 100644 --- a/gene/main.py +++ b/gene/main.py @@ -3,13 +3,16 @@ from fastapi.openapi.utils import get_openapi from typing import Optional from gene import __version__ +from gene.database import create_db from gene.query import QueryHandler, InvalidParameterException from gene.schemas import SearchService, NormalizeService, \ UnmergedNormalizationService import html -query_handler = QueryHandler() +db = create_db() +query_handler = QueryHandler(db) + app = FastAPI( docs_url="/gene", openapi_url="/gene/openapi.json", diff --git a/gene/query.py b/gene/query.py index 58b4fb8c..cefe70d0 100644 --- a/gene/query.py +++ b/gene/query.py @@ -1,21 +1,19 @@ """This module provides methods for handling queries.""" import re -from typing import List, Dict, Set, Any, TypeVar, Callable, Optional +from typing import List, Dict, Set, Any, Tuple, TypeVar, Callable, Optional from urllib.parse import quote from datetime import datetime from ga4gh.vrsatile.pydantic.vrs_models import VRSTypes from ga4gh.vrsatile.pydantic.vrsatile_models import GeneDescriptor, Extension -from botocore.exceptions import ClientError -from boto3.dynamodb.conditions import Key from ga4gh.vrs import models from ga4gh.core import ga4gh_identify from gene import logger from gene import NAMESPACE_LOOKUP, PREFIX_LOOKUP, ITEM_TYPES -from gene.database import Database -from gene.schemas import BaseGene, Gene, SourceMeta, MatchType, SourceName, \ - ServiceMeta, SourcePriority, NormalizeService, SearchService, \ +from gene.database import AbstractDatabase, DatabaseReadException +from gene.schemas import BaseGene, Gene, NamespacePrefix, RefType, MatchType, \ + SourceName, ServiceMeta, SourcePriority, NormalizeService, SearchService, \ GeneTypeFieldName, UnmergedNormalizationService, MatchesNormalized, \ BaseNormalizationService from gene.version import __version__ @@ -34,13 +32,9 @@ class QueryHandler: and normalizes query input. """ - def __init__(self, db_url: str = '', db_region: str = 'us-east-2'): - """Initialize QueryHandler instance. - - :param str db_url: URL to database source. - :param str db_region: AWS default region. 
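With the constructor now taking an `AbstractDatabase`, wiring a handler looks the same regardless of backend; a minimal sketch of the pattern `main.py` now uses, where the query string is an example:

```python
from gene.database import create_db
from gene.query import QueryHandler

db = create_db()                  # picks the configured backend (e.g. via GENE_NORM_DB_URL)
query_handler = QueryHandler(db)
result = query_handler.search("BRAF")
```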
- """ - self.db = Database(db_url=db_url, region_name=db_region) + def __init__(self, database: AbstractDatabase) -> None: + """Initialize QueryHandler instance.""" + self.db = database @staticmethod def emit_warnings(query_str: str) -> List: @@ -63,27 +57,10 @@ def emit_warnings(query_str: str) -> List: ) return warnings - def fetch_meta(self, src_name: str) -> SourceMeta: - """Fetch metadata for src_name. - - :param str src_name: name of source to get metadata for - :return: SourceMeta object containing source metadata - """ - if src_name in self.db.cached_sources.keys(): - return self.db.cached_sources[src_name] - else: - try: - db_response = self.db.metadata.get_item(Key={'src_name': - src_name}) - response = SourceMeta(**db_response['Item']) - self.db.cached_sources[src_name] = response - return response - except ClientError as e: - logger.error(e.response['Error']['Message']) - @staticmethod def _transform_sequence_location(loc: Dict) -> models.SequenceLocation: """Transform a sequence location to VRS sequence location + :param Dict loc: Sequence location :return: VRS sequence location """ @@ -114,15 +91,16 @@ def _transform_chromosome_location(loc: Dict) -> models.ChromosomeLocation: def _transform_location(self, loc: Dict) -> Dict: """Transform a sequence/chromosome location to VRS sequence/chromosome location + :param Dict loc: Sequence or Chromosome location :return: VRS sequence or chromosome location represented as a dictionary """ if loc["type"] == VRSTypes.SEQUENCE_LOCATION: - loc = self._transform_sequence_location(loc) + transformed_loc = self._transform_sequence_location(loc) else: - loc = self._transform_chromosome_location(loc) - loc._id = ga4gh_identify(loc) - return loc.as_dict() + transformed_loc = self._transform_chromosome_location(loc) + transformed_loc._id = ga4gh_identify(transformed_loc) + return transformed_loc.as_dict() def _transform_locations(self, record: Dict) -> Dict: """Transform gene locations to VRS Chromosome/Sequence Locations @@ -137,25 +115,37 @@ def _transform_locations(self, record: Dict) -> Dict: record["locations"] = record_locations return record + def _get_src_name(self, concept_id: str) -> SourceName: + """Get source name enum from ID. + + :param concept_id: candidate concept ID string to check + :return: SourceName option + :raise: ValueError if unrecognized ID provided + """ + if concept_id.startswith(NamespacePrefix.ENSEMBL.value): + return SourceName.ENSEMBL + elif concept_id.startswith(NamespacePrefix.NCBI.value): + return SourceName.NCBI + elif concept_id.startswith(NamespacePrefix.HGNC.value): + return SourceName.HGNC + else: + raise ValueError("Invalid or unrecognized concept ID provided") + def add_record(self, response: Dict[str, Dict], item: Dict, - match_type: MatchType) -> (Dict, str): + match_type: MatchType): """Add individual record (i.e. 
Item in DynamoDB) to response object :param Dict[str, Dict] response: in-progress response object to return to client :param Dict item: Item retrieved from DynamoDB :param MatchType match_type: match type for query - :return: Tuple containing updated response object, and string - containing name of the source of the match """ - del item['label_and_type'] - # DynamoDB Numbers get converted to Decimal item = self._transform_locations(item) item["match_type"] = match_type gene = Gene(**item) - src_name = item['src_name'] + src_name = item["src_name"] matches = response['source_matches'] if src_name not in matches.keys(): @@ -163,13 +153,11 @@ def add_record(self, elif matches[src_name] is None: matches[src_name] = { 'records': [gene], - 'source_meta_': self.fetch_meta(src_name) + 'source_meta_': self.db.get_source_metadata(src_name) } else: matches[src_name]['records'].append(gene) - return response, src_name - def fetch_record(self, response: Dict[str, Dict], concept_id: str, match_type: MatchType) -> None: """Add fetched record to response @@ -181,13 +169,16 @@ def fetch_record(self, response: Dict[str, Dict], concept_id: str, :param MatchType match_type: match type for record """ try: - pk = f'{concept_id}##identity' - filter_exp = Key('label_and_type').eq(pk) - result = self.db.genes.query(KeyConditionExpression=filter_exp) - match = result['Items'][0] - self.add_record(response, match, match_type) - except ClientError as e: - logger.error(e.response['Error']['Message']) + match = self.db.get_record_by_id(concept_id, case_sensitive=False) + except DatabaseReadException as e: + logger.error( + f"Encountered DatabaseReadException looking up {concept_id}: {e}" + ) + else: + if match: + self.add_record(response, match, match_type) + else: + logger.error(f"Unable to find expected record for {concept_id} matching as {match_type}") # noqa: E501 def post_process_resp(self, resp: Dict) -> Dict: """Fill all empty source_matches slots with NO_MATCH results and @@ -202,7 +193,7 @@ def post_process_resp(self, resp: Dict) -> Dict: resp['source_matches'][src_name] = { 'match_type': MatchType.NO_MATCH, 'records': [], - 'source_meta_': self.fetch_meta(src_name) + 'source_meta_': self.db.get_source_metadata(src_name) } else: records = resp['source_matches'][src_name]['records'] @@ -232,39 +223,35 @@ def response_keyed(self, query: str, sources: Set[str]) -> Dict: queries = list() if [p for p in PREFIX_LOOKUP.keys() if query_l.startswith(p)]: - pk = f'{query_l}##identity' - queries.append(pk) + queries.append((query_l, "identity")) for prefix in [p for p in NAMESPACE_LOOKUP.keys() if query_l.startswith(p)]: - pk = f'{NAMESPACE_LOOKUP[prefix].lower()}:{query_l}##identity' - queries.append(pk) + term = f"{NAMESPACE_LOOKUP[prefix].lower()}:{query_l}" + queries.append((term, "identity")) for match in ITEM_TYPES.values(): - pk = f'{query_l}##{match}' - queries.append(pk) + queries.append((query_l, match)) matched_concept_ids = list() - for q in queries: + for term, item_type in queries: try: - query_resp = self.db.genes.query( - KeyConditionExpression=Key('label_and_type').eq(q) + if item_type == "identity": + record = self.db.get_record_by_id(term, False) + if record and record['concept_id'] not in matched_concept_ids: + self.add_record(resp, record, MatchType.CONCEPT_ID) + else: + refs = self.db.get_refs_by_type(term, RefType(item_type)) + for ref in refs: + if ref not in matched_concept_ids: + self.fetch_record(resp, ref, MatchType[item_type.upper()]) + matched_concept_ids.append(ref) + + except 
DatabaseReadException as e: + logger.error( + f"Encountered DatabaseReadException looking up {item_type}" + f" {term}: {e}" ) - for record in query_resp['Items']: - concept_id = record['concept_id'] - if concept_id in matched_concept_ids: - continue - else: - if record['item_type'] == "identity": - self.add_record(resp, record, MatchType.CONCEPT_ID) - else: - self.fetch_record( - resp, concept_id, - MatchType[record['item_type'].upper()]) - matched_concept_ids.append(concept_id) - - except ClientError as e: - logger.error(e.response['Error']['Message']) continue # remaining sources get no match @@ -324,7 +311,7 @@ def search(self, query_str: str, keyed: bool = False, SourceName.__members__.values()} sources = dict() for k, v in possible_sources.items(): - if self.db.metadata.get_item(Key={'src_name': v}).get('Item'): + if self.db.get_source_metadata(v): sources[k] = v if not incl and not excl: @@ -383,7 +370,7 @@ def _add_merged_meta(self, response: NormalizeService) -> NormalizeService: prefix = concept_id.split(':')[0] src_name = PREFIX_LOOKUP[prefix.lower()] if src_name not in sources_meta: - sources_meta[src_name] = self.fetch_meta(src_name) + sources_meta[src_name] = self.db.get_source_metadata(src_name) response.source_meta_ = sources_meta return response @@ -507,7 +494,7 @@ def add_gene_descriptor( return response @staticmethod - def _record_order(record: Dict) -> (int, str): + def _record_order(record: Dict) -> Tuple[int, str]: """Construct priority order for matching. Only called by sort(). :param Dict record: individual record item in iterable to sort @@ -586,21 +573,11 @@ def _resolve_merge( # record is sole member of concept group return callback(response, record, match_type, possible_concepts) - def _get_matches_by_type(self, query: str, match_type: str) -> List[Dict]: - """Get matches list for match tier. 
- :param str query: user-provided query - :param str match_type: keyword of match type to check - :return: List of records matching the query and match level - """ - matching_refs = self.db.get_records_by_type(query, match_type) - matching_records = [self.db.get_record_by_id(m["concept_id"], False) - for m in matching_refs] - return sorted(matching_records, key=self._record_order) # type: ignore - def _perform_normalized_lookup( self, response: NormService, query: str, response_builder: Callable ) -> NormService: """Retrieve normalized concept, for use in normalization endpoints + :param NormService response: in-progress response object :param str query: user-provided query :param Callable response_builder: response constructor callback method @@ -621,17 +598,15 @@ def _perform_normalized_lookup( return self._resolve_merge(response, record, MatchType.CONCEPT_ID, response_builder) - for match_type in ITEM_TYPES.values(): + for match_type in RefType: # get matches list for match tier - matching_refs = self.db.get_records_by_type(query_str, match_type) + matching_refs = self.db.get_refs_by_type(query_str, match_type) matching_records = \ - [self.db.get_record_by_id(m['concept_id'], False) - for m in matching_refs] + [self.db.get_record_by_id(ref, False) for ref in matching_refs] matching_records.sort(key=self._record_order) # type: ignore if len(matching_refs) > 1: - possible_concepts = \ - [ref["concept_id"] for ref in matching_refs] + possible_concepts = [ref for ref in matching_refs] else: possible_concepts = None @@ -640,7 +615,7 @@ def _perform_normalized_lookup( assert match is not None record = self.db.get_record_by_id(match["concept_id"], False) if record: - match_type_value = MatchType[match_type.upper()] + match_type_value = MatchType[match_type.value.upper()] return self._resolve_merge( response, record, match_type_value, response_builder, possible_concepts @@ -664,9 +639,10 @@ def _add_normalized_records( response.normalized_concept_id = normalized_record["concept_id"] if normalized_record["item_type"] == "identity": record_source = SourceName[normalized_record["src_name"].upper()] + meta = self.db.get_source_metadata(record_source.value) response.source_matches[record_source] = MatchesNormalized( records=[BaseGene(**self._transform_locations(normalized_record))], - source_meta_=self.fetch_meta(record_source.value) + source_meta_=meta # type: ignore ) else: concept_ids = [normalized_record["concept_id"]] + \ @@ -680,9 +656,10 @@ def _add_normalized_records( if record_source in response.source_matches: response.source_matches[record_source].records.append(gene) else: + meta = self.db.get_source_metadata(record_source.value) response.source_matches[record_source] = MatchesNormalized( records=[gene], - source_meta_=self.fetch_meta(record_source.value) + source_meta_=meta, # type: ignore ) if possible_concepts: response = self._add_alt_matches(response, normalized_record, diff --git a/gene/schemas.py b/gene/schemas.py index 6b178c4d..539c9c06 100644 --- a/gene/schemas.py +++ b/gene/schemas.py @@ -207,8 +207,8 @@ class DataLicenseAttributes(BaseModel): attribution: StrictBool -class ItemTypes(str, Enum): - """Item types used in DynamoDB.""" +class RefType(str, Enum): + """Reference item types.""" # Must be in descending MatchType order. 
     SYMBOL = 'symbol'
diff --git a/gene/version.py b/gene/version.py
index 33d4b3ae..7766a73f 100644
--- a/gene/version.py
+++ b/gene/version.py
@@ -1,2 +1,2 @@
 """Gene normalizer version"""
-__version__ = "0.1.33"
+__version__ = "0.1.34"
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 5c92578f..3eae55a4 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,116 +1,119 @@
 -i https://pypi.org/simple
-anyio==3.6.2 ; python_full_version >= '3.6.2'
+anyio==3.6.2; python_full_version >= '3.6.2'
 appdirs==1.4.4
-appnope==0.1.3 ; platform_system == 'Darwin'
-argcomplete==3.0.5 ; python_version >= '3.6'
-argh==0.28.1 ; python_version >= '3.8'
+appnope==0.1.3; platform_system == 'Darwin'
+argcomplete==3.0.5; python_version >= '3.6'
+argh==0.28.1; python_version >= '3.8'
 asttokens==2.2.1
-attrs==22.2.0 ; python_version >= '3.6'
+attrs==22.2.0; python_version >= '3.6'
 backcall==0.2.0
-beautifulsoup4==4.12.0 ; python_full_version >= '3.6.0'
+beautifulsoup4==4.12.0; python_version >= '3.6'
 biocommons.seqrepo==0.6.5
-bioutils==0.5.7 ; python_version >= '3.6'
+bioutils==0.5.7; python_version >= '3.6'
 boto3==1.26.103
-botocore==1.29.103 ; python_version >= '3.7'
+botocore==1.29.103; python_version >= '3.7'
 bs4==0.0.1
-canonicaljson==2.0.0 ; python_version >= '3.7'
-certifi==2022.12.7 ; python_version >= '3.6'
-cfgv==3.3.1 ; python_full_version >= '3.6.1'
-charset-normalizer==3.1.0 ; python_full_version >= '3.7.0'
+canonicaljson==2.0.0; python_version >= '3.7'
+certifi==2022.12.7; python_version >= '3.6'
+cfgv==3.3.1; python_full_version >= '3.6.1'
+charset-normalizer==3.1.0; python_version >= '3.7'
 click==8.1.3
-coloredlogs==15.0.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
-comm==0.1.3 ; python_version >= '3.6'
-coverage[toml]==7.2.2 ; python_version >= '3.7'
-cssselect==1.2.0 ; python_version >= '3.7'
-debugpy==1.6.6 ; python_version >= '3.7'
-decorator==5.1.1 ; python_version >= '3.5'
+coloredlogs==15.0.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
+comm==0.1.3; python_version >= '3.6'
+coverage[toml]==7.2.2; python_version >= '3.7'
+cssselect==1.2.0; python_version >= '3.7'
+debugpy==1.6.6; python_version >= '3.7'
+decorator==5.1.1; python_version >= '3.5'
 distlib==0.3.6
-exceptiongroup==1.1.1 ; python_version < '3.11'
+exceptiongroup==1.1.1; python_version < '3.11'
 executing==1.2.0
 fake-useragent==1.1.3
 fastapi==0.95.0
-filelock==3.10.7 ; python_version >= '3.7'
+filelock==3.10.7; python_version >= '3.7'
 flake8==6.0.0
 flake8-docstrings==1.7.0
 ga4gh.vrs==0.7.8
 ga4gh.vrsatile.pydantic==0.0.11
+-e .
+-e .
gffutils==0.11.1 -h11==0.14.0 ; python_version >= '3.7' -humanfriendly==10.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' -identify==2.5.22 ; python_version >= '3.7' -idna==3.4 ; python_version >= '3.5' -importlib-metadata==6.1.0 ; python_version >= '3.7' -inflection==0.5.1 ; python_version >= '3.5' -iniconfig==2.0.0 ; python_version >= '3.7' +h11==0.14.0; python_version >= '3.7' +humanfriendly==10.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +identify==2.5.22; python_version >= '3.7' +idna==3.4; python_version >= '3.5' +importlib-metadata==6.1.0; python_version < '3.10' +importlib-resources==5.12.0; python_version < '3.10' +inflection==0.5.1; python_version >= '3.5' +iniconfig==2.0.0; python_version >= '3.7' ipykernel==6.22.0 -ipython==8.12.0 ; python_version >= '3.8' -jedi==0.18.2 ; python_version >= '3.6' -jmespath==1.0.1 ; python_version >= '3.7' +ipython==8.12.0; python_version >= '3.8' +jedi==0.18.2; python_version >= '3.6' +jmespath==1.0.1; python_version >= '3.7' jsonschema==3.2.0 -jupyter-client==8.1.0 ; python_version >= '3.8' -jupyter-core==5.3.0 ; python_version >= '3.8' -lxml==4.9.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' -markdown==3.4.3 ; python_version >= '3.7' -matplotlib-inline==0.1.6 ; python_version >= '3.5' -mccabe==0.7.0 ; python_version >= '3.6' +jupyter-client==8.1.0; python_version >= '3.8' +jupyter-core==5.3.0; python_version >= '3.8' +lxml==4.9.2; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +markdown==3.4.3; python_version >= '3.7' +matplotlib-inline==0.1.6; python_version >= '3.5' +mccabe==0.7.0; python_version >= '3.6' mock==5.0.1 -nest-asyncio==1.5.6 ; python_version >= '3.5' -nodeenv==1.7.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6' -numpy==1.24.2 ; python_version >= '3.8' -packaging==23.0 ; python_version >= '3.7' +nest-asyncio==1.5.6; python_version >= '3.5' +nodeenv==1.7.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6' +numpy==1.24.2; python_version >= '3.8' +packaging==23.0; python_version >= '3.7' parse==1.19.0 -parso==0.8.3 ; python_version >= '3.6' -pexpect==4.8.0 ; sys_platform != 'win32' +parso==0.8.3; python_version >= '3.6' +pexpect==4.8.0; sys_platform != 'win32' pickleshare==0.7.5 -platformdirs==3.2.0 ; python_version >= '3.7' -pluggy==1.0.0 ; python_version >= '3.6' +platformdirs==3.2.0; python_version >= '3.7' +pluggy==1.0.0; python_version >= '3.6' pre-commit==3.2.1 -prompt-toolkit==3.0.38 ; python_full_version >= '3.7.0' -psutil==5.9.4 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' -psycopg2-binary==2.9.5 +prompt-toolkit==3.0.38; python_version >= '3.7' +psutil==5.9.4; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +psycopg[binary]==3.1.8 +psycopg-binary==3.1.8 ptyprocess==0.7.0 pure-eval==0.2.2 -pycodestyle==2.10.0 ; python_version >= '3.6' +pycodestyle==2.10.0; python_version >= '3.6' pydantic==1.10.7 -pydocstyle==6.3.0 ; python_version >= '3.6' +pydocstyle==6.3.0; python_version >= '3.6' pyee==8.2.2 -pyfaidx==0.7.2.1 ; python_version >= '3.7' -pyflakes==3.0.1 ; python_version >= '3.6' -pygments==2.14.0 ; python_version >= '3.6' -pyppeteer==1.0.2 ; python_version >= '3.7' and python_version < '4.0' +pyfaidx==0.7.2.1; python_version >= '3.7' +pyflakes==3.0.1; python_version >= '3.6' +pygments==2.14.0; python_version >= '3.6' +pyppeteer==1.0.2; python_version >= '3.7' and 
python_version < '4.0' pyquery==2.0.0 -pyrsistent==0.19.3 ; python_version >= '3.7' +pyrsistent==0.19.3; python_version >= '3.7' pysam==0.20.0 pytest==7.2.2 pytest-cov==4.0.0 -python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' +python-dateutil==2.8.2; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' python-jsonschema-objects==0.4.1 -pyyaml==6.0 ; python_version >= '3.6' -pyzmq==25.0.2 ; python_version >= '3.6' -requests==2.28.2 ; python_version >= '3.7' and python_version < '4' -requests-html==0.10.0 ; python_full_version >= '3.6.0' -s3transfer==0.6.0 ; python_version >= '3.7' -setuptools==67.6.1 ; python_version >= '3.7' -simplejson==3.18.4 ; python_version >= '2.5' and python_version not in '3.0, 3.1, 3.2' -six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' -sniffio==1.3.0 ; python_version >= '3.7' +pyyaml==6.0; python_version >= '3.6' +pyzmq==25.0.2; python_version >= '3.6' +requests==2.28.2; python_version >= '3.7' and python_version < '4' +requests-html==0.10.0; python_version >= '3.6' +s3transfer==0.6.0; python_version >= '3.7' +setuptools==67.6.1; python_version >= '3.7' +simplejson==3.18.4; python_version >= '2.5' and python_version not in '3.0, 3.1, 3.2, 3.3' +six==1.16.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +sniffio==1.3.0; python_version >= '3.7' snowballstemmer==2.2.0 -soupsieve==2.4 ; python_version >= '3.7' -sqlparse==0.4.3 ; python_version >= '3.5' +soupsieve==2.4; python_version >= '3.7' +sqlparse==0.4.3; python_version >= '3.5' stack-data==0.6.2 -starlette==0.26.1 ; python_version >= '3.7' -tabulate==0.9.0 ; python_version >= '3.7' -tomli==2.0.1 ; python_version < '3.11' -tornado==6.2 ; python_version >= '3.7' -tqdm==4.65.0 ; python_version >= '3.7' -traitlets==5.9.0 ; python_version >= '3.7' -typing-extensions==4.5.0 ; python_version >= '3.7' -urllib3==1.26.15 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +starlette==0.26.1; python_version >= '3.7' +tabulate==0.9.0; python_version >= '3.7' +tomli==2.0.1; python_version < '3.11' +tornado==6.2; python_version >= '3.7' +tqdm==4.65.0; python_version >= '3.7' +traitlets==5.9.0; python_version >= '3.7' +typing-extensions==4.5.0; python_version >= '3.7' +urllib3==1.26.15; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' uvicorn==0.21.1 -virtualenv==20.21.0 ; python_version >= '3.7' -w3lib==2.1.1 ; python_version >= '3.7' +virtualenv==20.21.0; python_version >= '3.7' +w3lib==2.1.1; python_version >= '3.7' wcwidth==0.2.6 -websockets==10.4 ; python_version >= '3.7' --e . 
+websockets==10.4; python_version >= '3.7' yoyo-migrations==8.2.0 -zipp==3.15.0 ; python_version >= '3.7' +zipp==3.15.0; python_version >= '3.7' diff --git a/requirements.txt b/requirements.txt index e3b3af6e..9bbd8433 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,36 +1,38 @@ -i https://pypi.org/simple -anyio==3.6.2 ; python_full_version >= '3.6.2' -attrs==22.2.0 ; python_version >= '3.6' -bioutils==0.5.7 ; python_version >= '3.6' +anyio==3.6.2; python_full_version >= '3.6.2' +attrs==22.2.0; python_version >= '3.6' +bioutils==0.5.7; python_version >= '3.6' boto3==1.26.103 -botocore==1.29.103 ; python_version >= '3.7' -canonicaljson==2.0.0 ; python_version >= '3.7' -certifi==2022.12.7 ; python_version >= '3.6' -charset-normalizer==3.1.0 ; python_full_version >= '3.7.0' +botocore==1.29.103; python_version >= '3.7' +canonicaljson==2.0.0; python_version >= '3.7' +certifi==2022.12.7; python_version >= '3.6' +charset-normalizer==3.1.0; python_version >= '3.7' click==8.1.3 -coloredlogs==15.0.1 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +coloredlogs==15.0.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' fastapi==0.95.0 ga4gh.vrs==0.7.8 ga4gh.vrsatile.pydantic==0.0.11 -h11==0.14.0 ; python_version >= '3.7' -humanfriendly==10.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' -idna==3.4 ; python_version >= '3.5' -inflection==0.5.1 ; python_version >= '3.5' -jmespath==1.0.1 ; python_version >= '3.7' +h11==0.14.0; python_version >= '3.7' +humanfriendly==10.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +idna==3.4; python_version >= '3.5' +importlib-metadata==6.1.0; python_version < '3.10' +inflection==0.5.1; python_version >= '3.5' +jmespath==1.0.1; python_version >= '3.7' jsonschema==3.2.0 -markdown==3.4.3 ; python_version >= '3.7' -numpy==1.24.2 ; python_version >= '3.8' +markdown==3.4.3; python_version >= '3.7' +numpy==1.24.2; python_version >= '3.8' pydantic==1.10.7 -pyrsistent==0.19.3 ; python_version >= '3.7' -python-dateutil==2.8.2 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' +pyrsistent==0.19.3; python_version >= '3.7' +python-dateutil==2.8.2; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' python-jsonschema-objects==0.4.1 -pyyaml==6.0 ; python_version >= '3.6' -requests==2.28.2 ; python_version >= '3.7' and python_version < '4' -s3transfer==0.6.0 ; python_version >= '3.7' -setuptools==67.6.1 ; python_version >= '3.7' -six==1.16.0 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' -sniffio==1.3.0 ; python_version >= '3.7' -starlette==0.26.1 ; python_version >= '3.7' -typing-extensions==4.5.0 ; python_version >= '3.7' -urllib3==1.26.15 ; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +pyyaml==6.0; python_version >= '3.6' +requests==2.28.2; python_version >= '3.7' and python_version < '4' +s3transfer==0.6.0; python_version >= '3.7' +setuptools==67.6.1; python_version >= '3.7' +six==1.16.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +sniffio==1.3.0; python_version >= '3.7' +starlette==0.26.1; python_version >= '3.7' +typing-extensions==4.5.0; python_version >= '3.7' +urllib3==1.26.15; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' uvicorn==0.21.1 +zipp==3.15.0; python_version >= '3.7' diff --git a/setup.cfg b/setup.cfg index a66ca811..033f21a6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,11 +37,21 @@ 
tests_require = pytest-cov mock +[options.package_data] +gene = + database/postgresql/* + +[options.entry_points] +console_scripts = + gene_norm_update = gene.cli:update_normalizer_db + gene_norm_update_remote = gene.cli:update_from_remote + gene_norm_dump = gene.cli:dump_database + [options.extras_require] dev = + psycopg[binary] gffutils biocommons.seqrepo - psycopg2-binary pytest pre-commit flake8 diff --git a/tests/conftest.py b/tests/conftest.py index edee380d..d07d90f5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,16 @@ """Provide utilities for test cases.""" +import pytest +from gene.database import AbstractDatabase, create_db + from gene.schemas import SymbolStatus +@pytest.fixture(scope="session") +def database() -> AbstractDatabase: + """Create database instance.""" + return create_db() + + def assertion_checks(normalized_gene, test_gene, match_type): """Check that normalized_gene and test_gene are the same.""" assert normalized_gene.match_type == match_type diff --git a/tests/unit/test_database_and_etl.py b/tests/unit/test_database_and_etl.py index d1910b7f..11e6b306 100644 --- a/tests/unit/test_database_and_etl.py +++ b/tests/unit/test_database_and_etl.py @@ -9,7 +9,7 @@ from gene.etl import Ensembl, HGNC, NCBI from gene.etl.merge import Merge -from gene.database import Database, AWS_ENV_VAR_NAME +from gene.database import AWS_ENV_VAR_NAME ALIASES = { @@ -30,23 +30,21 @@ "NT_167249.2": ["ga4gh:SQ.Q8IworEhpLeXwpz1CHM7C3luysh-ltx-"] } - -@pytest.fixture(scope='module') -def is_test_env(): - """Test fixture to determine whether or not using test environment.""" - return environ.get("GENE_TEST", "").lower() == "true" +IS_TEST_ENV = environ.get("GENE_TEST", "").lower() == "true" +IS_DDB_TEST = not environ.get("GENE_NORM_DB_URL", "").lower().startswith("postgres") -@pytest.fixture(scope='module') -def dynamodb(is_test_env): - """Create a DynamoDB test fixture.""" +@pytest.fixture(scope="module") +def db_fixture(database): + """Create a database test fixture.""" class DB: def __init__(self): - self.db = Database() + self.db = database + self.db_name = self.db.__class__.__name__ self.merge = Merge(database=self.db) - if is_test_env and AWS_ENV_VAR_NAME not in environ: - self.db.delete_all_db_tables() - self.db.create_db_tables() + if IS_TEST_ENV and AWS_ENV_VAR_NAME not in environ: + self.db.drop_db() + self.db.initialize_db() return DB() @@ -73,115 +71,131 @@ def etl_data_path(): return test_root / 'tests' / 'unit' / 'data' / 'etl_data' -def test_tables_created(dynamodb): +def test_tables_created(db_fixture): """Check that gene_concepts and gene_metadata are created.""" - existing_tables = dynamodb.db.dynamodb_client.list_tables()['TableNames'] - assert 'gene_concepts' in existing_tables - assert 'gene_metadata' in existing_tables - - + existing_tables = db_fixture.db.list_tables() + if db_fixture.db_name == "PostgresDatabase": + assert set(existing_tables) == { + "gene_associations", + "gene_symbols", + "gene_previous_symbols", + "gene_aliases", + "gene_xrefs", + "gene_concepts", + "gene_merged", + "gene_sources", + } + else: + assert 'gene_concepts' in existing_tables + assert 'gene_metadata' in existing_tables + + +@pytest.mark.skipif(not IS_TEST_ENV, reason="not in test environment") @patch.object(Ensembl, 'get_seqrepo') -def test_ensembl_etl(test_get_seqrepo, processed_ids, dynamodb, etl_data_path, - is_test_env): +def test_ensembl_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ensembl etl methods work correctly.""" - if 
is_test_env: - test_get_seqrepo.return_value = None - e = Ensembl(dynamodb.db) + test_get_seqrepo.return_value = None + e = Ensembl(db_fixture.db) - e.src_data_dir = etl_data_path / 'ensembl' - e._download_data() - e._extract_data() - shutil.rmtree(e.src_data_dir) + e.src_data_dir = etl_data_path / 'ensembl' + e._download_data() + e._extract_data() + shutil.rmtree(e.src_data_dir) - e._sequence_location.get_aliases = _get_aliases - e._data_src = etl_data_path / 'ensembl_109.gff3' - e._transform_data() - e._add_meta() - processed_ids += e._processed_ids + e._sequence_location.get_aliases = _get_aliases + e._data_src = etl_data_path / 'ensembl_109.gff3' + e._add_meta() + e._transform_data() + db_fixture.db.complete_write_transaction() + processed_ids += e._processed_ids +@pytest.mark.skipif(not IS_TEST_ENV, reason="not in test environment") @patch.object(HGNC, 'get_seqrepo') -def test_hgnc_etl(test_get_seqrepo, processed_ids, dynamodb, etl_data_path, - is_test_env): +def test_hgnc_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that hgnc etl methods work correctly.""" - if is_test_env: - test_get_seqrepo.return_value = None - h = HGNC(dynamodb.db) + test_get_seqrepo.return_value = None + h = HGNC(db_fixture.db) - h.src_data_dir = etl_data_path / 'hgnc' - h._download_data() - h._extract_data() - shutil.rmtree(h.src_data_dir) + h.src_data_dir = etl_data_path / 'hgnc' + h._download_data() + h._extract_data() + shutil.rmtree(h.src_data_dir) - h._data_src = etl_data_path / 'hgnc_20210810.json' - h._version = '20210810' - h._transform_data() - h._add_meta() - processed_ids += h._processed_ids + h._data_src = etl_data_path / 'hgnc_20210810.json' + h._version = '20210810' + h._add_meta() + h._transform_data() + db_fixture.db.complete_write_transaction() + processed_ids += h._processed_ids +@pytest.mark.skipif(not IS_TEST_ENV, reason="not in test environment") @patch.object(NCBI, 'get_seqrepo') -def test_ncbi_etl(test_get_seqrepo, processed_ids, dynamodb, etl_data_path, - is_test_env): +def test_ncbi_etl(test_get_seqrepo, processed_ids, db_fixture, etl_data_path): """Test that ncbi etl methods work correctly.""" - if is_test_env: - test_get_seqrepo.return_value = None - n = NCBI(dynamodb.db) - - n.src_data_dir = etl_data_path / 'ncbi' - n._extract_data() - shutil.rmtree(n.src_data_dir) - - n._sequence_location.get_aliases = _get_aliases - n._info_src = etl_data_path / 'ncbi_info_20210813.tsv' - n._history_src = etl_data_path / 'ncbi_history_20210813.tsv' - n._gff_src = etl_data_path / 'ncbi_GRCh38.p14.gff' - n._version = n._info_src.stem.split('_')[-1] - n._transform_data() - n._add_meta() - processed_ids += n._processed_ids - - -def test_merged_conecpts(processed_ids, dynamodb, is_test_env): + test_get_seqrepo.return_value = None + n = NCBI(db_fixture.db) + + n.src_data_dir = etl_data_path / 'ncbi' + n._extract_data() + shutil.rmtree(n.src_data_dir) + + n._sequence_location.get_aliases = _get_aliases + n._info_src = etl_data_path / 'ncbi_info_20210813.tsv' + n._history_src = etl_data_path / 'ncbi_history_20210813.tsv' + n._gff_src = etl_data_path / 'ncbi_GRCh38.p14.gff' + n._version = n._info_src.stem.split('_')[-1] + n._add_meta() + n._transform_data() + db_fixture.db.complete_write_transaction() + processed_ids += n._processed_ids + + +@pytest.mark.skipif( + not IS_TEST_ENV, + reason="not in test environment" +) +def test_merged_concepts(processed_ids, db_fixture): """Create merged concepts and load to db.""" - if is_test_env: - 
dynamodb.merge.create_merged_concepts(processed_ids) + db_fixture.merge.create_merged_concepts(processed_ids) -def test_item_type(dynamodb): +@pytest.mark.skipif(not IS_DDB_TEST, reason="only applies to DynamoDB in test env") +def test_item_type(db_fixture): """Check that items are tagged with item_type attribute.""" filter_exp = Key('label_and_type').eq('ncbigene:8193##identity') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'identity' filter_exp = Key('label_and_type').eq('prkrap1##symbol') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'symbol' filter_exp = Key('label_and_type').eq('loc157663##prev_symbol') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'prev_symbol' filter_exp = Key('label_and_type').eq('flj23569##alias') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'alias' filter_exp = Key('label_and_type').eq('omim:606689##associated_with') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'associated_with' filter_exp = Key('label_and_type').eq('ensembl:ensg00000268895##xref') item = \ - dynamodb.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] + db_fixture.db.genes.query(KeyConditionExpression=filter_exp)['Items'][0] assert 'item_type' in item assert item['item_type'] == 'xref' diff --git a/tests/unit/test_emit_warnings.py b/tests/unit/test_emit_warnings.py index 88a4c39c..cf420884 100644 --- a/tests/unit/test_emit_warnings.py +++ b/tests/unit/test_emit_warnings.py @@ -1,4 +1,5 @@ """Test the emit_warnings function.""" +from gene.database import create_db from gene.query import QueryHandler @@ -10,7 +11,8 @@ def test_emit_warnings(): "Query contains non-breaking space characters" } ] - query_handler = QueryHandler() + db = create_db() + query_handler = QueryHandler(db) # Test emit no warnings actual_warnings = query_handler.emit_warnings('spry3') diff --git a/tests/unit/test_ensembl_source.py b/tests/unit/test_ensembl_source.py index 77fe0d24..6597ce34 100644 --- a/tests/unit/test_ensembl_source.py +++ b/tests/unit/test_ensembl_source.py @@ -6,11 +6,11 @@ @pytest.fixture(scope="module") -def ensembl(): +def ensembl(database): """Build ensembl test fixture.""" class QueryGetter: def __init__(self): - self.query_handler = QueryHandler() + self.query_handler = QueryHandler(database) def search(self, query_str, incl="ensembl"): resp = self.query_handler.search(query_str, keyed=True, incl=incl) diff --git a/tests/unit/test_hgnc_source.py b/tests/unit/test_hgnc_source.py index bc9f8f96..f9631c59 100644 --- a/tests/unit/test_hgnc_source.py +++ b/tests/unit/test_hgnc_source.py @@ -7,11 +7,11 @@ @pytest.fixture(scope="module") -def hgnc(): +def hgnc(database): """Build hgnc test fixture.""" class QueryGetter: def __init__(self): - self.query_handler = QueryHandler() + 
self.query_handler = QueryHandler(database) def search(self, query_str, incl="hgnc"): resp = self.query_handler.search(query_str, keyed=True, incl=incl) diff --git a/tests/unit/test_ncbi_source.py b/tests/unit/test_ncbi_source.py index ee49225c..b4c02756 100644 --- a/tests/unit/test_ncbi_source.py +++ b/tests/unit/test_ncbi_source.py @@ -8,11 +8,11 @@ @pytest.fixture(scope="module") -def ncbi(): +def ncbi(database): """Build ncbi test fixture.""" class QueryGetter: def __init__(self): - self.query_handler = QueryHandler() + self.query_handler = QueryHandler(database) def search(self, query_str, incl="ncbi"): resp = self.query_handler.search(query_str, keyed=True, incl=incl) diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index 061ffb09..53080cd6 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -7,12 +7,12 @@ @pytest.fixture(scope='module') -def query_handler(): +def query_handler(database): """Build query_handler test fixture.""" class QueryGetter: def __init__(self): - self.query_handler = QueryHandler() + self.query_handler = QueryHandler(database) def search(self, query_str, keyed=False, incl='', excl=''): return self.query_handler.search(query_str=query_str, keyed=keyed,