Upgrade Ray and commit RCF before catalog commit (#324)
* Upgrade Ray and commit RCF before catalog commit

* Update CI

* Support 3.11

* 3.11 not supported

* Fix test failure

* Added an e2e backward compatibility test

* Address comments
raghumdani authored Jun 25, 2024
1 parent 45ecd26 · commit dcfc966
Showing 25 changed files with 673 additions and 752 deletions.
42 changes: 21 additions & 21 deletions .github/workflows/ci.yml
@@ -13,10 +13,10 @@ jobs:
     steps:
       - name: checkout
         uses: actions/checkout@v3
-      - name: Set up Python 3.8
-        uses: actions/setup-python@v1
+      - name: Set up Python 3.9
+        uses: actions/setup-python@v4
         with:
-          python-version: 3.8
+          python-version: 3.9
       - name: Linting
         run: |
           python -m pip install --upgrade pip
@@ -27,14 +27,14 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.8", "3.9", "3.10"]
+        python-version: ["3.9", "3.10"]
     timeout-minutes: 20
     steps:
      - name: "checkout repository"
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
        with:
          fetch-depth: 0
-      - name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.8)
+      - name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.9)
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
@@ -55,24 +55,24 @@ jobs:
       - name: Download previous benchmark data
         uses: actions/cache@v1
         with:
-        path: ./cache
-        key: ${{ runner.os }}-benchmark
+          path: ./cache
+          key: ${{ runner.os }}-benchmark
       - name: Store benchmark results
         uses: benchmark-action/github-action-benchmark@v1
         with:
-        tool: 'pytest'
-        output-file-path: output.json
-        auto-push: false
-        github-token: ${{ secrets.GITHUB_TOKEN }}
+          tool: "pytest"
+          output-file-path: output.json
+          auto-push: false
+          github-token: ${{ secrets.GITHUB_TOKEN }}
 
-        # Where the previous data file is stored
-        external-data-json-path: ./cache/benchmark-data.json
+          # Where the previous data file is stored
+          external-data-json-path: ./cache/benchmark-data.json
 
-        # Enable Job Summary for PRs
-        summary-always: true
+          # Enable Job Summary for PRs
+          summary-always: true
 
-        # Enable alert commit comment
-        #
-        # By default, this action marks the result as performance regression
-        # when it is worse than the previous exceeding 200% threshold.
-        comment-on-alert: true
+          # Enable alert commit comment
+          #
+          # By default, this action marks the result as performance regression
+          # when it is worse than the previous exceeding 200% threshold.
+          comment-on-alert: true
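
The inline comments above describe the action's default alerting behavior. If the 200% regression threshold proves too noisy or too lax, github-action-benchmark also documents tunable inputs; a sketch with illustrative values, not part of this commit:

          alert-threshold: "150%"  # alert when a result is 1.5x slower than the baseline
          fail-on-alert: false     # leave the alert comment but keep the job green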
78 changes: 39 additions & 39 deletions .github/workflows/publish-to-pypi.yml
@@ -1,42 +1,42 @@
 name: Publish Python distributions to PyPI
 on:
-    release:
-      types: [published] # triggered whenever a new GitHub release is published
+  release:
+    types: [published] # triggered whenever a new GitHub release is published
 jobs:
-  build-n-publish:
-    name: Build and publish Python distributions to PyPI
-    runs-on: ubuntu-latest
-    steps:
-    - name: Checkout
-      uses: actions/checkout@main
-      with:
-        fetch-depth: 0
-    - name: Set up Python 3.8 (minimum supported python version for deltaCAT)
-      uses: actions/setup-python@v3
-      with:
-        python-version: "3.8"
-    - name: Install pypa/build
-      run: >-
-        python -m
-        pip install
-        build
-        --user
-    - name: Install dependencies
-      run: |
-        python -m pip install --upgrade pip
-        pip install pytest
-        if [ -f benchmark-requirements.txt ]; then pip install -r benchmark-requirements.txt; fi
-    - name: Run unit tests
-      run: >-
-        python -m pytest
-    - name: Echo release tag
-      run: echo ${{ github.ref_name }}
-    - name: Build a binary wheel and a source tarball
-      run: >-
-        python setup.py sdist bdist_wheel
-    - name: Publish distribution to PyPI
-      if: startsWith(github.ref, 'refs/tags')
-      uses: pypa/gh-action-pypi-publish@release/v1
-      with:
-        password: ${{ secrets.PYPI_API_TOKEN }}
-        verbose: true
+  build-n-publish:
+    name: Build and publish Python distributions to PyPI
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@main
+        with:
+          fetch-depth: 0
+      - name: Set up Python 3.9 (minimum supported python version for deltaCAT)
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.9"
+      - name: Install pypa/build
+        run: >-
+          python -m
+          pip install
+          build
+          --user
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install pytest
+          if [ -f benchmark-requirements.txt ]; then pip install -r benchmark-requirements.txt; fi
+      - name: Run unit tests
+        run: >-
+          python -m pytest
+      - name: Echo release tag
+        run: echo ${{ github.ref_name }}
+      - name: Build a binary wheel and a source tarball
+        run: >-
+          python setup.py sdist bdist_wheel
+      - name: Publish distribution to PyPI
+        if: startsWith(github.ref, 'refs/tags')
+        uses: pypa/gh-action-pypi-publish@release/v1
+        with:
+          password: ${{ secrets.PYPI_API_TOKEN }}
+          verbose: true
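
A side note on the build step: the workflow installs pypa/build yet still invokes python setup.py sdist bdist_wheel, which setuptools has deprecated. A hedged sketch of an equivalent step using the build frontend installed above (flags per pypa/build's CLI; dist/ is the default directory the publish action reads from):

      - name: Build a binary wheel and a source tarball
        run: >-
          python -m build --sdist --wheel --outdir dist/ .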
32 changes: 19 additions & 13 deletions deltacat/aws/s3u.py
@@ -21,7 +21,7 @@
 from botocore.client import BaseClient
 from botocore.exceptions import ClientError
 from ray.data.block import Block, BlockAccessor, BlockMetadata
-from ray.data.datasource import BlockWritePathProvider
+from ray.data.datasource import FilenameProvider
 from ray.types import ObjectRef
 from tenacity import (
     Retrying,
@@ -70,9 +70,6 @@
 
 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
 
-# TODO(raghumdani): refactor redshift datasource to reuse the
-# same module for writing output files.
-
 
 class CapturedBlockWritePaths:
     def __init__(self):
Expand Down Expand Up @@ -100,12 +97,15 @@ def block_refs(self) -> List[ObjectRef[Block]]:
return self._block_refs


class UuidBlockWritePathProvider(BlockWritePathProvider):
class UuidBlockWritePathProvider(FilenameProvider):
"""Block write path provider implementation that writes each
dataset block out to a file of the form: {base_path}/{uuid}
"""

def __init__(self, capture_object: CapturedBlockWritePaths):
def __init__(
self, capture_object: CapturedBlockWritePaths, base_path: Optional[str] = None
):
self.base_path = base_path
self.write_paths: List[str] = []
self.block_refs: List[ObjectRef[Block]] = []
self.capture_object = capture_object
@@ -117,6 +117,19 @@ def __del__(self):
             self.block_refs,
         )
 
+    def get_filename_for_block(
+        self, block: Any, task_index: int, block_index: int
+    ) -> str:
+        if self.base_path is None:
+            raise ValueError(
+                "Base path must be provided to UuidBlockWritePathProvider",
+            )
+        return self._get_write_path_for_block(
+            base_path=self.base_path,
+            block=block,
+            block_index=block_index,
+        )
+
     def _get_write_path_for_block(
         self,
         base_path: str,
@@ -143,13 +156,6 @@ def __call__(
         block_index: Optional[int] = None,
         file_format: Optional[str] = None,
     ) -> str:
-        """
-        TODO: BlockWritePathProvider is deprecated as of Ray version 2.20.0. Please use FilenameProvider.
-        See: https://docs.ray.io/en/master/data/api/doc/ray.data.datasource.FilenameProvider.html
-        Also See: https://github.com/ray-project/deltacat/issues/299
-        Hence, this class only works with Ray version 2.20.0 or lower when used in Ray Dataset.
-        """
         return self._get_write_path_for_block(
             base_path,
             filesystem=filesystem,
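
Taken together, the s3u.py changes migrate UuidBlockWritePathProvider from Ray's BlockWritePathProvider, removed after Ray 2.20, to the FilenameProvider interface. This is why base_path must now be supplied up front: the get_filename_for_block callback no longer receives a destination path per call. A minimal usage sketch, with a hypothetical bucket and prefix, assuming Ray >= 2.20 where Dataset write APIs accept a filename_provider argument:

import ray

from deltacat.aws.s3u import CapturedBlockWritePaths, UuidBlockWritePathProvider

# The provider records every path it generates; CapturedBlockWritePaths
# collects them when the provider is garbage-collected.
capture = CapturedBlockWritePaths()
provider = UuidBlockWritePathProvider(capture, base_path="s3://my-bucket/my-prefix")

# Each block is written to a file of the form s3://my-bucket/my-prefix/{uuid}.
ds = ray.data.range(1000)
ds.write_parquet("s3://my-bucket/my-prefix", filename_provider=provider)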
6 changes: 5 additions & 1 deletion deltacat/compute/compactor/compaction_session.py
@@ -193,6 +193,7 @@ def compact_partition(
         round_completion_file_s3_url = rcf.write_round_completion_file(
             compaction_artifact_s3_bucket,
             new_rcf_partition_locator,
+            partition.locator,
             new_rci,
             **s3_client_kwargs,
         )
@@ -312,7 +313,10 @@ def _execute_compaction_round(
     round_completion_info = None
     if not rebase_source_partition_locator:
         round_completion_info = rcf.read_round_completion_file(
-            compaction_artifact_s3_bucket, source_partition_locator, **s3_client_kwargs
+            compaction_artifact_s3_bucket,
+            source_partition_locator,
+            destination_partition_locator,
+            **s3_client_kwargs,
         )
     if not round_completion_info:
         logger.info(
1 change: 1 addition & 0 deletions deltacat/compute/compactor/repartition_session.py
@@ -177,6 +177,7 @@ def repartition(
         s3_client_kwargs = {}
 
     return rcf.write_round_completion_file(
+        None,
         None,
         None,
         repartition_completion_info,
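
Repartition publishes its RCF to an explicit URL rather than deriving one from a bucket and locators, so it now passes None for all three leading arguments. A sketch of that calling convention, with a hypothetical URL and rci standing in for the session's completion info:

import deltacat.compute.compactor.utils.round_completion_file as rcf

rci = ...  # RoundCompletionInfo produced earlier by the repartition round
rcf_url = rcf.write_round_completion_file(
    None,  # bucket: unused when completion_file_s3_url is given
    None,  # source partition locator
    None,  # destination partition locator (the argument added by this commit)
    rci,
    completion_file_s3_url="s3://my-bucket/repartition/rcf.json",
)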
48 changes: 39 additions & 9 deletions deltacat/compute/compactor/utils/round_completion_file.py
@@ -12,40 +12,69 @@
 
 
 def get_round_completion_file_s3_url(
-    bucket: str, source_partition_locator: PartitionLocator
+    bucket: str,
+    source_partition_locator: PartitionLocator,
+    destination_partition_locator: Optional[PartitionLocator] = None,
 ) -> str:
 
     base_url = source_partition_locator.path(f"s3://{bucket}")
+    if destination_partition_locator:
+        base_url = destination_partition_locator.path(
+            f"s3://{bucket}/{source_partition_locator.hexdigest()}"
+        )
 
     return f"{base_url}.json"


 @metrics
 def read_round_completion_file(
     bucket: str,
     source_partition_locator: PartitionLocator,
+    destination_partition_locator: Optional[PartitionLocator] = None,
     **s3_client_kwargs: Optional[Dict[str, Any]],
 ) -> RoundCompletionInfo:
 
-    round_completion_file_url = get_round_completion_file_s3_url(
+    all_uris = []
+    if destination_partition_locator:
+        round_completion_file_url_with_destination = get_round_completion_file_s3_url(
+            bucket,
+            source_partition_locator,
+            destination_partition_locator,
+        )
+        all_uris.append(round_completion_file_url_with_destination)
+
+    # Note: we read from RCF at two different URI for backward
+    # compatibility reasons.
+    round_completion_file_url_prev = get_round_completion_file_s3_url(
         bucket,
         source_partition_locator,
     )
-    logger.info(f"reading round completion file from: {round_completion_file_url}")
+
+    all_uris.append(round_completion_file_url_prev)
+
     round_completion_info = None
-    result = s3_utils.download(round_completion_file_url, False, **s3_client_kwargs)
-    if result:
-        json_str = result["Body"].read().decode("utf-8")
-        round_completion_info = RoundCompletionInfo(json.loads(json_str))
-        logger.info(f"read round completion info: {round_completion_info}")
+
+    for rcf_uri in all_uris:
+        logger.info(f"Reading round completion file from: {rcf_uri}")
+        result = s3_utils.download(rcf_uri, False, **s3_client_kwargs)
+        if result:
+            json_str = result["Body"].read().decode("utf-8")
+            round_completion_info = RoundCompletionInfo(json.loads(json_str))
+            logger.info(f"Read round completion info: {round_completion_info}")
+            break
+        else:
+            logger.warn(f"Round completion file not present at {rcf_uri}")
+
     return round_completion_info


 @metrics
 def write_round_completion_file(
     bucket: Optional[str],
     source_partition_locator: Optional[PartitionLocator],
+    destination_partition_locator: Optional[PartitionLocator],
     round_completion_info: RoundCompletionInfo,
-    completion_file_s3_url: str = None,
+    completion_file_s3_url: Optional[str] = None,
     **s3_client_kwargs: Optional[Dict[str, Any]],
 ) -> str:
     if bucket is None and completion_file_s3_url is None:
@@ -56,6 +85,7 @@ def write_round_completion_file(
         completion_file_s3_url = get_round_completion_file_s3_url(
             bucket,
             source_partition_locator,
+            destination_partition_locator,
         )
     logger.info(f"writing round completion file to: {completion_file_s3_url}")
     s3_utils.upload(
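
The net effect is a destination-aware RCF location with a read-time fallback to the legacy one. A sketch of the two URL shapes, with a hypothetical bucket and stand-in locators, using the module's own public functions:

from deltacat.compute.compactor.utils.round_completion_file import (
    get_round_completion_file_s3_url,
)
from deltacat.storage import PartitionLocator

source_locator: PartitionLocator = ...       # partition being compacted
destination_locator: PartitionLocator = ...  # compacted destination partition

# Legacy shape, still probed as a fallback so pre-existing RCFs stay readable:
#   s3://my-bucket/<source-partition-path>.json
legacy_url = get_round_completion_file_s3_url("my-bucket", source_locator)

# New shape, namespaced by the source partition's hexdigest and keyed by the
# destination partition's path:
#   s3://my-bucket/<source-hexdigest>/<destination-partition-path>.json
new_url = get_round_completion_file_s3_url(
    "my-bucket", source_locator, destination_locator
)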
[Diff truncated: the remaining 19 changed files are not shown.]