diff --git a/sync_tests/tests/node_sync_test.py b/sync_tests/tests/node_sync_test.py
index b77620b3..da0fe570 100644
--- a/sync_tests/tests/node_sync_test.py
+++ b/sync_tests/tests/node_sync_test.py
@@ -26,10 +26,9 @@
 from sync_tests.utils.blockfrost_utils import get_epoch_start_datetime_from_blockfrost
 from sync_tests.utils.gitpython_utils import git_clone_iohk_repo, git_checkout
-from sync_tests.utils.utils import print_info, print_ok, print_warn, print_info_warn, print_error, seconds_to_time, \
+from sync_tests.utils.utils import print_message, seconds_to_time, \
     date_diff_in_seconds, get_no_of_cpu_cores, \
     get_current_date_time, get_os_type, get_directory_size, get_total_ram_in_GB, delete_file, \
-    is_dir, \
     list_absolute_file_paths, print_file_content
 
 
 CONFIGS_BASE_URL = "https://book.play.dev.cardano.org/environments"
@@ -49,18 +48,18 @@ def set_repo_paths():
 
 
 def execute_command(command):
-    print_info(f"Execute command: {command}")
+    print_message(type="info", message=f"Execute command: {command}")
     try:
         cmd = shlex.split(command)
         process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
         outs, errors = process.communicate(timeout=7200)
         if errors:
-            print_warn(f"Warnings or Errors --> {errors}")
+            print_message(type="warn", message=f"Warnings or Errors --> {errors}")
         if outs:
-            print_ok(f"Output of command: {command} --> {outs}")
+            print_message(type="ok", message=f"Output of command: {command} --> {outs}")
     except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
-        print_error(f"Command {command} returned exception: {e}")
+        print_message(type="error", message=f"Command {command} returned exception: {e}")
         raise
@@ -76,7 +75,7 @@ def git_get_last_closed_pr_cardano_node():
         count += 1
         response = requests.get(url)
         if count > 10:
-            print_error(
+            print_message(type="error", message=
                 f"!!!! ERROR: Could not get the number of the last closed PR after {count} retries")
         response.raise_for_status()
     jData = json.loads(response.content)
@@ -95,7 +94,7 @@ def check_string_format(input_string):
 
 
 def delete_node_files():
     for p in Path("..").glob("cardano-*"):
-        print_info_warn(f"deleting file: {p}")
+        print_message(type="info_warn", message=f"deleting file: {p}")
         p.unlink(missing_ok=True)
@@ -133,7 +132,7 @@ def disable_p2p_node_config():
 
 
 def rm_node_config_files() -> None:
-    print_info_warn('Removing existing config files')
+    print_message(type="info_warn", message='Removing existing config files')
     os.chdir(Path(ROOT_TEST_PATH))
     for gen in Path("..").glob("*-genesis.json"):
         Path(gen).unlink(missing_ok=True)
@@ -166,15 +165,15 @@ def get_node_config_files(env, node_topology_type):
     else:
         download_config_file(env, 'topology.json')
     print(f" - listdir current_directory: {os.listdir(current_directory)}")
-    print_info_warn(" Config File Content: ")
+    print_message(type="info_warn", message=" Config File Content: ")
     print_file_content('config.json')
-    print_info_warn(" Topology File Content: ")
+    print_message(type="info_warn", message=" Topology File Content: ")
     print_file_content('topology.json')
 
 
 def enable_cardano_node_resources_monitoring(node_config_filepath):
-    print_warn('- Enable cardano node resource monitoring:')
-    print_info(
+    print_message(type="warn", message='- Enable cardano node resource monitoring:')
+    print_message(type="info", message=
         ' node_config_json["options"]["mapBackends"]["cardano.node.resources"] = ["KatipBK"]')
 
     os.chdir(Path(ROOT_TEST_PATH))
@@ -190,8 +189,8 @@ def enable_cardano_node_tracers(node_config_filepath):
     with open(node_config_filepath, 'r') as json_file:
         node_config_json = json.load(json_file)
 
-    print_warn('- Enable tracer:')
-    print_info(' Set minSeverity = Info')
+    print_message(type="warn", message='- Enable tracer:')
+    print_message(type="info", message=' Set minSeverity = Info')
     node_config_json['minSeverity'] = 'Info'
     # node_config_json["TestEnableDevelopmentNetworkProtocols"] = True
     # node_config_json["TestEnableDevelopmentHardForkEras"] = True
@@ -278,7 +277,7 @@ def wait_for_node_to_start(timeout_minutes=20):
     get_current_tip(timeout_minutes)
     stop_counter = time.perf_counter()
     start_time_seconds = int(stop_counter - start_counter)
-    print_ok(f"It took {start_time_seconds} seconds for the QUERY TIP command to be available")
+    print_message(type="ok", message=f"It took {start_time_seconds} seconds for the QUERY TIP command to be available")
     return start_time_seconds
@@ -305,7 +304,7 @@ def get_current_tip(timeout_minutes=10):
         except subprocess.CalledProcessError as e:
             print(
                 f" === {get_current_date_time()} - Waiting 60s before retrying to get the tip again - {i}")
-            print_error(
+            print_message(type="error", message=
                 f" !!! ERROR: command {e.cmd} returned with error (code {e.returncode}): {' '.join(str(e.output.decode('utf-8')).split())}")
             if "Invalid argument" in str(e.output):
                 print(f" -- exiting on - {e.output.decode('utf-8')}")
@@ -359,22 +358,22 @@ def start_node(cardano_node, tag_no, node_start_arguments, timeout_minutes=400):
     ).strip()
 
     logfile = open(NODE_LOG_FILE, "w+")
-    print_info_warn(f"start node cmd: {cmd}")
+    print_message(type="info_warn", message=f"start node cmd: {cmd}")
 
     try:
         p = subprocess.Popen(cmd.split(" "), stdout=logfile, stderr=logfile)
-        print_info('waiting for db folder to be created')
+        print_message(type="info", message='waiting for db folder to be created')
         count = 0
         count_timeout = 299
 
         while not os.path.isdir(current_directory / 'db'):
             time.sleep(1)
             count += 1
             if count > count_timeout:
-                print_error(
+                print_message(type="error", message=
                     f"ERROR: waited {count_timeout} seconds and the DB folder was not created yet")
                 exit(1)
 
-        print_ok(f"DB folder was created after {count} seconds")
+        print_message(type="ok", message=f"DB folder was created after {count} seconds")
         secs_to_start = wait_for_node_to_start(timeout_minutes)
         print(f" - listdir current_directory: {os.listdir(current_directory)}")
         print(f" - listdir db: {os.listdir(current_directory / 'db')}")
@@ -390,7 +389,7 @@ def start_node(cardano_node, tag_no, node_start_arguments, timeout_minutes=400):
 def stop_node(platform_system):
     for proc in process_iter():
         if 'cardano-node' in proc.name():
-            print_info_warn(f"Killing the `cardano-node` process - {proc}")
+            print_message(type="info_warn", message=f"Killing the `cardano-node` process - {proc}")
             if 'windows' in platform_system.lower():
                 proc.send_signal(signal.SIGTERM)
             else:
@@ -398,7 +397,7 @@ def stop_node(platform_system):
     time.sleep(20)
     for proc in process_iter():
         if 'cardano-node' in proc.name():
-            print_error(f" !!! ERROR: `cardano-node` process is still active - {proc}")
+            print_message(type="error", message=f" !!! ERROR: `cardano-node` process is still active - {proc}")
 
 
 def copy_log_file_artifact(old_name, new_name):
@@ -450,7 +449,7 @@ def wait_for_node_to_sync(env):
     if syncProgress is not None:
         while syncProgress < 100:
             if count % 60 == 0:
-                print_warn(f"{get_current_date_time()} - actual_era : {actual_era} "
+                print_message(type="warn", message=f"{get_current_date_time()} - actual_era : {actual_era} "
                            f" - actual_epoch: {actual_epoch} "
                            f" - actual_block: {actual_block} "
                            f" - actual_slot : {actual_slot} "
@@ -478,7 +477,7 @@ def wait_for_node_to_sync(env):
     else:
         while actual_slot <= last_slot_no:
             if count % 60 == 0:
-                print_warn(f"{get_current_date_time()} - actual_era : {actual_era} "
+                print_message(type="warn", message=f"{get_current_date_time()} - actual_era : {actual_era} "
                            f" - actual_epoch: {actual_epoch} "
                            f" - actual_block: {actual_block} "
                            f" - actual_slot : {actual_slot} "
@@ -510,7 +509,7 @@ def wait_for_node_to_sync(env):
     chunk_files = sorted(os.listdir(os.getcwd()), key=os.path.getmtime)
     latest_chunk_no = chunk_files[-1].split(".")[0]
     os.chdir(Path(ROOT_TEST_PATH))
-    print_ok(f"Sync done!; latest_chunk_no: {latest_chunk_no}")
+    print_message(type="ok", message=f"Sync done!; latest_chunk_no: {latest_chunk_no}")
 
     # add "end_sync_time", "slots_in_era", "sync_duration_secs" and "sync_speed_sps" for each era;
     # for the last/current era, "end_sync_time" = current_utc_time / end_of_sync_time
@@ -652,7 +651,7 @@ def get_cabal_build_files():
 def get_node_executable_path_built_with_cabal():
     for f in get_cabal_build_files():
         if "\\x\\cardano-node\\build\\" in f and 'cardano-node-tmp' not in f and 'autogen' not in f:
-            print_info(f"Found node executable: {f}")
+            print_message(type="info", message=f"Found node executable: {f}")
             global NODE
             NODE = f;
             return f
@@ -661,7 +660,7 @@ def get_cli_executable_path_built_with_cabal():
     for f in get_cabal_build_files():
         if "\\x\\cardano-cli\\build\\" in f and 'cardano-cli-tmp' not in f and 'autogen' not in f:
-            print_info(f"Found node-cli executable: {f}")
+            print_message(type="info", message=f"Found node-cli executable: {f}")
             global CLI
             CLI = f
             return f
@@ -673,11 +672,11 @@ def copy_node_executables(src_location, dst_location, build_mode):
         cli_binary_location = 'cardano-cli-bin/bin/'
 
         os.chdir(Path(src_location) / node_binary_location)
-        print_info('files permissions inside cardano-node-bin/bin folder:')
+        print_message(type="info", message='files permissions inside cardano-node-bin/bin folder:')
         subprocess.check_call(['ls', '-la'])
 
         os.chdir(Path(src_location) / cli_binary_location)
-        print_info('files permissions inside cardano-cli-bin/bin folder:')
+        print_message(type="info", message='files permissions inside cardano-cli-bin/bin folder:')
         subprocess.check_call(['ls', '-la'])
 
         os.chdir(Path(dst_location))
@@ -685,13 +684,13 @@ def copy_node_executables(src_location, dst_location, build_mode):
             shutil.copy2(Path(src_location) / node_binary_location / 'cardano-node',
                          Path(dst_location) / 'cardano-node')
         except Exception as e:
-            print_error(f" !!! ERROR - could not copy the cardano-node file - {e}")
+            print_message(type="error", message=f" !!! ERROR - could not copy the cardano-node file - {e}")
             exit(1)
         try:
             shutil.copy2(Path(src_location) / cli_binary_location / 'cardano-cli',
                          Path(dst_location) / 'cardano-cli')
         except Exception as e:
-            print_error(f" !!! ERROR - could not copy the cardano-cli file - {e}")
+            print_message(type="error", message=f" !!! ERROR - could not copy the cardano-cli file - {e}")
             exit(1)
         time.sleep(5)
@@ -702,25 +701,25 @@ def copy_node_executables(src_location, dst_location, build_mode):
         try:
             shutil.copy2(node_binary_location, Path(dst_location) / 'cardano-node')
         except Exception as e:
-            print_error(f" !!! ERROR - could not copy the cardano-cli file - {e}")
+            print_message(type="error", message=f" !!! ERROR - could not copy the cardano-cli file - {e}")
         try:
             shutil.copy2(cli_binary_location, Path(dst_location) / 'cardano-cli')
         except Exception as e:
-            print_error(f" !!! ERROR - could not copy the cardano-cli file - {e}")
+            print_message(type="error", message=f" !!! ERROR - could not copy the cardano-cli file - {e}")
         time.sleep(5)
 
 
 def get_node_files(node_rev, repository=None, build_tool='nix'):
     test_directory = Path.cwd()
     repo = None
-    print_info(f"test_directory: {test_directory}")
+    print_message(type="info", message=f"test_directory: {test_directory}")
     print(f" - listdir test_directory: {os.listdir(test_directory)}")
 
     node_repo_name = 'cardano-node'
     node_repo_dir = test_directory / 'cardano_node_dir'
 
-    if is_dir(node_repo_dir):
+    if node_repo_dir.is_dir():
         repo = git_checkout(repository, node_rev)
     else:
         repo = git_clone_iohk_repo(node_repo_name, node_repo_dir, node_rev)
@@ -740,7 +739,7 @@ def get_node_files(node_rev, repository=None, build_tool='nix'):
     cli_repo_name = 'cardano-cli'
     cli_repo_dir = test_directory / 'cardano_cli_dir'
 
-    if is_dir(cli_repo_dir):
+    if cli_repo_dir.is_dir():
         git_checkout(repository, cli_rev)
     else:
         git_clone_iohk_repo(cli_repo_name, cli_repo_dir, cli_rev)
@@ -772,7 +771,7 @@ def get_node_files(node_rev, repository=None, build_tool='nix'):
     os.chdir(test_directory)
     subprocess.check_call(['chmod', '+x', NODE])
     subprocess.check_call(['chmod', '+x', CLI])
-    print_info("files permissions inside test folder:")
+    print_message(type="info", message="files permissions inside test folder:")
     subprocess.check_call(['ls', '-la'])
     return repo
@@ -785,8 +784,8 @@ def main():
     print('--- Test data information', flush=True)
     start_test_time = get_current_date_time()
-    print_info(f"Test start time: {start_test_time}")
-    print_warn('Test parameters:')
+    print_message(type="info", message=f"Test start time: {start_test_time}")
+    print_message(type="warn", message='Test parameters:')
     env = vars(args)['environment']
     node_build_mode = str(vars(args)['build_mode']).strip()
     node_rev1 = str(vars(args)['node_rev1']).strip()
@@ -813,21 +812,21 @@ def main():
     print(f"- platform: {platform_system, platform_release, platform_version}")
 
     print('--- Get the cardano-node files', flush=True)
-    print_info(f"Get the cardano-node and cardano-cli files using - {node_build_mode}")
+    print_message(type="info", message=f"Get the cardano-node and cardano-cli files using - {node_build_mode}")
     start_build_time = get_current_date_time()
     if 'windows' not in platform_system.lower():
         repository = get_node_files(node_rev1)
     elif 'windows' in platform_system.lower():
         repository = get_node_files(node_rev1, build_tool='cabal')
     else:
-        print_error(
+        print_message(type="error", message=
            f"ERROR: method not implemented yet!!! Only building with NIX is supported at this moment - {node_build_mode}")
         exit(1)
     end_build_time = get_current_date_time()
-    print_info(f" - start_build_time: {start_build_time}")
-    print_info(f" - end_build_time: {end_build_time}")
+    print_message(type="info", message=f" - start_build_time: {start_build_time}")
+    print_message(type="info", message=f" - end_build_time: {end_build_time}")
 
-    print_warn('--- node version ')
+    print_message(type="warn", message='--- node version ')
     cli_version1, cli_git_rev1 = get_node_version()
     print(f" - cardano_cli_version1: {cli_version1}")
     print(f" - cardano_cli_git_rev1: {cli_git_rev1}")
@@ -842,14 +841,14 @@ def main():
     enable_cardano_node_tracers('config.json')
 
     print(f"--- Start node sync test using node_rev1: {node_rev1}")
-    print_ok("===================================================================================")
-    print_ok(f"================== Start node sync test using node_rev1: {node_rev1} =============")
-    print_ok("===================================================================================")
+    print_message(type="ok", message="===================================================================================")
+    print_message(type="ok", message=f"================== Start node sync test using node_rev1: {node_rev1} =============")
+    print_message(type="ok", message="===================================================================================")
     print('')
     start_sync_time1 = get_current_date_time()
     secs_to_start1 = start_node(NODE, tag_no1, node_start_arguments1, timeout_minutes=10)
 
-    print_info(' - waiting for the node to sync')
+    print_message(type="info", message=' - waiting for the node to sync')
     sync_time_seconds1, last_slot_no1, latest_chunk_no1, era_details_dict1, epoch_details_dict1 = wait_for_node_to_sync(
         env)
@@ -857,7 +856,7 @@ def main():
     print(f"secs_to_start1: {secs_to_start1}")
     print(f"start_sync_time1: {start_sync_time1}")
     print(f"end_sync_time1: {end_sync_time1}")
-    print_warn(f"Stop node for: {node_rev1}")
+    print_message(type="warn", message=f"Stop node for: {node_rev1}")
     stop_node(platform_system)
     stop_node(platform_system)
@@ -876,9 +875,9 @@ def main():
     if tag_no2 != 'None':
         delete_node_files()
         print('')
-        print_ok("==============================================================================")
-        print_ok(f"================= Start sync using node_rev2: {node_rev2} ===================")
-        print_ok("==============================================================================")
+        print_message(type="ok", message="==============================================================================")
+        print_message(type="ok", message=f"================= Start sync using node_rev2: {node_rev2} ===================")
+        print_message(type="ok", message="==============================================================================")
 
         print('Get the cardano-node and cardano-cli files')
         if 'windows' not in platform_system.lower():
             get_node_files(node_rev2, repository)
         elif 'windows' in platform_system.lower():
             get_node_files(node_rev2, repository, build_tool='cabal')
         else:
-            print_error(
+            print_message(type="error", message=
                f"ERROR: method not implemented yet!!! Only building with NIX is supported at this moment - {node_build_mode}")
             exit(1)
 
         if env == 'mainnet' and (node_topology_type1 != node_topology_type2):
-            print_warn('remove the previous topology')
+            print_message(type="warn", message='remove the previous topology')
             delete_file(Path(ROOT_TEST_PATH) / 'topology.json')
             print('Getting the node configuration files')
             get_node_config_files(env, node_topology_type2)
 
-        print_warn('node version')
+        print_message(type="warn", message='node version')
         cli_version2, cli_git_rev2 = get_node_version()
         print(f" - cardano_cli_version2: {cli_version2}")
         print(f" - cardano_cli_git_rev2: {cli_git_rev2}")
@@ -905,11 +904,11 @@ def main():
         start_sync_time2 = get_current_date_time()
         secs_to_start2 = start_node(NODE, tag_no2, node_start_arguments2)
 
-        print_info(f" - waiting for the node to sync - using node_rev2: {node_rev2}")
+        print_message(type="info", message=f" - waiting for the node to sync - using node_rev2: {node_rev2}")
         sync_time_seconds2, last_slot_no2, latest_chunk_no2, era_details_dict2, epoch_details_dict2 = wait_for_node_to_sync(
             env)
         end_sync_time2 = get_current_date_time()
-        print_warn(f"Stop node for: {node_rev2}")
+        print_message(type="warn", message=f"Stop node for: {node_rev2}")
         stop_node(platform_system)
         stop_node(platform_system)
diff --git a/sync_tests/tests/node_write_mainnet_tx_count_per_epoch.py b/sync_tests/tests/node_write_mainnet_tx_count_per_epoch.py
index e4798a44..32f8698f 100644
--- a/sync_tests/tests/node_write_mainnet_tx_count_per_epoch.py
+++ b/sync_tests/tests/node_write_mainnet_tx_count_per_epoch.py
@@ -1,6 +1,6 @@
 import pandas as pd
 
-from sync_tests.utils.utils import print_info, print_warn
+from sync_tests.utils.utils import print_message
 from sync_tests.utils.aws_db_utils import get_last_epoch_no_from_table, add_bulk_values_into_db
 from sync_tests.utils.blockfrost_utils import get_tx_count_per_epoch_from_blockfrost, \
     get_current_epoch_no_from_blockfrost
@@ -20,7 +20,7 @@ def update_mainnet_tx_count_per_epoch():
     if current_epoch_no > last_added_epoch_no + 1:
         # adding values into the db only for missing full epochs (ignoring the current/incomplete epoch)
         for epoch_no in range(last_added_epoch_no + 1, current_epoch_no):
-            print_info(f"Getting values for epoch {epoch_no}")
+            print_message(type="info", message=f"Getting values for epoch {epoch_no}")
             tx_count = get_tx_count_per_epoch_from_blockfrost(epoch_no)
             print(f" - tx_count: {tx_count}")
             new_row_data = {'epoch_no': epoch_no, 'tx_count': tx_count}
@@ -33,7 +33,7 @@ def update_mainnet_tx_count_per_epoch():
             print(f"col_to_insert: {col_to_insert}")
             print(f"val_to_insert: {val_to_insert}")
     else:
-        print_warn('There are no new finalized epochs to be added')
+        print_message(type="warn", message='There are no new finalized epochs to be added')
 
 
 if __name__ == "__main__":
diff --git a/sync_tests/tests/node_write_sync_values_to_db.py b/sync_tests/tests/node_write_sync_values_to_db.py
index dbb53f89..a82c9f75 100644
--- a/sync_tests/tests/node_write_sync_values_to_db.py
+++ b/sync_tests/tests/node_write_sync_values_to_db.py
@@ -6,7 +6,7 @@ from pathlib import Path
 
 import argparse
 
-from sync_tests.utils.utils import print_info, print_warn
+from sync_tests.utils.utils import print_message
 from sync_tests.utils.aws_db_utils import get_identifier_last_run_from_table, get_column_names_from_table, \
     add_column_to_table, add_bulk_values_into_db, add_single_value_into_db
@@ -17,12 +17,12 @@ def main():
     env = vars(args)["environment"]
     if "-" in env:
         env = f"`{env}`"
-    print_info(f"Environment: {env}")
+    print_message(type="info", message=f"Environment: {env}")
 
     current_directory = Path.cwd()
-    print_info(f"current_directory: {current_directory}")
+    print_message(type="info", message=f"current_directory: {current_directory}")
 
-    print_info(f"Read the test results file - {current_directory / RESULTS_FILE_NAME}")
+    print_message(type="info", message=f"Read the test results file - {current_directory / RESULTS_FILE_NAME}")
     with open(RESULTS_FILE_NAME, "r") as json_file:
         sync_test_results_dict = json.load(json_file)
@@ -30,18 +30,18 @@ def main():
     print(f"current_directory: {current_directory}")
     print(f" - listdir: {os.listdir(current_directory)}")
 
-    print_info("Move to 'sync_tests' directory")
+    print_message(type="info", message="Move to 'sync_tests' directory")
     os.chdir(current_directory / "sync_tests")
     current_directory = Path.cwd()
     print(f"current_directory: {current_directory}")
 
-    print_info(f"Check if there are DB columns for all the eras")
+    print_message(type="info", message=f"Check if there are DB columns for all the eras")
     print(f"Get the list of the existing eras in test")
     eras_in_test = sync_test_results_dict["eras_in_test"].replace("[", "").replace("]", "").replace(
         '"', '').split(", ")
     print(f"eras_in_test: {eras_in_test}")
 
-    print_info(f"Get the column names inside the {env} DB tables")
+    print_message(type="info", message=f"Get the column names inside the {env} DB tables")
     table_column_names = get_column_names_from_table(env)
     print(f" -- table_column_names: {table_column_names}")
@@ -65,9 +65,9 @@ def main():
 
     print(f"--- Write test values into the {env} DB table")
-    print_warn("=======================================")
-    print_warn(f"======= identifier: {sync_test_results_dict['identifier']} =======")
-    print_warn("=======================================")
+    print_message(type="warn", message="=======================================")
+    print_message(type="warn", message=f"======= identifier: {sync_test_results_dict['identifier']} =======")
+    print_message(type="warn", message="=======================================")
     test_results_dict = {i: sync_test_results_dict[i] for i in sync_test_results_dict
                          if i not in ["sync_duration_per_epoch", "log_values"]}
@@ -78,13 +78,13 @@ def main():
         print(f"val_to_insert: {val_to_insert}")
         exit(1)
 
-    print_info(f" ==== Write test values into the {env + '_logs'} DB table")
+    print_message(type="info", message=f" ==== Write test values into the {env + '_logs'} DB table")
     log_values_dict = ast.literal_eval(str((sync_test_results_dict["log_values"])))
 
     df1_column_names = ["identifier", "timestamp", "slot_no", "ram_bytes", "cpu_percent", "rss_ram_bytes"]
     df1 = pd.DataFrame(columns=df1_column_names)
 
-    print_info(f" ==== Creating the dataframe with the test values")
+    print_message(type="info", message=f" ==== Creating the dataframe with the test values")
     for key, val in log_values_dict.items():
         new_row_data = {"identifier": sync_test_results_dict["identifier"],
                         "timestamp": key,
@@ -103,7 +103,7 @@ def main():
         print(f"val_to_insert: {val_to_insert}")
         exit(1)
 
-    print_info(f" ==== Write test values into the {env + '_epoch_duration'} DB table")
+    print_message(type="info", message=f" ==== Write test values into the {env + '_epoch_duration'} DB table")
     sync_duration_values_dict = ast.literal_eval(
         str(sync_test_results_dict["sync_duration_per_epoch"]))
     epoch_list = list(sync_duration_values_dict.keys())
diff --git a/sync_tests/utils/aws_db_utils.py b/sync_tests/utils/aws_db_utils.py
index 91f747ef..f8bce377 100644
--- a/sync_tests/utils/aws_db_utils.py
+++ b/sync_tests/utils/aws_db_utils.py
@@ -3,7 +3,7 @@
 import pymysql.cursors
 import pandas as pd
 
-from .utils import print_info, print_ok, print_error
+from .utils import print_message
@@ -18,7 +18,7 @@ def create_connection():
         )
         return conn
     except Exception as e:
-        print_error(f"!!! Database connection failed due to: {e}")
+        print_message(type="error", message=f"!!! Database connection failed due to: {e}")
 
     return conn
@@ -31,7 +31,7 @@ def create_table(table_sql_query):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to create table: {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to create table: {e}")
         return False
     finally:
         if conn:
@@ -43,12 +43,12 @@ def create_table_based_on_another_table_structure(existing_table_name, new_table
     conn = create_connection()
     sql_query = f"CREATE TABLE {new_table_name} LIKE {existing_table_name};"
-    print_info(sql_query)
+    print_message(type="info", message=sql_query)
 
     try:
         cur = conn.cursor()
         cur.execute(sql_query)
     except Exception as e:
-        print_error(f"!!! ERROR: Failed create new table {new_table_name} based on {existing_table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed create new table {new_table_name} based on {existing_table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -58,14 +58,14 @@ def create_table_based_on_another_table_structure(existing_table_name, new_table
 def drop_table(table_name):
     conn = create_connection()
     sql_query = f"DROP TABLE {table_name};"
-    print_info(sql_query)
+    print_message(type="info", message=sql_query)
     try:
         cur = conn.cursor()
         cur.execute(sql_query)
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to drop table {table_name}: {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to drop table {table_name}: {e}")
         return False
     finally:
         if conn:
@@ -73,7 +73,7 @@ def drop_table(table_name):
 
 def get_column_names_from_table(table_name):
-    print_info(f"Getting the column names from table: {table_name}")
+    print_message(type="info", message=f"Getting the column names from table: {table_name}")
 
     conn = create_connection()
     sql_query = f"select * from {table_name}"
@@ -84,7 +84,7 @@ def get_column_names_from_table(table_name):
         col_name_list = [res[0] for res in cur.description]
         return col_name_list
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to get column names from table: {table_name}: {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to get column names from table: {table_name}: {e}")
         return False
     finally:
         if conn:
@@ -92,7 +92,7 @@ def get_column_names_from_table(table_name):
 
 def add_column_to_table(table_name, column_name, column_type):
-    print_info(f"Adding column {column_name} with type {column_type} to {table_name} table")
+    print_message(type="info", message=f"Adding column {column_name} with type {column_type} to {table_name} table")
 
     conn = create_connection()
     sql_query = f"alter table {table_name} add column {column_name} {column_type}"
@@ -101,7 +101,7 @@ def add_column_to_table(table_name, column_name, column_type):
         cur = conn.cursor()
         cur.execute(sql_query)
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to add {column_name} column into table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to add {column_name} column into table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -109,7 +109,7 @@ def add_column_to_table(table_name, column_name, column_type):
 
 def add_single_value_into_db(table_name, col_names_list, col_values_list):
-    print_info(f"Adding 1 new entry into {table_name} table")
+    print_message(type="info", message=f"Adding 1 new entry into {table_name} table")
     initial_rows_no = get_last_row_no(table_name)
     col_names = ','.join(col_names_list)
     col_spaces = ','.join(['%s'] * len(col_names_list))
@@ -122,7 +122,7 @@ def add_single_value_into_db(table_name, col_names_list, col_values_list):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f" -- !!! ERROR: Failed to insert data into {table_name} table: {e}")
+        print_message(type="error", message=f" -- !!! ERROR: Failed to insert data into {table_name} table: {e}")
         return False
     finally:
         if conn:
@@ -130,15 +130,15 @@ def add_single_value_into_db(table_name, col_names_list, col_values_list):
     final_rows_no = get_last_row_no(table_name)
 
     if final_rows_no > 0:
-        print_ok(f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
+        print_message(type="ok", message=f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
         return True
 
-    print_error(f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
+    print_message(type="error", message=f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
     return False
 
 
 def add_bulk_values_into_db(table_name, col_names_list, col_values_list):
-    print_info(f"Adding {len(col_values_list)} entries into {table_name} table")
+    print_message(type="info", message=f"Adding {len(col_values_list)} entries into {table_name} table")
     initial_rows_no = get_last_row_no(table_name)
     col_names = ','.join(col_names_list)
     col_spaces = ','.join(['%s'] * len(col_names_list))
@@ -151,22 +151,22 @@ def add_bulk_values_into_db(table_name, col_names_list, col_values_list):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f" -- !!! ERROR: Failed to bulk insert data into {table_name} table: {e}")
+        print_message(type="error", message=f" -- !!! ERROR: Failed to bulk insert data into {table_name} table: {e}")
         return False
     finally:
         if conn:
             conn.close()
     final_rows_no = get_last_row_no(table_name)
 
     if final_rows_no > 0:
-        print_ok(f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
+        print_message(type="ok", message=f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
         return True
 
-    print_error(f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
+    print_message(type="error", message=f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
     return False
 
 
 def get_last_row_no(table_name):
-    print_info(f"Getting the no of rows from table: {table_name}")
+    print_message(type="info", message=f"Getting the no of rows from table: {table_name}")
 
     conn = create_connection()
     sql_query = f"SELECT count(*) FROM {table_name};"
@@ -177,7 +177,7 @@ def get_last_row_no(table_name):
         last_row_no = cur.fetchone()[0]
         return last_row_no
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -185,7 +185,7 @@ def get_last_row_no(table_name):
 
 def get_identifier_last_run_from_table(table_name):
-    print_info(f"Getting the Identifier value of the last run from table {table_name}")
+    print_message(type="info", message=f"Getting the Identifier value of the last run from table {table_name}")
 
     if get_last_row_no(table_name) == 0:
         return table_name + "_0"
@@ -200,7 +200,7 @@ def get_identifier_last_run_from_table(table_name):
         last_identifier = cur.fetchone()[0]
         return last_identifier
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -208,7 +208,7 @@ def get_identifier_last_run_from_table(table_name):
 
 def get_last_epoch_no_from_table(table_name):
-    print_info(f"Getting the last epoch no value from table {table_name}")
+    print_message(type="info", message=f"Getting the last epoch no value from table {table_name}")
 
     if get_last_row_no(table_name) == 0:
         return 0
@@ -222,7 +222,7 @@ def get_last_epoch_no_from_table(table_name):
         last_identifier = cur.fetchone()[0]
         return last_identifier
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to get last epoch no from table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to get last epoch no from table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -230,7 +230,7 @@ def get_last_epoch_no_from_table(table_name):
 
 def get_column_values(table_name, column_name):
-    print_info(f"Getting {column_name} column values from table {table_name}")
+    print_message(type="info", message=f"Getting {column_name} column values from table {table_name}")
 
     conn = create_connection()
     sql_query = f"SELECT {column_name} FROM {table_name};"
@@ -239,7 +239,7 @@ def get_column_values(table_name, column_name):
         cur.execute(sql_query)
         return [el[0] for el in cur.fetchall()]
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to get {column_name} column values from table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to get {column_name} column values from table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -247,7 +247,7 @@ def get_column_values(table_name, column_name):
 
 def delete_all_rows_from_table(table_name):
-    print_info(f"Deleting all entries from table: {table_name}")
+    print_message(type="info", message=f"Deleting all entries from table: {table_name}")
     conn = create_connection()
     sql_query = f"TRUNCATE TABLE {table_name}"
     print(f" -- sql_query: {sql_query}")
@@ -258,7 +258,7 @@ def delete_all_rows_from_table(table_name):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to delete all records from table {table_name} --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to delete all records from table {table_name} --> {e}")
         return False
     finally:
         if conn:
@@ -268,9 +268,9 @@ def delete_all_rows_from_table(table_name):
 
 
 def delete_record(table_name, column_name, delete_value):
-    print_info(f"Deleting rows containing '{delete_value}' value inside the '{column_name}' column")
+    print_message(type="info", message=f"Deleting rows containing '{delete_value}' value inside the '{column_name}' column")
     initial_rows_no = get_last_row_no(table_name)
-    print_info(f"Deleting {column_name} = {delete_value} from {table_name} table")
+    print_message(type="info", message=f"Deleting {column_name} = {delete_value} from {table_name} table")
 
     conn = create_connection()
     sql_query = f"DELETE from {table_name} where {column_name}=\"{delete_value}\""
@@ -281,7 +281,7 @@ def delete_record(table_name, column_name, delete_value):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to delete record {column_name} = {delete_value} from {table_name} table: --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to delete record {column_name} = {delete_value} from {table_name} table: --> {e}")
         return False
     finally:
         if conn:
@@ -291,7 +291,7 @@ def delete_record(table_name, column_name, delete_value):
 
 def update_record(table_name, column_name, old_value, new_value):
-    print_info(f"Updating {column_name} = {new_value} from {table_name} table")
+    print_message(type="info", message=f"Updating {column_name} = {new_value} from {table_name} table")
 
     conn = create_connection()
     sql_query = f"UPDATE {table_name} SET {column_name}=\"{new_value}\" where {column_name}=\"{old_value}\""
@@ -302,7 +302,7 @@ def update_record(table_name, column_name, old_value, new_value):
         conn.commit()
         cur.close()
     except Exception as e:
-        print_error(f"!!! ERROR: Failed to update record {column_name} = {new_value} from {table_name} table: --> {e}")
+        print_message(type="error", message=f"!!! ERROR: Failed to update record {column_name} = {new_value} from {table_name} table: --> {e}")
         return False
     finally:
         if conn:
diff --git a/sync_tests/utils/utils.py b/sync_tests/utils/utils.py
index a6206797..89d18ad8 100644
--- a/sync_tests/utils/utils.py
+++ b/sync_tests/utils/utils.py
@@ -22,48 +22,6 @@ class CLIOut(NamedTuple):
     stderr: bytes
 
 
-def run_cardano_cli(cli_args: List[str]) -> CLIOut:
-    """Run the `cardano-cli` command.
-    Args:
-        cli_args: A list of arguments for cardano-cli.
-    Returns:
-        CLIOut: A tuple containing command stdout and stderr.
- """ - cli_args_strs = [str(arg) for arg in cli_args] - cmd_str = " ".join(cli_args_strs) - print(f"Running `{cmd_str}`") - - # re-run the command when running into - # Network.Socket.connect: : resource exhausted (Resource temporarily unavailable) - # or - # MuxError (MuxIOException writev: resource vanished (Broken pipe)) "(sendAll errored)" - for __ in range(3): - retcode = None - with subprocess.Popen( - cli_args_strs, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) as p: - stdout, stderr = p.communicate() - retcode = p.returncode - - if retcode == 0: - break - - stderr_dec = stderr.decode() - err_msg = ( - f"An error occurred running a CLI command `{cmd_str}` on path " - f"`{Path.cwd()}`: {stderr_dec}" - ) - if "resource exhausted" in stderr_dec or "resource vanished" in stderr_dec: - print(err_msg) - time.sleep(0.4) - continue - raise RuntimeError(err_msg) - else: - raise RuntimeError(err_msg) - - return CLIOut(stdout or b"", stderr or b"") - - def run_command( command: Union[str, list], ignore_fail: bool = False, @@ -103,23 +61,25 @@ def cli_has(command: str) -> bool: return not cmd_err.startswith("Invalid") -def print_ok(message): - print(Fore.GREEN + f"{message}", Style.RESET_ALL, flush=True) - - -def print_info(message): - print(Fore.BLUE + f"{message}", Style.RESET_ALL, flush=True) - - -def print_warn(message): - print(Fore.YELLOW + f"{message}", Style.RESET_ALL, flush=True) - - -def print_info_warn(message): - print(Fore.LIGHTMAGENTA_EX + f"{message}", Style.RESET_ALL, flush=True) +def print_message(message: str, type: str = "info"): + """ + Print a message to the logs with color coding. -def print_error(message): - print(Fore.RED + f"{message}", Style.RESET_ALL, flush=True) + Attributes: + message (str): The message to print. + type (str): The message level. Options are: + "ok", "info", "warn", "info_warn", "error". + Default is "info". 
+ """ + colors = { + "ok": Fore.GREEN, + "info": Fore.BLUE, + "warn": Fore.YELLOW, + "info_warn": Fore.LIGHTMAGENTA_EX, + "error": Fore.RED + } + color = colors.get(type, Fore.BLUE) # Default to 'info' if level is invalid + print(color + f"{message}", Style.RESET_ALL, flush=True) def date_diff_in_seconds(dt2, dt1): @@ -146,19 +106,11 @@ def get_total_ram_in_GB(): return int(psutil.virtual_memory().total / 1000000000) -def show_percentage(part, whole): - return round(100 * float(part) / float(whole), 2) - - def get_current_date_time(): now = datetime.now() return now.strftime("%d/%m/%Y %H:%M:%S") -def get_file_creation_date(path_to_file): - return time.ctime(os.path.getmtime(path_to_file)) - - def print_file_content(file_name: str) -> None: try: with open(file_name, 'r') as file: @@ -170,10 +122,6 @@ def print_file_content(file_name: str) -> None: print(f"An error occurred while reading the file: {e}") -def is_dir(dir): - return os.path.isdir(dir) - - def list_absolute_file_paths(directory): files_paths = [] for dirpath,_,filenames in os.walk(directory): @@ -193,22 +141,9 @@ def get_directory_size(start_path='.'): return total_size -def zip_file(archive_name, file_name): - with zipfile.ZipFile(archive_name, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zip: - zip.write(file_name) - - -def unzip_file(file_name): - with zipfile.ZipFile(file_name, 'r') as zip: - zip.printdir() - - print(f"Extracting all the files from {file_name}...") - zip.extractall() - - def delete_file(file_path): # file_path should be a Path (pathlib object) try: file_path.unlink() except OSError as e: - print_error(f"Error: {file_path} : {e.strerror}") \ No newline at end of file + print_message(type="error", message=f"Error: {file_path} : {e.strerror}") \ No newline at end of file diff --git a/sync_tests/utils/utils_db_sync.py b/sync_tests/utils/utils_db_sync.py index bbca1fa1..dca7e4a2 100755 --- a/sync_tests/utils/utils_db_sync.py +++ b/sync_tests/utils/utils_db_sync.py @@ -4,7 +4,6 @@ import shutil import mmap import zipfile -import signal import subprocess import requests import urllib.request @@ -13,7 +12,7 @@ import json import shlex import psycopg2 -from assertpy import assert_that, assert_warn +from assertpy import assert_that from os.path import normpath, basename from pathlib import Path @@ -99,10 +98,6 @@ def get_machine_name(): def export_env_var(name, value): os.environ[name] = str(value) - -def read_env_var(name): - return os.environ[name] - def wait(seconds): time.sleep(seconds) @@ -192,23 +187,6 @@ def create_node_database_archive(env): node_db_archive_path = node_directory + f"/{node_db_archive}" return node_db_archive_path - -def set_github_env_var(env_var, value): - env_file = os.getenv('GITHUB_ENV') - with open(env_file, "a") as my_env_file: - my_env_file.write(f"{env_var}={value}") - - -def set_github_job_summary(value): - job_summary = os.getenv('GITHUB_STEP_SUMMARY') - with open(job_summary, "a") as job_summary: - job_summary.write(f"{value}") - job_summary.write(f"\n\n") - - -def set_github_warning(warning_msg): - print(f"::warning::{warning_msg}") - def set_buildkite_meta_data(key, value): p = subprocess.Popen(["buildkite-agent", "meta-data", "set", f"{key}", f"{value}"]) @@ -257,19 +235,11 @@ def stop_process(proc_name): proc.kill() -def show_percentage(part, whole): - return round(100 * float(part) / float(whole), 2) - - def get_current_date_time(): now = datetime.now() return now.strftime("%d/%m/%Y %H:%M:%S") -def get_file_creation_date(path_to_file): - return 
-
-
 def create_dir(dir_name, root='.'):
     Path(f"{root}/{dir_name}").mkdir(parents=True, exist_ok=True)
     return f"{root}/{dir_name}"
@@ -457,30 +427,6 @@ def emergency_upload_artifacts(env):
     stop_process('cardano-node')
 
 
-def get_and_extract_archive_files(archive_url):
-    current_directory = os.getcwd()
-    request = requests.get(archive_url, allow_redirects=True)
-    download_url = request.url
-    archive_name = download_url.split("/")[-1].strip()
-
-    print("Get and extract archive files:")
-    print(f" - current_directory: {current_directory}")
-    print(f" - download_url: {download_url}")
-    print(f" - archive name: {archive_name}")
-
-    urllib.request.urlretrieve(download_url, Path(current_directory) / archive_name)
-
-    print(f" ------ listdir (before archive extraction): {os.listdir(current_directory)}")
-    tf = tarfile.open(Path(current_directory) / archive_name)
-    tf.extractall(Path(current_directory))
-    print(f" ------ listdir (after archive extraction): {os.listdir(current_directory)}")
-
-
-def get_node_archive_url(node_pr):
-    cardano_node_pr=f"-pr-{node_pr}"
-    return f"https://hydra.iohk.io/job/Cardano/cardano-node{cardano_node_pr}/linux.musl.cardano-node-linux/latest-finished/download/1/"
-
-
 def get_node_config_files(env):
     base_url = "https://book.play.dev.cardano.org/environments/"
     filenames = [
@@ -816,11 +762,6 @@ def copy_db_sync_executables(build_method="nix"):
     )
 
 
-def get_db_sync_archive_url(db_pr):
-    cardano_db_sync_pr=f"-pr-{db_pr}"
-    return f"https://hydra.iohk.io/job/Cardano/cardano-db-sync{cardano_db_sync_pr}/cardano-db-sync-linux/latest-finished/download/1/"
-
-
 def get_db_sync_version():
     current_directory = os.getcwd()
     os.chdir(ROOT_TEST_PATH / "cardano-db-sync")
@@ -971,6 +912,7 @@ def create_db_sync_snapshot_stage_1(env):
         )
     )
 
+
 def create_db_sync_snapshot_stage_2(stage_2_cmd, env):
     os.chdir(ROOT_TEST_PATH / 'cardano-db-sync')
     export_env_var("PGPASSFILE", f"config/pgpass-{env}")
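
The entire patch reduces to one mechanical rewrite: the five colored helpers (print_ok, print_info, print_warn, print_info_warn, print_error) collapse into the single print_message(message, type) helper now defined in sync_tests/utils/utils.py. Below is a minimal, self-contained sketch of the migrated behavior that reviewers can run locally; it assumes only that colorama is installed, mirrors the helper added above, and the demo call at the end is illustrative rather than taken from the patch:

    from colorama import Fore, Style

    def print_message(message: str, type: str = "info"):
        """Print a color-coded log line; type is one of ok/info/warn/info_warn/error."""
        colors = {
            "ok": Fore.GREEN,
            "info": Fore.BLUE,
            "warn": Fore.YELLOW,
            "info_warn": Fore.LIGHTMAGENTA_EX,
            "error": Fore.RED,
        }
        color = colors.get(type, Fore.BLUE)  # unknown types fall back to the 'info' color
        print(color + f"{message}", Style.RESET_ALL, flush=True)

    # Call-site mapping used throughout the patch (illustrative):
    #   print_info("msg")      -> print_message(type="info", message="msg")
    #   print_info_warn("msg") -> print_message(type="info_warn", message="msg")
    print_message(type="info_warn", message="Removing existing config files")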