From 5a4507d5a4ac091123b7362f6e401cfe8e52eb90 Mon Sep 17 00:00:00 2001
From: Raghavendra M Dani
Date: Thu, 29 Aug 2024 16:03:59 -0700
Subject: [PATCH] Adding a test to assert RCF values are calculated correctly
 (#349)

* Add a test to assert RCF values are calculated correctly

* Remove flaky assertions

* Bump version to 1.1.16
---
 deltacat/__init__.py                         |   2 +-
 .../compactor_v2/compaction_session.py       |   7 +-
 .../compactor_v2/private/compaction_utils.py |  12 +-
 .../compactor_v2/test_compaction_session.py  | 146 ++++++++++++++++++
 4 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/deltacat/__init__.py b/deltacat/__init__.py
index 82b1e6f0..e953ab12 100644
--- a/deltacat/__init__.py
+++ b/deltacat/__init__.py
@@ -44,7 +44,7 @@
 
 deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))
 
-__version__ = "1.1.15"
+__version__ = "1.1.16"
 
 
 __all__ = [
diff --git a/deltacat/compute/compactor_v2/compaction_session.py b/deltacat/compute/compactor_v2/compaction_session.py
index 9944f874..58baf382 100644
--- a/deltacat/compute/compactor_v2/compaction_session.py
+++ b/deltacat/compute/compactor_v2/compaction_session.py
@@ -43,7 +43,6 @@
     _stage_new_partition,
     _run_hash_and_merge,
     _process_merge_results,
-    _upload_compaction_audit,
     _write_new_round_completion_file,
     _commit_compaction_result,
 )
@@ -201,11 +200,6 @@ def _execute_compaction(
 
     compaction_audit.save_round_completion_stats(mat_results)
 
-    _upload_compaction_audit(
-        params,
-        compaction_audit,
-        round_completion_info,
-    )
     compaction_result: ExecutionCompactionResult = _write_new_round_completion_file(
         params,
         compaction_audit,
@@ -215,5 +209,6 @@
         rcf_source_partition_locator,
         new_compacted_delta_locator,
         pyarrow_write_result,
+        round_completion_info,
     )
     return compaction_result
diff --git a/deltacat/compute/compactor_v2/private/compaction_utils.py b/deltacat/compute/compactor_v2/private/compaction_utils.py
index 525b8558..786908ba 100644
--- a/deltacat/compute/compactor_v2/private/compaction_utils.py
+++ b/deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -365,6 +365,7 @@ def _run_hash_and_merge(
         if mutable_compaction_audit.telemetry_time_in_seconds
         else 0.0
     )
+
     mutable_compaction_audit.set_telemetry_time_in_seconds(
         telemetry_this_round + previous_telemetry
     )
@@ -598,10 +599,10 @@
     return merged_delta, mat_results, hb_id_to_entry_indices_range
 
 
-def _upload_compaction_audit(
+def _update_and_upload_compaction_audit(
    params: CompactPartitionParams,
     mutable_compaction_audit: CompactionSessionAuditInfo,
-    round_completion_info: RoundCompletionInfo,
+    round_completion_info: Optional[RoundCompletionInfo] = None,
 ) -> None:
 
     # After all incremental delta related calculations, we update
@@ -637,6 +638,7 @@ def _write_new_round_completion_file(
     rcf_source_partition_locator: rcf.PartitionLocator,
     new_compacted_delta_locator: DeltaLocator,
     pyarrow_write_result: PyArrowWriteResult,
+    prev_round_completion_info: Optional[RoundCompletionInfo] = None,
 ) -> ExecutionCompactionResult:
     input_inflation = None
     input_average_record_size_bytes = None
@@ -664,6 +666,12 @@
         f" and average record size={input_average_record_size_bytes}"
     )
 
+    _update_and_upload_compaction_audit(
+        params,
+        mutable_compaction_audit,
+        prev_round_completion_info,
+    )
+
     new_round_completion_info = RoundCompletionInfo.of(
         high_watermark=params.last_stream_position_to_compact,
         compacted_delta_locator=new_compacted_delta_locator,
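
The refactor above is easier to follow outside diff form: the audit upload that _execute_compaction used to trigger directly is now folded into _write_new_round_completion_file, which receives the previous round's RCF and uploads the updated audit before the new RCF is built, presumably so the uploaded audit reflects the inflation and record-size stats computed in that function. The following is a condensed sketch of the resulting call order; types are stubbed with Any and bodies are elided, so it is illustrative, not the actual deltacat implementation.

    from typing import Any, Optional


    def _update_and_upload_compaction_audit(
        params: Any,
        mutable_compaction_audit: Any,
        round_completion_info: Optional[Any] = None,
    ) -> None:
        # Update audit fields that depend on the previous round's RCF
        # (if any), then upload the audit to S3 (elided).
        pass


    def _write_new_round_completion_file(
        params: Any,
        mutable_compaction_audit: Any,
        prev_round_completion_info: Optional[Any] = None,
    ) -> Any:
        # Derive input_inflation and input_average_record_size_bytes from
        # the audit's input stats (elided), then upload the audit *before*
        # writing the new RCF, so both artifacts carry the same numbers.
        _update_and_upload_compaction_audit(
            params, mutable_compaction_audit, prev_round_completion_info
        )
        # Finally, build and persist the new RoundCompletionInfo (elided).
        return None
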
diff --git a/deltacat/tests/compute/compactor_v2/test_compaction_session.py b/deltacat/tests/compute/compactor_v2/test_compaction_session.py
index db317068..f1c54129 100644
--- a/deltacat/tests/compute/compactor_v2/test_compaction_session.py
+++ b/deltacat/tests/compute/compactor_v2/test_compaction_session.py
@@ -87,6 +87,7 @@ class TestCompactionSession:
     INCREMENTAL_FILE_PATH = (
         "deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv"
     )
+    ERROR_RATE = 0.05
 
     def test_compact_partition_when_no_input_deltas_to_compact(
         self, local_deltacat_storage_kwargs
@@ -253,3 +254,148 @@ def test_compact_partition_when_rcf_was_written_by_past_commit(
         # as it should be running incremental
         assert compaction_audit.uniform_deltas_created == 1
         assert compaction_audit.input_records == 6
+
+    def test_compact_partition_when_incremental_then_rcf_stats_accurate(
+        self, s3_resource, local_deltacat_storage_kwargs
+    ):
+        """
+        A test case which asserts the RCF stats are correctly generated for
+        a rebase and incremental use-case.
+        """
+
+        # setup
+        staged_source = stage_partition_from_file_paths(
+            self.NAMESPACE, ["source"], **local_deltacat_storage_kwargs
+        )
+
+        source_delta = commit_delta_to_staged_partition(
+            staged_source, [self.BACKFILL_FILE_PATH], **local_deltacat_storage_kwargs
+        )
+
+        staged_dest = stage_partition_from_file_paths(
+            self.NAMESPACE, ["destination"], **local_deltacat_storage_kwargs
+        )
+        dest_partition = ds.commit_partition(
+            staged_dest, **local_deltacat_storage_kwargs
+        )
+
+        # action
+        rcf_url = compact_partition(
+            CompactPartitionParams.of(
+                {
+                    "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME,
+                    "compacted_file_content_type": ContentType.PARQUET,
+                    "dd_max_parallelism_ratio": 1.0,
+                    "deltacat_storage": ds,
+                    "deltacat_storage_kwargs": local_deltacat_storage_kwargs,
+                    "destination_partition_locator": dest_partition.locator,
+                    "drop_duplicates": True,
+                    "hash_bucket_count": 2,
+                    "last_stream_position_to_compact": source_delta.stream_position,
+                    "list_deltas_kwargs": {
+                        **local_deltacat_storage_kwargs,
+                        **{"equivalent_table_types": []},
+                    },
+                    "primary_keys": ["pk"],
+                    "rebase_source_partition_locator": source_delta.partition_locator,
+                    "rebase_source_partition_high_watermark": source_delta.stream_position,
+                    "records_per_compacted_file": 4000,
+                    "s3_client_kwargs": {},
+                    "source_partition_locator": source_delta.partition_locator,
+                }
+            )
+        )
+
+        backfill_rcf = get_rcf(s3_resource, rcf_url)
+        _, compaction_audit_key = backfill_rcf.compaction_audit_url.strip(
+            "s3://"
+        ).split("/", 1)
+        compaction_audit = CompactionSessionAuditInfo(
+            **read_s3_contents(
+                s3_resource, TEST_S3_RCF_BUCKET_NAME, compaction_audit_key
+            )
+        )
+
+        assert abs(backfill_rcf.input_inflation - 0.05235042735042735) <= 1e-5
+        assert abs(backfill_rcf.input_average_record_size_bytes - 12.25) <= 1e-5
+
+        assert compaction_audit.input_records == 4
+        assert compaction_audit.records_deduped == 0
+        assert compaction_audit.records_deleted == 0
+        assert compaction_audit.untouched_file_count == 0
+        assert compaction_audit.untouched_record_count == 0
+        assert compaction_audit.untouched_size_bytes == 0
+        assert compaction_audit.untouched_file_ratio == 0
+        assert compaction_audit.uniform_deltas_created == 1
+        assert compaction_audit.hash_bucket_count == 2
+        assert compaction_audit.input_file_count == 1
+        assert compaction_audit.output_file_count == 2
+        assert abs(compaction_audit.output_size_bytes - 1832) / 1832 <= self.ERROR_RATE
+        assert abs(compaction_audit.input_size_bytes - 936) / 936 <= self.ERROR_RATE
+
+        # Now run an incremental compaction and verify if the previous RCF was read properly.
+        new_source_delta = commit_delta_to_partition(
+            source_delta.partition_locator,
+            [self.INCREMENTAL_FILE_PATH],
+            **local_deltacat_storage_kwargs,
+        )
+
+        new_destination_partition = ds.get_partition(
+            dest_partition.stream_locator, [], **local_deltacat_storage_kwargs
+        )
+
+        new_rcf_url = compact_partition(
+            CompactPartitionParams.of(
+                {
+                    "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME,
+                    "compacted_file_content_type": ContentType.PARQUET,
+                    "dd_max_parallelism_ratio": 1.0,
+                    "deltacat_storage": ds,
+                    "deltacat_storage_kwargs": local_deltacat_storage_kwargs,
+                    "destination_partition_locator": new_destination_partition.locator,
+                    "drop_duplicates": True,
+                    "hash_bucket_count": 2,
+                    "last_stream_position_to_compact": new_source_delta.stream_position,
+                    "list_deltas_kwargs": {
+                        **local_deltacat_storage_kwargs,
+                        **{"equivalent_table_types": []},
+                    },
+                    "primary_keys": ["pk"],
+                    "rebase_source_partition_locator": None,
+                    "rebase_source_partition_high_watermark": None,
+                    "records_per_compacted_file": 4000,
+                    "s3_client_kwargs": {},
+                    "source_partition_locator": new_source_delta.partition_locator,
+                }
+            )
+        )
+
+        new_rcf = get_rcf(s3_resource, new_rcf_url)
+        _, compaction_audit_key = new_rcf.compaction_audit_url.strip("s3://").split(
+            "/", 1
+        )
+        compaction_audit = CompactionSessionAuditInfo(
+            **read_s3_contents(
+                s3_resource, TEST_S3_RCF_BUCKET_NAME, compaction_audit_key
+            )
+        )
+
+        # as it should be running incremental
+        assert abs(new_rcf.input_inflation - 0.027292576419213975) <= 1e-5
+        assert abs(new_rcf.input_average_record_size_bytes - 12.5) <= 1e-5
+
+        assert compaction_audit.input_records == 6
+        assert compaction_audit.records_deduped == 1
+        assert compaction_audit.records_deleted == 0
+        assert compaction_audit.untouched_file_count == 1
+        assert compaction_audit.untouched_record_count == 2
+        assert (
+            abs(compaction_audit.untouched_size_bytes - 916) / 916 <= self.ERROR_RATE
+        )  # 5% error
+        assert abs(compaction_audit.untouched_file_ratio - 50) <= 1e-5
+        assert compaction_audit.uniform_deltas_created == 1
+        assert compaction_audit.hash_bucket_count == 2
+        assert compaction_audit.input_file_count == 3
+        assert compaction_audit.output_file_count == 2
+        assert abs(compaction_audit.output_size_bytes - 1843) / 1843 <= self.ERROR_RATE
+        assert abs(compaction_audit.input_size_bytes - 2748) / 2748 <= self.ERROR_RATE
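
A note on where the hard-coded expectations come from: in both runs the expected input_inflation equals input_records * input_average_record_size_bytes / input_size_bytes (49 / 936 for the backfill, 75 / 2748 for the incremental), i.e. in-memory record bytes over on-disk input bytes. That reading is inferred from the constants themselves rather than confirmed against the deltacat source; the standalone check below only verifies the arithmetic.

    # Standalone sanity check (not part of the patch): the expected RCF
    # constants are self-consistent under the inferred formula
    #   input_inflation = input_records * input_average_record_size_bytes
    #                     / input_size_bytes
    def implied_inflation(records: int, avg_size: float, size_bytes: int) -> float:
        return records * avg_size / size_bytes

    # Backfill run: 4 records * 12.25 B each / 936 B on disk.
    assert abs(implied_inflation(4, 12.25, 936) - 0.05235042735042735) <= 1e-12
    # Incremental run: 6 records * 12.5 B each / 2748 B on disk.
    assert abs(implied_inflation(6, 12.5, 2748) - 0.027292576419213975) <= 1e-12
    print("expected RCF constants are self-consistent")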