From dcfc966709433b84318a509883b78fa5e67a2a36 Mon Sep 17 00:00:00 2001 From: Raghavendra M Dani Date: Tue, 25 Jun 2024 12:32:36 -0700 Subject: [PATCH] Upgrade ray and commit rcf before catalog commit (#324) * Upgrade ray and commit rcf before catalog commit * update ci * Support 3.11 * 3.11 not supported * fix test failure * Added a e2e backward compatibility test * Address comments --- .github/workflows/ci.yml | 42 +- .github/workflows/publish-to-pypi.yml | 78 +-- deltacat/aws/s3u.py | 32 +- .../compute/compactor/compaction_session.py | 6 +- .../compute/compactor/repartition_session.py | 1 + .../compactor/utils/round_completion_file.py | 48 +- .../compactor_v2/compaction_session.py | 23 +- ...ssion.py => evaluate_compaction_result.py} | 3 +- .../io/aws/redshift/redshift_datasource.py | 578 ------------------ deltacat/io/dataset.py | 22 +- deltacat/tests/aws/test_s3u.py | 2 + .../utils/test_round_completion_file.py | 209 +++++++ .../data/backfill_source_date_pk.csv | 5 + .../data/incremental_source_date_pk.csv | 3 + .../compactor_v2/test_compaction_session.py | 237 +++++-- .../compute/test_compact_partition_rebase.py | 2 +- deltacat/tests/compute/test_util_common.py | 23 +- deltacat/tests/test_utils/pyarrow.py | 5 +- .../tests/utils/ray_utils/test_dataset.py | 66 ++ deltacat/utils/numpy.py | 6 +- deltacat/utils/pandas.py | 6 +- deltacat/utils/pyarrow.py | 6 +- deltacat/utils/ray_utils/dataset.py | 14 +- requirements.txt | 2 +- setup.py | 6 +- 25 files changed, 673 insertions(+), 752 deletions(-) rename deltacat/compute/compactor_v2/model/{compaction_session.py => evaluate_compaction_result.py} (83%) delete mode 100644 deltacat/io/aws/redshift/redshift_datasource.py create mode 100644 deltacat/tests/compute/compactor/utils/test_round_completion_file.py create mode 100644 deltacat/tests/compute/compactor_v2/data/backfill_source_date_pk.csv create mode 100644 deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv create mode 100644 deltacat/tests/utils/ray_utils/test_dataset.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8f4d6ddd..4b197bf8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,10 +13,10 @@ jobs: steps: - name: checkout uses: actions/checkout@v3 - - name: Set up Python 3.8 - uses: actions/setup-python@v1 + - name: Set up Python 3.9 + uses: actions/setup-python@v4 with: - python-version: 3.8 + python-version: 3.9 - name: Linting run: | python -m pip install --upgrade pip @@ -27,14 +27,14 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.8", "3.9", "3.10"] + python-version: ["3.9", "3.10"] timeout-minutes: 20 steps: - name: "checkout repository" - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - - name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.8) + - name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.9) uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} @@ -55,24 +55,24 @@ jobs: - name: Download previous benchmark data uses: actions/cache@v1 with: - path: ./cache - key: ${{ runner.os }}-benchmark + path: ./cache + key: ${{ runner.os }}-benchmark - name: Store benchmark results uses: benchmark-action/github-action-benchmark@v1 with: - tool: 'pytest' - output-file-path: output.json - auto-push: false - github-token: ${{ secrets.GITHUB_TOKEN }} + tool: "pytest" + output-file-path: output.json + auto-push: false + github-token: ${{ 
secrets.GITHUB_TOKEN }} - # Where the previous data file is stored - external-data-json-path: ./cache/benchmark-data.json + # Where the previous data file is stored + external-data-json-path: ./cache/benchmark-data.json - # Enable Job Summary for PRs - summary-always: true + # Enable Job Summary for PRs + summary-always: true - # Enable alert commit comment - # - # By default, this action marks the result as performance regression - # when it is worse than the previous exceeding 200% threshold. - comment-on-alert: true + # Enable alert commit comment + # + # By default, this action marks the result as performance regression + # when it is worse than the previous exceeding 200% threshold. + comment-on-alert: true diff --git a/.github/workflows/publish-to-pypi.yml b/.github/workflows/publish-to-pypi.yml index d09ee730..c0c184a3 100644 --- a/.github/workflows/publish-to-pypi.yml +++ b/.github/workflows/publish-to-pypi.yml @@ -1,42 +1,42 @@ name: Publish Python distributions to PyPI on: - release: - types: [published] # triggered whenever a new GitHub release is published + release: + types: [published] # triggered whenever a new GitHub release is published jobs: - build-n-publish: - name: Build and publish Python distributions to PyPI - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@main - with: - fetch-depth: 0 - - name: Set up Python 3.8 (minimum supported python version for deltaCAT) - uses: actions/setup-python@v3 - with: - python-version: "3.8" - - name: Install pypa/build - run: >- - python -m - pip install - build - --user - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install pytest - if [ -f benchmark-requirements.txt ]; then pip install -r benchmark-requirements.txt; fi - - name: Run unit tests - run: >- - python -m pytest - - name: Echo release tag - run: echo ${{ github.ref_name }} - - name: Build a binary wheel and a source tarball - run: >- - python setup.py sdist bdist_wheel - - name: Publish distribution to PyPI - if: startsWith(github.ref, 'refs/tags') - uses: pypa/gh-action-pypi-publish@release/v1 - with: - password: ${{ secrets.PYPI_API_TOKEN }} - verbose: true + build-n-publish: + name: Build and publish Python distributions to PyPI + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@main + with: + fetch-depth: 0 + - name: Set up Python 3.9 (minimum supported python version for deltaCAT) + uses: actions/setup-python@v4 + with: + python-version: "3.9" + - name: Install pypa/build + run: >- + python -m + pip install + build + --user + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pytest + if [ -f benchmark-requirements.txt ]; then pip install -r benchmark-requirements.txt; fi + - name: Run unit tests + run: >- + python -m pytest + - name: Echo release tag + run: echo ${{ github.ref_name }} + - name: Build a binary wheel and a source tarball + run: >- + python setup.py sdist bdist_wheel + - name: Publish distribution to PyPI + if: startsWith(github.ref, 'refs/tags') + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} + verbose: true diff --git a/deltacat/aws/s3u.py b/deltacat/aws/s3u.py index e08994f4..1d1bce0a 100644 --- a/deltacat/aws/s3u.py +++ b/deltacat/aws/s3u.py @@ -21,7 +21,7 @@ from botocore.client import BaseClient from botocore.exceptions import ClientError from ray.data.block import Block, BlockAccessor, BlockMetadata -from ray.data.datasource import BlockWritePathProvider +from 
ray.data.datasource import FilenameProvider from ray.types import ObjectRef from tenacity import ( Retrying, @@ -70,9 +70,6 @@ logger = logs.configure_deltacat_logger(logging.getLogger(__name__)) -# TODO(raghumdani): refactor redshift datasource to reuse the -# same module for writing output files. - class CapturedBlockWritePaths: def __init__(self): @@ -100,12 +97,15 @@ def block_refs(self) -> List[ObjectRef[Block]]: return self._block_refs -class UuidBlockWritePathProvider(BlockWritePathProvider): +class UuidBlockWritePathProvider(FilenameProvider): """Block write path provider implementation that writes each dataset block out to a file of the form: {base_path}/{uuid} """ - def __init__(self, capture_object: CapturedBlockWritePaths): + def __init__( + self, capture_object: CapturedBlockWritePaths, base_path: Optional[str] = None + ): + self.base_path = base_path self.write_paths: List[str] = [] self.block_refs: List[ObjectRef[Block]] = [] self.capture_object = capture_object @@ -117,6 +117,19 @@ def __del__(self): self.block_refs, ) + def get_filename_for_block( + self, block: Any, task_index: int, block_index: int + ) -> str: + if self.base_path is None: + raise ValueError( + "Base path must be provided to UuidBlockWritePathProvider", + ) + return self._get_write_path_for_block( + base_path=self.base_path, + block=block, + block_index=block_index, + ) + def _get_write_path_for_block( self, base_path: str, @@ -143,13 +156,6 @@ def __call__( block_index: Optional[int] = None, file_format: Optional[str] = None, ) -> str: - """ - TODO: BlockWritePathProvider is deprecated as of Ray version 2.20.0. Please use FilenameProvider. - See: https://docs.ray.io/en/master/data/api/doc/ray.data.datasource.FilenameProvider.html - Also See: https://github.com/ray-project/deltacat/issues/299 - - Hence, this class only works with Ray version 2.20.0 or lower when used in Ray Dataset. 
- """ return self._get_write_path_for_block( base_path, filesystem=filesystem, diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index 08060969..c832ec27 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -193,6 +193,7 @@ def compact_partition( round_completion_file_s3_url = rcf.write_round_completion_file( compaction_artifact_s3_bucket, new_rcf_partition_locator, + partition.locator, new_rci, **s3_client_kwargs, ) @@ -312,7 +313,10 @@ def _execute_compaction_round( round_completion_info = None if not rebase_source_partition_locator: round_completion_info = rcf.read_round_completion_file( - compaction_artifact_s3_bucket, source_partition_locator, **s3_client_kwargs + compaction_artifact_s3_bucket, + source_partition_locator, + destination_partition_locator, + **s3_client_kwargs, ) if not round_completion_info: logger.info( diff --git a/deltacat/compute/compactor/repartition_session.py b/deltacat/compute/compactor/repartition_session.py index dac3ff61..3f5a5be0 100644 --- a/deltacat/compute/compactor/repartition_session.py +++ b/deltacat/compute/compactor/repartition_session.py @@ -177,6 +177,7 @@ def repartition( s3_client_kwargs = {} return rcf.write_round_completion_file( + None, None, None, repartition_completion_info, diff --git a/deltacat/compute/compactor/utils/round_completion_file.py b/deltacat/compute/compactor/utils/round_completion_file.py index 184d573f..a4b7a094 100644 --- a/deltacat/compute/compactor/utils/round_completion_file.py +++ b/deltacat/compute/compactor/utils/round_completion_file.py @@ -12,10 +12,17 @@ def get_round_completion_file_s3_url( - bucket: str, source_partition_locator: PartitionLocator + bucket: str, + source_partition_locator: PartitionLocator, + destination_partition_locator: Optional[PartitionLocator] = None, ) -> str: base_url = source_partition_locator.path(f"s3://{bucket}") + if destination_partition_locator: + base_url = destination_partition_locator.path( + f"s3://{bucket}/{source_partition_locator.hexdigest()}" + ) + return f"{base_url}.json" @@ -23,20 +30,41 @@ def get_round_completion_file_s3_url( def read_round_completion_file( bucket: str, source_partition_locator: PartitionLocator, + destination_partition_locator: Optional[PartitionLocator] = None, **s3_client_kwargs: Optional[Dict[str, Any]], ) -> RoundCompletionInfo: - round_completion_file_url = get_round_completion_file_s3_url( + all_uris = [] + if destination_partition_locator: + round_completion_file_url_with_destination = get_round_completion_file_s3_url( + bucket, + source_partition_locator, + destination_partition_locator, + ) + all_uris.append(round_completion_file_url_with_destination) + + # Note: we read from RCF at two different URI for backward + # compatibility reasons. 
+ round_completion_file_url_prev = get_round_completion_file_s3_url( bucket, source_partition_locator, ) - logger.info(f"reading round completion file from: {round_completion_file_url}") + + all_uris.append(round_completion_file_url_prev) + round_completion_info = None - result = s3_utils.download(round_completion_file_url, False, **s3_client_kwargs) - if result: - json_str = result["Body"].read().decode("utf-8") - round_completion_info = RoundCompletionInfo(json.loads(json_str)) - logger.info(f"read round completion info: {round_completion_info}") + + for rcf_uri in all_uris: + logger.info(f"Reading round completion file from: {rcf_uri}") + result = s3_utils.download(rcf_uri, False, **s3_client_kwargs) + if result: + json_str = result["Body"].read().decode("utf-8") + round_completion_info = RoundCompletionInfo(json.loads(json_str)) + logger.info(f"Read round completion info: {round_completion_info}") + break + else: + logger.warn(f"Round completion file not present at {rcf_uri}") + return round_completion_info @@ -44,8 +72,9 @@ def read_round_completion_file( def write_round_completion_file( bucket: Optional[str], source_partition_locator: Optional[PartitionLocator], + destination_partition_locator: Optional[PartitionLocator], round_completion_info: RoundCompletionInfo, - completion_file_s3_url: str = None, + completion_file_s3_url: Optional[str] = None, **s3_client_kwargs: Optional[Dict[str, Any]], ) -> str: if bucket is None and completion_file_s3_url is None: @@ -56,6 +85,7 @@ def write_round_completion_file( completion_file_s3_url = get_round_completion_file_s3_url( bucket, source_partition_locator, + destination_partition_locator, ) logger.info(f"writing round completion file to: {completion_file_s3_url}") s3_utils.upload( diff --git a/deltacat/compute/compactor_v2/compaction_session.py b/deltacat/compute/compactor_v2/compaction_session.py index 0875bfa2..99d109d8 100644 --- a/deltacat/compute/compactor_v2/compaction_session.py +++ b/deltacat/compute/compactor_v2/compaction_session.py @@ -24,7 +24,7 @@ ) from deltacat.compute.compactor_v2.model.merge_result import MergeResult from deltacat.compute.compactor_v2.model.hash_bucket_result import HashBucketResult -from deltacat.compute.compactor_v2.model.compaction_session import ( +from deltacat.compute.compactor_v2.model.evaluate_compaction_result import ( ExecutionCompactionResult, ) from deltacat.compute.compactor.model.materialize_result import MaterializeResult @@ -110,7 +110,6 @@ def compact_partition(params: CompactPartitionParams, **kwargs) -> Optional[str] f"Partition-{params.source_partition_locator} -> " f"{compaction_session_type} Compaction session data processing completed" ) - round_completion_file_s3_url: Optional[str] = None if execute_compaction_result.new_compacted_partition: previous_partition: Optional[Partition] = None if execute_compaction_result.is_inplace_compacted: @@ -132,19 +131,13 @@ def compact_partition(params: CompactPartitionParams, **kwargs) -> Optional[str] **params.deltacat_storage_kwargs, ) logger.info(f"Committed compacted partition: {committed_partition}") - round_completion_file_s3_url = rcf.write_round_completion_file( - params.compaction_artifact_s3_bucket, - execute_compaction_result.new_round_completion_file_partition_locator, - execute_compaction_result.new_round_completion_info, - **params.s3_client_kwargs, - ) else: logger.warning("No new partition was committed during compaction.") logger.info( f"Completed compaction session for: {params.source_partition_locator}" ) - return 
round_completion_file_s3_url + return execute_compaction_result.round_completion_file_s3_url def _execute_compaction( @@ -189,6 +182,7 @@ def _execute_compaction( round_completion_info = rcf.read_round_completion_file( params.compaction_artifact_s3_bucket, params.source_partition_locator, + params.destination_partition_locator, **params.s3_client_kwargs, ) if not round_completion_info: @@ -685,9 +679,18 @@ def merge_input_provider(index, item): f"and rcf source partition_id of {rcf_source_partition_locator.partition_id}." ) rcf_source_partition_locator = compacted_partition.locator + + round_completion_file_s3_url = rcf.write_round_completion_file( + params.compaction_artifact_s3_bucket, + rcf_source_partition_locator, + compacted_partition.locator, + new_round_completion_info, + **params.s3_client_kwargs, + ) + return ExecutionCompactionResult( compacted_partition, new_round_completion_info, - rcf_source_partition_locator, + round_completion_file_s3_url, is_inplace_compacted, ) diff --git a/deltacat/compute/compactor_v2/model/compaction_session.py b/deltacat/compute/compactor_v2/model/evaluate_compaction_result.py similarity index 83% rename from deltacat/compute/compactor_v2/model/compaction_session.py rename to deltacat/compute/compactor_v2/model/evaluate_compaction_result.py index 0607bb66..c95a2afa 100644 --- a/deltacat/compute/compactor_v2/model/compaction_session.py +++ b/deltacat/compute/compactor_v2/model/evaluate_compaction_result.py @@ -2,7 +2,6 @@ from deltacat.storage import ( Partition, - PartitionLocator, ) from deltacat.compute.compactor import ( RoundCompletionInfo, @@ -14,7 +13,7 @@ class ExecutionCompactionResult: new_compacted_partition: Optional[Partition] new_round_completion_info: Optional[RoundCompletionInfo] - new_round_completion_file_partition_locator: Optional[PartitionLocator] + round_completion_file_s3_url: Optional[str] is_inplace_compacted: bool def __iter__(self): diff --git a/deltacat/io/aws/redshift/redshift_datasource.py b/deltacat/io/aws/redshift/redshift_datasource.py deleted file mode 100644 index 903c4df3..00000000 --- a/deltacat/io/aws/redshift/redshift_datasource.py +++ /dev/null @@ -1,578 +0,0 @@ -import json -import logging -from collections import OrderedDict, defaultdict -from enum import Enum -from errno import ENOENT -from os import strerror -from typing import Any, Callable, Dict, List, Optional, Tuple, Union - -import pyarrow as pa -import ray -import s3fs -from pyarrow import parquet as pq -from pyarrow.fs import FileSystem, FileType, S3FileSystem -from ray.data.block import Block, BlockMetadata -from ray.data.datasource import ( - BlockWritePathProvider, - CSVDatasource, - DefaultBlockWritePathProvider, - DefaultFileMetadataProvider, - ParquetBaseDatasource, - ParquetMetadataProvider, - PathPartitionParser, -) -from ray.data.datasource.datasource import ArrowRow, Datasource, ReadTask, WriteResult -from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem -from ray.data.datasource.file_meta_provider import FastFileMetadataProvider -from ray.types import ObjectRef - -from deltacat import ContentEncoding, ContentType, logs -from deltacat.aws.redshift.model.manifest import ( - Manifest, - ManifestEntry, - ManifestEntryList, - ManifestMeta, -) -from deltacat.aws.s3u import ( - S3Url, - filter_objects_by_prefix, - objects_to_paths, - parse_s3_url, -) -from deltacat.types.media import DELIMITED_TEXT_CONTENT_TYPES -from deltacat.utils.common import ReadKwargsProvider - -logger = 
logs.configure_deltacat_logger(logging.getLogger(__name__)) - - -class CapturingBlockWritePathProvider(BlockWritePathProvider): - """Delegating block write path provider that saves an ordered dictionary of - input keyword arguments for every block write path returned.""" - - def __init__(self, block_write_path_provider: BlockWritePathProvider): - self.block_write_path_provider = block_write_path_provider - self.write_path_kwargs: Dict[str, Dict[str, Any]] = OrderedDict() - - def _get_write_path_for_block(self, base_path: str, *args, **kwargs) -> str: - write_path = self.block_write_path_provider( - base_path, - *args, - **kwargs, - ) - kwargs["base_path"] = base_path - self.write_path_kwargs[write_path] = kwargs - return write_path - - -class CachedFileMetadataProvider( - FastFileMetadataProvider, - ParquetMetadataProvider, -): - def __init__(self, meta_cache: Dict[str, BlockMetadata]): - self._meta_cache = meta_cache - - def get_meta_cache(self) -> Dict[str, BlockMetadata]: - return self._meta_cache - - def _get_block_metadata( - self, - paths: List[str], - schema: Optional[Union[type, pa.Schema]], - **kwargs, - ) -> BlockMetadata: - agg_block_metadata = BlockMetadata( - num_rows=0, - size_bytes=0, - schema=schema, - input_files=[], - exec_stats=None, - ) - for path in paths: - block_metadata = self._meta_cache.get(path) - if block_metadata is None: - raise ValueError(f"Block metadata not found for path: {path}") - if block_metadata.num_rows is None: - agg_block_metadata.num_rows = None - elif agg_block_metadata.num_rows is not None: - agg_block_metadata.num_rows += block_metadata.num_rows - if block_metadata.size_bytes is None: - agg_block_metadata.size_bytes = None - elif agg_block_metadata.size_bytes is not None: - agg_block_metadata.size_bytes += block_metadata.size_bytes - agg_block_metadata.input_files.append(path) - return agg_block_metadata - - -class HivePartitionParser(PathPartitionParser): - def __init__( - self, - base_dir: Optional[str] = None, - filter_fn: Optional[Callable[[Dict[str, str]], bool]] = None, - ): - super(HivePartitionParser, self).__init__( - base_dir=base_dir, - filter_fn=filter_fn, - ) - - -class RedshiftUnloadTextArgs: - def __init__( - self, - csv: bool = False, - header: bool = False, - delimiter: Optional[str] = None, - bzip2: bool = False, - gzip: bool = False, - zstd: bool = False, - add_quotes: Optional[bool] = None, - null_as: str = "", - escape: bool = False, - fixed_width: bool = False, - ): - self.header = header - self.delimiter = delimiter if delimiter else "," if csv else "|" - self.bzip2 = bzip2 - self.gzip = gzip - self.zstd = zstd - self.add_quotes = add_quotes if add_quotes else True if csv else False - self.null_as = null_as - self.escape = escape - self.fixed_width = fixed_width - - def _get_arrow_compression_codec_name(self) -> str: - arrow_compression_codec_name = None - codecs_enabled = { - "bz2": self.bzip2, - "gzip": self.gzip, - "zstd": self.zstd, - } - for encoding, flag in codecs_enabled.items(): - if arrow_compression_codec_name and flag: - raise ValueError( - f"Multiple Redshift UNLOAD compression types specified " - f"({codecs_enabled}). Please ensure that only one " - f"compression type is set and try again." 
- ) - if flag: - arrow_compression_codec_name = encoding - return arrow_compression_codec_name - - def to_arrow_reader_kwargs( - self, include_columns: Optional[List[str]], schema: Optional[pa.Schema] - ) -> Dict[str, Any]: - from pyarrow import csv - - if self.fixed_width: - raise NotImplementedError( - "Redshift text files unloaded with FIXEDWIDTH are not " - "currently supported." - ) - open_stream_args = {} - arrow_compression_codec_name = self._get_arrow_compression_codec_name() - if arrow_compression_codec_name: - open_stream_args["compression"] = arrow_compression_codec_name - column_names = None - if schema: - column_names = schema.names - autogen_column_names = False if self.header or column_names else True - read_options = csv.ReadOptions( - use_threads=False, - column_names=column_names, - autogenerate_column_names=autogen_column_names, - ) - parse_options = csv.ParseOptions( - delimiter=self.delimiter, - quote_char='"' if self.add_quotes else False, - escape_char="\\" if self.escape else False, - double_quote=False if self.escape else True, - ) - convert_options = csv.ConvertOptions( - column_types=schema, - null_values=[self.null_as] if self.null_as is not None else [], - true_values=["t"], - false_values=["f"], - strings_can_be_null=True if self.null_as is not None else False, - quoted_strings_can_be_null=True if self.null_as else False, - include_columns=include_columns, - ) - return { - "open_stream_args": open_stream_args, - "read_options": read_options, - "parse_options": parse_options, - "convert_options": convert_options, - } - - -class S3PathType(str, Enum): - MANIFEST = "manifest" - PREFIX = "prefix" - FILES_AND_FOLDERS = "files_and_folders" - - -class RedshiftWriteResult: - def __init__(self): - self.metadata = None - self.path = None - self.dataset_uuid = None - self.block_write_path_provider = None - self.content_type = None - self.content_encoding = None - self.filesystem = None - - -def _normalize_s3_paths_for_filesystem( - paths: Union[str, List[str]], - filesystem: Union[S3FileSystem, s3fs.S3FileSystem], -) -> Tuple[List[str], List[S3Url]]: - if isinstance(paths, str): - paths = [paths] - urls = [parse_s3_url(url) for url in paths] - if isinstance(filesystem, FileSystem): - # pyarrow.fs.FileSystem paths should not start with "s3://" - # pyarrow.fs.FileSystem paths should not end with "/" - paths = [f"{u.bucket}/{u.key}".rstrip("/") for u in urls] - else: - # s3fs.S3FileSystem can start with "s3://" (presumably others can too) - paths = [u.url.rstrip("/") for u in urls] - return paths, urls - - -def _read_manifest_entry_paths( - entries: ManifestEntryList, - manifest_content_type: Optional[str], - content_type_provider: Callable[[str], ContentType], -) -> Tuple[Dict[ContentType, List[str]], CachedFileMetadataProvider]: - # support manifests with heterogenous content types - content_type_to_paths = defaultdict(list) - meta_cache: Dict[str, BlockMetadata] = {} - for e in entries: - url = e.url if e.url else e.uri - # get manifest entry content type or fall back to manifest content type - content_type = e.meta.content_type or manifest_content_type - if content_type: - content_type_to_paths[ContentType(content_type)] = url - else: - # fall back to content type inference by file extension - content_type_to_paths[content_type_provider(url)].append(url) - meta_cache[url] = BlockMetadata( - num_rows=e.meta.record_count, - size_bytes=e.meta.content_length, - schema=None, - input_files=[], - exec_stats=None, - ) - return content_type_to_paths, 
CachedFileMetadataProvider(meta_cache) - - -def _expand_manifest_paths( - paths: List[str], - filesystem: Optional[Union[S3FileSystem, s3fs.S3FileSystem]], - content_type_provider: Callable[[str], ContentType], -) -> Tuple[Dict[ContentType, List[str]], CachedFileMetadataProvider]: - assert len(paths) == 1, f"Expected 1 manifest path, found {len(paths)}." - path = paths[0] - with filesystem.open_input_file(path) as f: - manifest = Manifest(json.loads(f.read())) - content_type_to_paths = {} - meta_provider = CachedFileMetadataProvider({}) - if not manifest.entries: - logger.warning(f"No entries to read in Redshift Manifest: {path}") - else: - content_type_to_paths, meta_provider = _read_manifest_entry_paths( - manifest.entries, - manifest.meta.content_type if manifest.meta else None, - content_type_provider, - ) - # TODO(pdames): infer the schema from a verbose manifest if available? - # if not schema and ContentType.PARQUET not in content_type_to_paths: - # schema = _infer_schema_from_manifest(manifest) - return content_type_to_paths, meta_provider - - -def _infer_content_types_from_paths( - paths: List[str], - content_type_provider: Callable[[str], ContentType], -) -> Dict[ContentType, List[str]]: - content_type_to_paths = defaultdict(list) - for path in paths: - if not path.endswith("/"): - content_type_to_paths[content_type_provider(path)].append(path) - return content_type_to_paths - - -def _expand_prefix_paths( - urls: List[S3Url], - content_type_provider: Callable[[str], ContentType], - **s3_client_kwargs, -) -> Tuple[Dict[ContentType, List[str]], CachedFileMetadataProvider]: - assert len(urls) == 1, f"Expected 1 S3 prefix, found {len(urls)}." - objects = list( - filter_objects_by_prefix(urls[0].bucket, urls[0].key, **s3_client_kwargs) - ) - paths = list( - objects_to_paths( - urls[0].bucket, - objects, - ) - ) - meta_cache: Dict[str, BlockMetadata] = { - path: BlockMetadata( - num_rows=None, - size_bytes=objects[i]["ContentLength"], - schema=None, - input_files=[], - exec_stats=None, - ) - for i, path in enumerate(paths) - } - content_type_to_paths = _infer_content_types_from_paths( - paths, - content_type_provider, - ) - return content_type_to_paths, CachedFileMetadataProvider(meta_cache) - - -def _expand_paths_by_content_type( - base_paths: Union[str, List[str]], - base_urls: List[S3Url], - content_type_provider: Callable[[str], ContentType], - path_type: S3PathType, - user_fs: Optional[Union[S3FileSystem, s3fs.S3FileSystem]], - resolved_fs: S3FileSystem, - **s3_client_kwargs, -) -> Tuple[Dict[ContentType, List[str]], CachedFileMetadataProvider]: - if path_type == S3PathType.MANIFEST: - content_type_to_paths, meta_provider = _expand_manifest_paths( - base_paths, - resolved_fs, - content_type_provider, - ) - elif path_type == S3PathType.PREFIX: - content_type_to_paths, meta_provider = _expand_prefix_paths( - base_urls, - content_type_provider, - **s3_client_kwargs, - ) - elif path_type == S3PathType.FILES_AND_FOLDERS: - # TODO(pdames): Only allow files and call get_object(file_path)? 
- base_paths, file_infos = DefaultFileMetadataProvider().expand_paths( - base_paths, resolved_fs - ) - file_sizes = [file_info.size for file_info in file_infos] - meta_provider = CachedFileMetadataProvider( - { - path: BlockMetadata( - num_rows=None, - size_bytes=file_sizes[i], - schema=None, - input_files=[], - exec_stats=None, - ) - for i, path in enumerate(base_paths) - } - ) - content_type_to_paths = _infer_content_types_from_paths( - base_paths, - content_type_provider, - ) - else: - raise NotImplementedError(f"Unsupported S3 path type: {path_type}") - # TODO(pdames): normalize S3 file paths before adding them to either - # content_type_to_paths or meta_provider - # normalize S3 file paths for each content type based on the filesystem - for content_type, paths in content_type_to_paths.items(): - paths, urls = _normalize_s3_paths_for_filesystem( - paths, - user_fs, - ) - content_type_to_paths[content_type] = paths - # normalize block metadata provider S3 file paths based on the filesystem - meta_provider = CachedFileMetadataProvider( - { - _normalize_s3_paths_for_filesystem(path, user_fs)[0][0]: metadata - for path, metadata in meta_provider.get_meta_cache().items() - } - ) - return content_type_to_paths, meta_provider - - -class RedshiftDatasource(Datasource[Union[ArrowRow, Any]]): - def prepare_read( - self, - parallelism: int, - paths: Union[str, List[str]], - content_type_provider: Callable[[str], ContentType], - path_type: S3PathType = S3PathType.MANIFEST, - filesystem: Optional[Union[S3FileSystem, s3fs.S3FileSystem]] = None, - columns: Optional[List[str]] = None, - schema: Optional[pa.Schema] = None, - unload_args: RedshiftUnloadTextArgs = RedshiftUnloadTextArgs(), - partitioning: HivePartitionParser = None, - open_stream_args: Optional[Dict[str, Any]] = None, - read_kwargs_provider: Optional[ReadKwargsProvider] = None, - **s3_client_kwargs, - ) -> List[ReadTask]: - # default to pyarrow.fs.S3FileSystem if no filesystem given - if filesystem is None: - filesystem = S3FileSystem() - # normalize s3 paths to work with the filesystem provided - paths, urls = _normalize_s3_paths_for_filesystem(paths, filesystem) - paths, resolved_fs = _resolve_paths_and_filesystem( - paths, - filesystem, - ) - # find all files in manifests, prefixes, and folders - content_type_to_paths, meta_provider = _expand_paths_by_content_type( - paths, - urls, - content_type_provider, - path_type, - filesystem, - resolved_fs, - **s3_client_kwargs, - ) - num_content_types = len(content_type_to_paths) - if num_content_types > 1 and not schema: - # infer schema from a single parquet file - # TODO (pdames): read verbose manifest schema if available, and infer - # schema from a sample parquet dataset if not - path = content_type_to_paths[ContentType.PARQUET][0] - with resolved_fs.open_input_file(path, **open_stream_args) as f: - schema = pq.read_schema(f) - content_type_to_reader = { - ContentType.PARQUET: ParquetBaseDatasource(), - ContentType.CSV: CSVDatasource(), - } - all_read_tasks = [] - for content_type, paths in content_type_to_paths.items(): - reader = content_type_to_reader.get(content_type) - assert reader, f"No datasource found for: {content_type}" - prepare_read_kwargs = { - "parallelism": parallelism, - "paths": paths, - "filesystem": resolved_fs, - "schema": schema, - "meta_provider": meta_provider, - "partitioning": partitioning, - } - if content_type == ContentType.PARQUET: - if columns: - prepare_read_kwargs["columns"] = columns - elif content_type in DELIMITED_TEXT_CONTENT_TYPES: - 
prepare_read_kwargs.update( - unload_args.to_arrow_reader_kwargs(columns, schema) - ) - else: - raise NotImplementedError(f"Unsupported content type: {content_type}") - # merge any provided reader kwargs for this content type with those - # inferred from Redshift UNLOAD args - if read_kwargs_provider: - prepare_read_kwargs = read_kwargs_provider( - content_type, - prepare_read_kwargs, - ) - # explicitly specified `open_stream_args` override those inferred - # from Redshift UNLOAD args - if open_stream_args: - prepare_read_kwargs["open_stream_args"] = open_stream_args - read_tasks = reader.prepare_read(**prepare_read_kwargs) - all_read_tasks.extend(read_tasks) - return all_read_tasks - - def do_write( - self, - blocks: List[ObjectRef[Block]], - metadata: List[BlockMetadata], - path: str, - dataset_uuid: str, - filesystem: Optional[FileSystem] = None, - try_create_dir: bool = True, - open_stream_args: Optional[Dict[str, Any]] = None, - block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(), - write_args_fn: Callable[[], Dict[str, Any]] = lambda: {}, - _block_udf: Optional[Callable[[Block], Block]] = None, - **write_args, - ) -> List[ObjectRef[WriteResult]]: - if filesystem is None: - filesystem = S3FileSystem() - paths, _ = _normalize_s3_paths_for_filesystem(path, filesystem) - paths, filesystem = _resolve_paths_and_filesystem(paths, filesystem) - assert len(paths) == 1, f"Expected 1 write path, found {len(paths)}." - path = paths[0] - block_path_provider = CapturingBlockWritePathProvider(block_path_provider) - writer = ParquetBaseDatasource() - write_results = writer.do_write( - blocks, - metadata, - path, - dataset_uuid, - filesystem, - try_create_dir, - open_stream_args, - block_path_provider, - write_args_fn, - _block_udf, - **write_args, - ) - # append a summary of this write operation in the last write result - rwr = RedshiftWriteResult() - rwr.metadata = metadata - rwr.path = path - rwr.dataset_uuid = dataset_uuid - rwr.block_write_path_provider = block_path_provider - rwr.content_type = ContentType.PARQUET.value - rwr.content_encoding = ContentEncoding.IDENTITY.value - rwr.filesystem = filesystem - rwr_obj_ref = ray.put(rwr) - write_results.append(rwr_obj_ref) - return write_results - - def on_write_complete(self, write_results: List[WriteResult], **kwargs) -> None: - # TODO (pdames): time latency of this operation - overall redshift write times - # are 2-3x pure read_parquet_fast() times - # restore the write operation summary from the last write result - result: RedshiftWriteResult = write_results[len(write_results) - 1] - write_path_args = result.block_write_path_provider.write_path_kwargs - blocks_written = len(write_path_args) - expected_blocks_written = len(result.metadata) - # TODO(pdames): Corner cases where mismatch is expected? Emply blocks? - # Blocks filtered/split/merged to more/less write paths? - assert blocks_written == expected_blocks_written, ( - f"Dataset write result validation failed. Found " - f"{blocks_written}/{expected_blocks_written} Dataset blocks " - f"written. Refusing to commit Redshift Manifest." 
- ) - manifest_entries = ManifestEntryList() - for block_idx, path in enumerate(write_path_args.keys()): - file_info = result.filesystem.get_file_info(path) - if file_info.type == FileType.File: - content_length = file_info.size - else: - raise FileNotFoundError(ENOENT, strerror(ENOENT), path) - num_rows = result.metadata[block_idx].num_rows - source_content_length = result.metadata[block_idx].size_bytes - manifest_entry_meta = ManifestMeta.of( - int(num_rows) if num_rows is not None else None, - int(content_length) if content_length is not None else None, - result.content_type, - result.content_encoding, - int(source_content_length) if source_content_length else None, - ) - parsed_url = parse_s3_url(path) - manifest_entry = ManifestEntry.of( - parsed_url.url, - manifest_entry_meta, - ) - manifest_entries.append(manifest_entry) - manifest = Manifest.of(manifest_entries) - manifest_path = f"{result.path}/manifest" - logger.debug(f"Write succeeded for Dataset ID: {result.dataset_uuid}") - with result.filesystem.open_output_stream( - manifest_path, - # Also See: - # docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonRequestHeaders.html - # Arrow s3fs.cc: tinyurl.com/2axa6m9m - metadata={"Content-Type": ContentType.JSON.value}, - ) as f: - f.write(json.dumps(manifest).encode("utf-8")) - logger.debug(f"Manifest committed to: {manifest_path}") diff --git a/deltacat/io/dataset.py b/deltacat/io/dataset.py index d8385ee0..791559b7 100644 --- a/deltacat/io/dataset.py +++ b/deltacat/io/dataset.py @@ -6,9 +6,6 @@ import pyarrow as pa import s3fs from ray.data import Dataset -from ray.data.datasource import BlockWritePathProvider, DefaultBlockWritePathProvider - -from deltacat.io.aws.redshift.redshift_datasource import RedshiftDatasource T = TypeVar("T") @@ -27,7 +24,6 @@ def write_redshift( filesystem: Optional[Union[pa.fs.FileSystem, s3fs.S3FileSystem]] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, - block_path_provider: BlockWritePathProvider = DefaultBlockWritePathProvider(), arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = lambda: {}, **arrow_parquet_args, ) -> None: @@ -59,9 +55,8 @@ def write_redshift( if True. Does nothing if all directories already exist. arrow_open_stream_args: kwargs passed to pyarrow.fs.FileSystem.open_output_stream - block_path_provider: BlockWritePathProvider implementation - to write each dataset block to a custom output path. Uses - DefaultBlockWritePathProvider if None. + filename_provider: FilenameProvider implementation + to write each dataset block to a custom output path. arrow_parquet_args_fn: Callable that returns a dictionary of write arguments to use when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. This should be used @@ -72,14 +67,7 @@ def write_redshift( pyarrow.parquet.write_table(), which is used to write out each block to a file. """ - self.write_datasource( - RedshiftDatasource(), - path=path, - dataset_uuid=self._uuid, - filesystem=filesystem, - try_create_dir=try_create_dir, - open_stream_args=arrow_open_stream_args, - block_path_provider=block_path_provider, - write_args_fn=arrow_parquet_args_fn, - **arrow_parquet_args, + raise NotImplementedError( + "Writing to Redshift is not yet supported. " + "Please use DeltacatDataset.write_parquet() instead." 
) diff --git a/deltacat/tests/aws/test_s3u.py b/deltacat/tests/aws/test_s3u.py index f04374cd..6495c07d 100644 --- a/deltacat/tests/aws/test_s3u.py +++ b/deltacat/tests/aws/test_s3u.py @@ -20,6 +20,7 @@ ConnectTimeoutError, HTTPClientError, ) +from ray.data.datasource import FilenameProvider from deltacat.exceptions import NonRetryableError from moto import mock_s3 from tenacity import RetryError @@ -34,6 +35,7 @@ def test_uuid_block_write_provider_sanity(self): result = provider("base_path") + self.assertTrue(isinstance(provider, FilenameProvider)) self.assertRegex(result, r"^base_path/[\w-]{36}$") diff --git a/deltacat/tests/compute/compactor/utils/test_round_completion_file.py b/deltacat/tests/compute/compactor/utils/test_round_completion_file.py new file mode 100644 index 00000000..44a1d5ce --- /dev/null +++ b/deltacat/tests/compute/compactor/utils/test_round_completion_file.py @@ -0,0 +1,209 @@ +import pytest +import os +from moto import mock_s3 +import boto3 +from boto3.resources.base import ServiceResource +from deltacat.compute.compactor.utils.round_completion_file import ( + read_round_completion_file, + write_round_completion_file, +) +from deltacat.tests.compute.test_util_common import get_test_partition_locator +from deltacat.compute.compactor import RoundCompletionInfo + +RCF_BUCKET_NAME = "rcf-bucket" + + +@pytest.fixture(autouse=True, scope="module") +def mock_aws_credential(): + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_ID"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + yield + + +@pytest.fixture(autouse=True, scope="module") +def s3_resource(mock_aws_credential): + with mock_s3(): + yield boto3.resource("s3") + + +@pytest.fixture(autouse=True, scope="function") +def setup_compaction_artifacts_s3_bucket(s3_resource: ServiceResource): + s3_resource.create_bucket( + ACL="authenticated-read", + Bucket=RCF_BUCKET_NAME, + ) + yield + s3_resource.Bucket(RCF_BUCKET_NAME).objects.all().delete() + + +class TestReadWriteRoundCompletionFile: + def test_read_when_rcf_written_without_destination(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. + """ + + source_locator = get_test_partition_locator("source") + destination_locator = get_test_partition_locator("destination") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + rcf_url = write_round_completion_file( + RCF_BUCKET_NAME, source_locator, None, expected_rcf + ) + + rcf = read_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator + ) + + assert ( + rcf_url == "s3://rcf-bucket/f9829af39770d904dbb811bd8f4e886dd307f507.json" + ) + assert rcf == expected_rcf + + def test_read_when_rcf_written_with_destination(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. 
+ """ + + source_locator = get_test_partition_locator("source") + destination_locator = get_test_partition_locator("destination") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + rcf_url = write_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator, expected_rcf + ) + + rcf = read_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator + ) + + assert ( + rcf_url + == "s3://rcf-bucket/f9829af39770d904dbb811bd8f4e886dd307f507/e9939deadc091b3289a2eb0ca56b1ba86b9892f4.json" + ) + assert rcf == expected_rcf + + def test_read_without_destination_when_rcf_written_with_destination(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. + """ + + source_locator = get_test_partition_locator("source") + destination_locator = get_test_partition_locator("destination") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + write_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator, expected_rcf + ) + + rcf = read_round_completion_file(RCF_BUCKET_NAME, source_locator, None) + + assert rcf is None + + def test_read_without_destination_when_rcf_written_without_destination(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. + """ + + source_locator = get_test_partition_locator("source") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + write_round_completion_file(RCF_BUCKET_NAME, source_locator, None, expected_rcf) + + rcf = read_round_completion_file(RCF_BUCKET_NAME, source_locator, None) + + assert rcf == expected_rcf + + def test_read_when_rcf_written_both_with_and_without_destination(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. + """ + + source_locator = get_test_partition_locator("source") + destination_locator = get_test_partition_locator("destination") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + expected_rcf_2 = RoundCompletionInfo.of( + high_watermark=1223, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=1233, + ) + + write_round_completion_file(RCF_BUCKET_NAME, source_locator, None, expected_rcf) + + write_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator, expected_rcf_2 + ) + + rcf = read_round_completion_file( + RCF_BUCKET_NAME, source_locator, destination_locator + ) + + assert rcf == expected_rcf_2 + + def test_write_when_custom_url_is_passed(self): + """ + This test case tests the backward compatibility by successfully + reading the previously written rcf. 
+ """ + + source_locator = get_test_partition_locator("source") + + expected_rcf = RoundCompletionInfo.of( + high_watermark=122, + compacted_delta_locator={}, + compacted_pyarrow_write_result={}, + sort_keys_bit_width=12, + ) + + completion_file_s3_url = f"s3://{RCF_BUCKET_NAME}/test.json" + rcf_url = write_round_completion_file( + RCF_BUCKET_NAME, + source_locator, + None, + expected_rcf, + completion_file_s3_url=completion_file_s3_url, + ) + + rcf = read_round_completion_file(RCF_BUCKET_NAME, source_locator, None) + + assert rcf_url == completion_file_s3_url + assert rcf is None diff --git a/deltacat/tests/compute/compactor_v2/data/backfill_source_date_pk.csv b/deltacat/tests/compute/compactor_v2/data/backfill_source_date_pk.csv new file mode 100644 index 00000000..030823ca --- /dev/null +++ b/deltacat/tests/compute/compactor_v2/data/backfill_source_date_pk.csv @@ -0,0 +1,5 @@ +pk,value +2022-10-21,1 +2022-10-20,2 +2022-11-24,3 +2023-10-23,4 diff --git a/deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv b/deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv new file mode 100644 index 00000000..89670832 --- /dev/null +++ b/deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv @@ -0,0 +1,3 @@ +pk,value +2022-10-21,1 +2022-11-25,5 diff --git a/deltacat/tests/compute/compactor_v2/test_compaction_session.py b/deltacat/tests/compute/compactor_v2/test_compaction_session.py index 988a27d8..62a741a7 100644 --- a/deltacat/tests/compute/compactor_v2/test_compaction_session.py +++ b/deltacat/tests/compute/compactor_v2/test_compaction_session.py @@ -1,8 +1,12 @@ -import unittest -import sqlite3 +from typing import Dict, Any import ray import os -from unittest.mock import patch +import pytest +import boto3 +from deltacat.compute.compactor.model.compaction_session_audit_info import ( + CompactionSessionAuditInfo, +) +from boto3.resources.base import ServiceResource import deltacat.tests.local_deltacat_storage as ds from deltacat.types.media import ContentType from deltacat.compute.compactor_v2.compaction_session import ( @@ -11,80 +15,241 @@ from deltacat.compute.compactor.model.compact_partition_params import ( CompactPartitionParams, ) -from deltacat.utils.common import current_time_ms -from deltacat.tests.test_utils.pyarrow import stage_partition_from_file_paths +from deltacat.tests.test_utils.utils import read_s3_contents +from deltacat.tests.compute.test_util_constant import ( + TEST_S3_RCF_BUCKET_NAME, +) +from deltacat.tests.compute.test_util_common import get_rcf +from deltacat.tests.test_utils.pyarrow import ( + stage_partition_from_file_paths, + commit_delta_to_staged_partition, + commit_delta_to_partition, +) +from moto import mock_s3 + +DATABASE_FILE_PATH_KEY, DATABASE_FILE_PATH_VALUE = ( + "db_file_path", + "deltacat/tests/local_deltacat_storage/db_test.sqlite", +) + + +@pytest.fixture(autouse=True, scope="module") +def setup_ray_cluster(): + ray.init(local_mode=True, ignore_reinit_error=True) + yield + ray.shutdown() + + +@pytest.fixture(autouse=True, scope="module") +def mock_aws_credential(): + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_ID"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + yield + + +@pytest.fixture(scope="module") +def s3_resource(mock_aws_credential): + with mock_s3(): + yield boto3.resource("s3") -class TestCompactionSession(unittest.TestCase): 
+@pytest.fixture(autouse=True, scope="module") +def setup_compaction_artifacts_s3_bucket(s3_resource: ServiceResource): + s3_resource.create_bucket( + ACL="authenticated-read", + Bucket=TEST_S3_RCF_BUCKET_NAME, + ) + yield + + +@pytest.fixture(scope="function") +def local_deltacat_storage_kwargs(request: pytest.FixtureRequest): + kwargs_for_local_deltacat_storage: Dict[str, Any] = { + DATABASE_FILE_PATH_KEY: DATABASE_FILE_PATH_VALUE, + } + yield kwargs_for_local_deltacat_storage + if os.path.exists(DATABASE_FILE_PATH_VALUE): + os.remove(DATABASE_FILE_PATH_VALUE) + + +class TestCompactionSession: """ This class adds specific tests that aren't part of the parametrized test suite. """ - DB_FILE_PATH = f"{current_time_ms()}.db" NAMESPACE = "compact_partition_v2_namespace" + BACKFILL_FILE_PATH = ( + "deltacat/tests/compute/compactor_v2/data/backfill_source_date_pk.csv" + ) + INCREMENTAL_FILE_PATH = ( + "deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv" + ) - @classmethod - def setUpClass(cls): - ray.init(local_mode=True, ignore_reinit_error=True) + def test_compact_partition_when_no_input_deltas_to_compact( + self, local_deltacat_storage_kwargs + ): + # setup + staged_source = stage_partition_from_file_paths( + self.NAMESPACE, ["test"], **local_deltacat_storage_kwargs + ) + source_partition = ds.commit_partition( + staged_source, **local_deltacat_storage_kwargs + ) - con = sqlite3.connect(cls.DB_FILE_PATH) - cur = con.cursor() - cls.kwargs = {ds.SQLITE_CON_ARG: con, ds.SQLITE_CUR_ARG: cur} - cls.deltacat_storage_kwargs = {ds.DB_FILE_PATH_ARG: cls.DB_FILE_PATH} + staged_dest = stage_partition_from_file_paths( + self.NAMESPACE, ["destination"], **local_deltacat_storage_kwargs + ) + dest_partition = ds.commit_partition( + staged_dest, **local_deltacat_storage_kwargs + ) - super().setUpClass() + # action + rcf_url = compact_partition( + CompactPartitionParams.of( + { + "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME, + "compacted_file_content_type": ContentType.PARQUET, + "dd_max_parallelism_ratio": 1.0, + "deltacat_storage": ds, + "deltacat_storage_kwargs": local_deltacat_storage_kwargs, + "destination_partition_locator": dest_partition.locator, + "drop_duplicates": True, + "hash_bucket_count": 2, + "last_stream_position_to_compact": source_partition.stream_position, + "list_deltas_kwargs": { + **local_deltacat_storage_kwargs, + **{"equivalent_table_types": []}, + }, + "primary_keys": ["pk"], + "rebase_source_partition_locator": None, + "rebase_source_partition_high_watermark": None, + "records_per_compacted_file": 4000, + "s3_client_kwargs": {}, + "source_partition_locator": source_partition.locator, + } + ) + ) - @classmethod - def doClassCleanups(cls) -> None: - os.remove(cls.DB_FILE_PATH) - ray.shutdown() - super().tearDownClass() + # verify that no RCF is written + assert rcf_url is None + + def test_compact_partition_when_rcf_was_written_by_past_commit( + self, s3_resource, local_deltacat_storage_kwargs + ): + """ + Backward compatibility test for when a RCF was written by a previous commit. 
+ """ - @patch("deltacat.compute.compactor_v2.compaction_session.rcf") - @patch("deltacat.compute.compactor_v2.compaction_session.s3_utils") - def test_compact_partition_when_no_input_deltas_to_compact(self, s3_utils, rcf_url): # setup - rcf_url.read_round_completion_file.return_value = None staged_source = stage_partition_from_file_paths( - self.NAMESPACE, ["test"], **self.deltacat_storage_kwargs + self.NAMESPACE, ["source"], **local_deltacat_storage_kwargs ) - source_partition = ds.commit_partition( - staged_source, **self.deltacat_storage_kwargs + + source_delta = commit_delta_to_staged_partition( + staged_source, [self.BACKFILL_FILE_PATH], **local_deltacat_storage_kwargs ) staged_dest = stage_partition_from_file_paths( - self.NAMESPACE, ["destination"], **self.deltacat_storage_kwargs + self.NAMESPACE, ["destination"], **local_deltacat_storage_kwargs ) dest_partition = ds.commit_partition( - staged_dest, **self.deltacat_storage_kwargs + staged_dest, **local_deltacat_storage_kwargs ) # action rcf_url = compact_partition( CompactPartitionParams.of( { - "compaction_artifact_s3_bucket": "test_bucket", + "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME, "compacted_file_content_type": ContentType.PARQUET, "dd_max_parallelism_ratio": 1.0, "deltacat_storage": ds, - "deltacat_storage_kwargs": self.deltacat_storage_kwargs, + "deltacat_storage_kwargs": local_deltacat_storage_kwargs, "destination_partition_locator": dest_partition.locator, "drop_duplicates": True, "hash_bucket_count": 1, - "last_stream_position_to_compact": source_partition.stream_position, + "last_stream_position_to_compact": source_delta.stream_position, "list_deltas_kwargs": { - **self.deltacat_storage_kwargs, + **local_deltacat_storage_kwargs, **{"equivalent_table_types": []}, }, "primary_keys": [], + "rebase_source_partition_locator": source_delta.partition_locator, + "rebase_source_partition_high_watermark": None, + "records_per_compacted_file": 4000, + "s3_client_kwargs": {}, + "source_partition_locator": source_delta.partition_locator, + } + ) + ) + + bucket, backfill_key1, backfill_key2 = rcf_url.strip("s3://").split("/") + assert bucket == TEST_S3_RCF_BUCKET_NAME + + # Now delete the RCF at new location and copy it to old location + # Copy the RCF from rcf_url to another location + s3_resource.Object(TEST_S3_RCF_BUCKET_NAME, f"{backfill_key1}.json").copy_from( + CopySource=f"{TEST_S3_RCF_BUCKET_NAME}/{backfill_key1}/{backfill_key2}" + ) + + s3_resource.Object( + TEST_S3_RCF_BUCKET_NAME, f"{backfill_key1}/{backfill_key2}" + ).delete() + + # Now run an incremental compaction and verify if the previous RCF was read properly. 
+
+        new_source_delta = commit_delta_to_partition(
+            source_delta.partition_locator,
+            [self.INCREMENTAL_FILE_PATH],
+            **local_deltacat_storage_kwargs,
+        )
+
+        new_rcf_url = compact_partition(
+            CompactPartitionParams.of(
+                {
+                    "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME,
+                    "compacted_file_content_type": ContentType.PARQUET,
+                    "dd_max_parallelism_ratio": 1.0,
+                    "deltacat_storage": ds,
+                    "deltacat_storage_kwargs": local_deltacat_storage_kwargs,
+                    "destination_partition_locator": dest_partition.locator,
+                    "drop_duplicates": True,
+                    "hash_bucket_count": 1,
+                    "last_stream_position_to_compact": new_source_delta.stream_position,
+                    "list_deltas_kwargs": {
+                        **local_deltacat_storage_kwargs,
+                        **{"equivalent_table_types": []},
+                    },
+                    "primary_keys": ["pk"],
                     "rebase_source_partition_locator": None,
                     "rebase_source_partition_high_watermark": None,
                     "records_per_compacted_file": 4000,
                     "s3_client_kwargs": {},
-                    "source_partition_locator": source_partition.locator,
+                    "source_partition_locator": new_source_delta.partition_locator,
                 }
             )
         )
-        # verify that no RCF is written
-        self.assertIsNone(rcf_url)
+        new_bucket, incremental_key1, incremental_key2 = new_rcf_url.strip(
+            "s3://"
+        ).split("/")
+
+        assert new_bucket == TEST_S3_RCF_BUCKET_NAME
+        assert backfill_key1 == incremental_key1
+        assert backfill_key2 != incremental_key2
+
+        rcf = get_rcf(s3_resource, new_rcf_url)
+
+        _, compaction_audit_key = rcf.compaction_audit_url.strip("s3://").split("/", 1)
+        compaction_audit = CompactionSessionAuditInfo(
+            **read_s3_contents(
+                s3_resource, TEST_S3_RCF_BUCKET_NAME, compaction_audit_key
+            )
+        )
+
+        # as it should be running incremental
+        assert compaction_audit.uniform_deltas_created == 1
+        assert compaction_audit.input_records == 6
diff --git a/deltacat/tests/compute/test_compact_partition_rebase.py b/deltacat/tests/compute/test_compact_partition_rebase.py
index df816257..aac5319f 100644
--- a/deltacat/tests/compute/test_compact_partition_rebase.py
+++ b/deltacat/tests/compute/test_compact_partition_rebase.py
@@ -254,7 +254,7 @@ def test_compact_partition_rebase_same_source_and_destination(
         }
     )
 
-    from deltacat.compute.compactor_v2.model.compaction_session import (
+    from deltacat.compute.compactor_v2.model.evaluate_compaction_result import (
         ExecutionCompactionResult,
     )
 
diff --git a/deltacat/tests/compute/test_util_common.py b/deltacat/tests/compute/test_util_common.py
index 21e1b7b3..18c770e9 100644
--- a/deltacat/tests/compute/test_util_common.py
+++ b/deltacat/tests/compute/test_util_common.py
@@ -24,6 +24,12 @@
     RoundCompletionInfo,
 )
 
+from deltacat.storage.model.partition import PartitionLocator
+from deltacat.storage.model.stream import StreamLocator
+from deltacat.storage.model.table_version import TableVersionLocator
+from deltacat.storage.model.table import TableLocator
+from deltacat.storage.model.namespace import NamespaceLocator
+
 
 class PartitionKeyType(str, Enum):
     INT = "int"
@@ -51,6 +57,18 @@ def key_type(self) -> PartitionKeyType:
     """
 
 
+def get_test_partition_locator(partition_id):
+    tv_locator = TableVersionLocator.of(
+        TableLocator.of(NamespaceLocator.of("default"), "test_table"), "1"
+    )
+    stream_locator = StreamLocator.of(tv_locator, "test_stream_id", "local")
+    partition_locator = PartitionLocator.of(
+        stream_locator, partition_id=partition_id, partition_values=[]
+    )
+
+    return partition_locator
+
+
 def _create_table(
     namespace: str,
     table_name: str,
@@ -140,7 +158,7 @@ def create_rebase_table(
 def get_rcf(s3_resource, rcf_file_s3_uri: str) -> RoundCompletionInfo:
     from deltacat.tests.test_utils.utils import read_s3_contents
 
-    _, rcf_object_key = rcf_file_s3_uri.rsplit("/", 1)
+    _, rcf_object_key = rcf_file_s3_uri.strip("s3://").split("/", 1)
     rcf_file_output: Dict[str, Any] = read_s3_contents(
         s3_resource, TEST_S3_RCF_BUCKET_NAME, rcf_object_key
     )
@@ -151,9 +169,6 @@ def get_compacted_delta_locator_from_rcf(
     s3_resource: ServiceResource, rcf_file_s3_uri: str
 ):
     from deltacat.storage import DeltaLocator
-    from deltacat.compute.compactor import (
-        RoundCompletionInfo,
-    )
 
     round_completion_info: RoundCompletionInfo = get_rcf(s3_resource, rcf_file_s3_uri)
 
diff --git a/deltacat/tests/test_utils/pyarrow.py b/deltacat/tests/test_utils/pyarrow.py
index ca08ba75..94910e41 100644
--- a/deltacat/tests/test_utils/pyarrow.py
+++ b/deltacat/tests/test_utils/pyarrow.py
@@ -66,7 +66,10 @@ def download_delta(delta_like: Union[Delta, DeltaLocator], *args, **kwargs) -> D
 
 
 def commit_delta_to_partition(
-    partition: Partition, file_paths: List[str], *args, **kwargs
+    partition: Union[Partition, PartitionLocator],
+    file_paths: List[str],
+    *args,
+    **kwargs,
 ) -> Delta:
     tables = []
 
diff --git a/deltacat/tests/utils/ray_utils/test_dataset.py b/deltacat/tests/utils/ray_utils/test_dataset.py
new file mode 100644
index 00000000..a83fd618
--- /dev/null
+++ b/deltacat/tests/utils/ray_utils/test_dataset.py
@@ -0,0 +1,66 @@
+from ray.data import from_items
+from typing import Any
+import pytest
+import fsspec
+from fsspec import AbstractFileSystem
+from ray.data.datasource import FilenameProvider
+from deltacat.types.media import ContentType
+import ray
+
+
+class TestDatasetToFile:
+
+    BASE_PATH = "/tmp"
+    SUB_PATH = "abcd"
+
+    @pytest.fixture(autouse=True, scope="module")
+    def ensure_ray_down(self):
+        # ray.data fails when ray is instantiated in local mode
+        ray.shutdown()
+
+    @pytest.fixture(scope="module")
+    def mock_dataset(self):
+        return from_items([{"col1": i, "col2": i * 2} for i in range(1000)])
+
+    @pytest.fixture(scope="module")
+    def mock_filename_provider(self):
+        class MockFilenameProvider(FilenameProvider):
+            def get_filename_for_block(
+                self, block: Any, task_index: int, block_index: int
+            ) -> str:
+                return TestDatasetToFile.SUB_PATH
+
+        return MockFilenameProvider()
+
+    def test_parquet_sanity(self, mock_dataset, mock_filename_provider):
+        from deltacat.utils.ray_utils.dataset import dataset_to_file
+
+        fs: AbstractFileSystem = fsspec.filesystem("local")
+
+        dataset_to_file(
+            mock_dataset,
+            self.BASE_PATH,
+            file_system=fs,
+            block_path_provider=mock_filename_provider,
+        )
+
+        file_expected_at = f"{self.BASE_PATH}/{self.SUB_PATH}"
+        assert fs.exists(file_expected_at), "file was not written"
+        fs.delete(file_expected_at)
+
+    def test_csv_sanity(self, mock_dataset, mock_filename_provider):
+        from deltacat.utils.ray_utils.dataset import dataset_to_file
+
+        fs: AbstractFileSystem = fsspec.filesystem("local")
+
+        dataset_to_file(
+            mock_dataset,
+            self.BASE_PATH,
+            file_system=fs,
+            block_path_provider=mock_filename_provider,
+            content_type=ContentType.CSV.value,
+        )
+
+        file_expected_at = f"{self.BASE_PATH}/{self.SUB_PATH}"
+        assert fs.exists(file_expected_at), "file was not written"
+        fs.delete(file_expected_at)
diff --git a/deltacat/utils/numpy.py b/deltacat/utils/numpy.py
index ea0e8687..94c7e9ca 100644
--- a/deltacat/utils/numpy.py
+++ b/deltacat/utils/numpy.py
@@ -1,10 +1,10 @@
-from typing import List, Optional
+from typing import List, Optional, Callable, Union
 
 import numpy as np
 import pyarrow as pa
 from fsspec import AbstractFileSystem
-from ray.data.datasource import BlockWritePathProvider
+from ray.data.datasource import FilenameProvider
 
 from deltacat.types.media import ContentType
 from deltacat.utils import pandas as pd_utils
 from deltacat.utils import pyarrow as pa_utils
@@ -52,7 +52,7 @@ def ndarray_to_file(
     np_array: np.ndarray,
     path: str,
     file_system: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[FilenameProvider, Callable],
     content_type: str = ContentType.PARQUET.value,
     **kwargs
 ) -> None:
diff --git a/deltacat/utils/pandas.py b/deltacat/utils/pandas.py
index 0e78af52..44dee957 100644
--- a/deltacat/utils/pandas.py
+++ b/deltacat/utils/pandas.py
@@ -2,12 +2,12 @@
 import io
 import logging
 import math
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import pandas as pd
 import pyarrow as pa
 from fsspec import AbstractFileSystem
-from ray.data.datasource import BlockWritePathProvider
+from ray.data.datasource import FilenameProvider
 
 from deltacat import logs
 from deltacat.types.media import (
@@ -262,7 +262,7 @@ def dataframe_to_file(
     dataframe: pd.DataFrame,
     base_path: str,
     file_system: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[Callable, FilenameProvider],
     content_type: str = ContentType.PARQUET.value,
     **kwargs,
 ) -> None:
diff --git a/deltacat/utils/pyarrow.py b/deltacat/utils/pyarrow.py
index 05f7c521..5177b114 100644
--- a/deltacat/utils/pyarrow.py
+++ b/deltacat/utils/pyarrow.py
@@ -6,7 +6,7 @@
 import io
 import logging
 from functools import partial
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 from pyarrow.parquet import ParquetFile
 from deltacat.exceptions import ContentTypeValidationError
@@ -18,7 +18,7 @@
 from pyarrow import feather as paf
 from pyarrow import json as pajson
 from pyarrow import parquet as papq
-from ray.data.datasource import BlockWritePathProvider
+from ray.data.datasource import FilenameProvider
 
 from deltacat.utils.s3fs import create_s3_file_system
 from deltacat import logs
@@ -523,7 +523,7 @@ def table_to_file(
     table: pa.Table,
     base_path: str,
     file_system: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[Callable, FilenameProvider],
     content_type: str = ContentType.PARQUET.value,
     **kwargs,
 ) -> None:
diff --git a/deltacat/utils/ray_utils/dataset.py b/deltacat/utils/ray_utils/dataset.py
index aa426ef2..bffd0ded 100644
--- a/deltacat/utils/ray_utils/dataset.py
+++ b/deltacat/utils/ray_utils/dataset.py
@@ -1,10 +1,10 @@
 import logging
-from typing import Callable, Dict, List, Optional
+from typing import Callable, Dict, List, Optional, Union
 
 from fsspec import AbstractFileSystem
 from pyarrow import csv as pacsv
 from ray.data import Dataset
-from ray.data.datasource import BlockWritePathProvider
+from ray.data.datasource import FilenameProvider
 
 from deltacat import logs
 from deltacat.types.media import ContentEncoding, ContentType
@@ -17,7 +17,7 @@ def write_parquet(
     base_path: str,
     *,
     filesystem: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[Callable, FilenameProvider],
     **kwargs,
 ) -> None:
 
@@ -25,7 +25,7 @@
         base_path,
         filesystem=filesystem,
         try_create_dir=False,
-        block_path_provider=block_path_provider,
+        filename_provider=block_path_provider,
         **kwargs,
     )
 
@@ -35,7 +35,7 @@ def write_csv(
     dataset: Dataset,
     base_path: str,
     *,
     filesystem: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[Callable, FilenameProvider],
     **kwargs,
 ) -> None:
@@ -49,7 +49,7 @@ def write_csv(
         arrow_open_stream_args=pa_open_stream_args,
         filesystem=filesystem,
         try_create_dir=False,
-        block_path_provider=block_path_provider,
+        filename_provider=block_path_provider,
         arrow_csv_args_fn=arrow_csv_args_fn,
         **kwargs,
     )
@@ -89,7 +89,7 @@ def dataset_to_file(
     table: Dataset,
     base_path: str,
     file_system: AbstractFileSystem,
-    block_path_provider: BlockWritePathProvider,
+    block_path_provider: Union[Callable, FilenameProvider],
     content_type: str = ContentType.PARQUET.value,
     **kwargs,
 ) -> None:
diff --git a/requirements.txt b/requirements.txt
index 14fbee9a..96a3be2a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,7 @@ pandas == 1.3.5
 pyarrow == 12.0.1
 pydantic == 1.10.4
 pymemcache == 4.0.0
-ray[default] ~= 2.0
+ray[default] >= 2.20.0
 redis == 4.6.0
 s3fs == 2024.5.0
 schedule == 1.2.0
diff --git a/setup.py b/setup.py
index 1491a7d3..559b58c2 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ def find_version(*paths):
         "pandas == 1.3.5",
         "pyarrow == 12.0.1",
         "pydantic == 1.10.4",
-        "ray[default] ~= 2.0",
+        "ray[default] >= 2.20.0",
         "s3fs == 2024.5.0",
         "tenacity == 8.1.0",
         "typing-extensions == 4.4.0",
@@ -58,9 +58,9 @@ def find_version(*paths):
         "Development Status :: 4 - Beta",
         "Intended Audience :: Developers",
         "Programming Language :: Python :: 3 :: Only",
-        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.10",
         "Programming Language :: Python :: 3.9",
         "Operating System :: OS Independent",
     ],
-    python_requires=">=3.8",
+    python_requires=">=3.9",
 )