Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor utils #15

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 57 additions & 58 deletions sync_tests/tests/node_sync_test.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions sync_tests/tests/node_write_mainnet_tx_count_per_epoch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd

from sync_tests.utils.utils import print_info, print_warn
from sync_tests.utils.utils import print_message(type="info", message=, print_message(type="warn", message=
from sync_tests.utils.aws_db_utils import get_last_epoch_no_from_table, add_bulk_values_into_db
from sync_tests.utils.blockfrost_utils import get_tx_count_per_epoch_from_blockfrost, \
get_current_epoch_no_from_blockfrost
Expand All @@ -20,7 +20,7 @@ def update_mainnet_tx_count_per_epoch():
if current_epoch_no > last_added_epoch_no + 1:
# adding values into the db only for missing full epochs (ignoring the current/incomplete epoch)
for epoch_no in range(last_added_epoch_no + 1, current_epoch_no):
print_info(f"Getting values for epoch {epoch_no}")
print_message(type="info", message=f"Getting values for epoch {epoch_no}")
tx_count = get_tx_count_per_epoch_from_blockfrost(epoch_no)
print(f" - tx_count: {tx_count}")
new_row_data = {'epoch_no': epoch_no, 'tx_count': tx_count}
Expand All @@ -33,7 +33,7 @@ def update_mainnet_tx_count_per_epoch():
print(f"col_to_insert: {col_to_insert}")
print(f"val_to_insert: {val_to_insert}")
else:
print_warn('There are no new finalized epochs to be added')
print_message(type="warn", message='There are no new finalized epochs to be added')


if __name__ == "__main__":
Expand Down
26 changes: 13 additions & 13 deletions sync_tests/tests/node_write_sync_values_to_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path
import argparse

from sync_tests.utils.utils import print_info, print_warn
from sync_tests.utils.utils import print_message(type="info", message=, print_message(type="warn", message=
from sync_tests.utils.aws_db_utils import get_identifier_last_run_from_table, get_column_names_from_table, \
add_column_to_table, add_bulk_values_into_db, add_single_value_into_db

Expand All @@ -17,31 +17,31 @@ def main():
env = vars(args)["environment"]
if "-" in env:
env = f"`{env}`"
print_info(f"Environment: {env}")
print_message(type="info", message=f"Environment: {env}")

current_directory = Path.cwd()
print_info(f"current_directory: {current_directory}")
print_message(type="info", message=f"current_directory: {current_directory}")

print_info(f"Read the test results file - {current_directory / RESULTS_FILE_NAME}")
print_message(type="info", message=f"Read the test results file - {current_directory / RESULTS_FILE_NAME}")
with open(RESULTS_FILE_NAME, "r") as json_file:
sync_test_results_dict = json.load(json_file)

current_directory = Path.cwd()
print(f"current_directory: {current_directory}")
print(f" - listdir: {os.listdir(current_directory)}")

print_info("Move to 'sync_tests' directory")
print_message(type="info", message="Move to 'sync_tests' directory")
os.chdir(current_directory / "sync_tests")
current_directory = Path.cwd()
print(f"current_directory: {current_directory}")

print_info(f"Check if there are DB columns for all the eras")
print_message(type="info", message=f"Check if there are DB columns for all the eras")
print(f"Get the list of the existing eras in test")
eras_in_test = sync_test_results_dict["eras_in_test"].replace("[", "").replace("]", "").replace(
'"', '').split(", ")
print(f"eras_in_test: {eras_in_test}")

print_info(f"Get the column names inside the {env} DB tables")
print_message(type="info", message=f"Get the column names inside the {env} DB tables")
table_column_names = get_column_names_from_table(env)
print(f" -- table_column_names: {table_column_names}")

Expand All @@ -65,9 +65,9 @@ def main():

print(f"--- Write test values into the {env} DB table")

print_warn("=======================================")
print_warn(f"======= identifier: {sync_test_results_dict['identifier']} =======")
print_warn("=======================================")
print_message(type="warn", message="=======================================")
print_message(type="warn", message=f"======= identifier: {sync_test_results_dict['identifier']} =======")
print_message(type="warn", message="=======================================")

test_results_dict = {i: sync_test_results_dict[i] for i in sync_test_results_dict if i not in ["sync_duration_per_epoch", "log_values"]}

Expand All @@ -78,13 +78,13 @@ def main():
print(f"val_to_insert: {val_to_insert}")
exit(1)

print_info(f" ==== Write test values into the {env + '_logs'} DB table")
print_message(type="info", message=f" ==== Write test values into the {env + '_logs'} DB table")
log_values_dict = ast.literal_eval(str((sync_test_results_dict["log_values"])))

df1_column_names = ["identifier", "timestamp", "slot_no", "ram_bytes", "cpu_percent", "rss_ram_bytes"]
df1 = pd.DataFrame(columns=df1_column_names)

print_info(f" ==== Creating the dataframe with the test values")
print_message(type="info", message=f" ==== Creating the dataframe with the test values")
for key, val in log_values_dict.items():
new_row_data = {"identifier": sync_test_results_dict["identifier"],
"timestamp": key,
Expand All @@ -103,7 +103,7 @@ def main():
print(f"val_to_insert: {val_to_insert}")
exit(1)

print_info(f" ==== Write test values into the {env + '_epoch_duration'} DB table")
print_message(type="info", message=f" ==== Write test values into the {env + '_epoch_duration'} DB table")
sync_duration_values_dict = ast.literal_eval(
str(sync_test_results_dict["sync_duration_per_epoch"]))
epoch_list = list(sync_duration_values_dict.keys())
Expand Down
68 changes: 34 additions & 34 deletions sync_tests/utils/aws_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pymysql.cursors
import pandas as pd

from .utils import print_info, print_ok, print_error
from .utils import print_message



Expand All @@ -18,7 +18,7 @@ def create_connection():
)
return conn
except Exception as e:
print_error(f"!!! Database connection failed due to: {e}")
print_message(type="error", message=f"!!! Database connection failed due to: {e}")

return conn

Expand All @@ -31,7 +31,7 @@ def create_table(table_sql_query):
conn.commit()
cur.close()
except Exception as e:
print_error(f"!!! ERROR: Failed to create table: {e}")
print_message(type="error", message=f"!!! ERROR: Failed to create table: {e}")
return False
finally:
if conn:
Expand All @@ -43,12 +43,12 @@ def create_table_based_on_another_table_structure(existing_table_name, new_table

conn = create_connection()
sql_query = f"CREATE TABLE {new_table_name} LIKE {existing_table_name};"
print_info(sql_query)
print_message(type="info", message=sql_query)
try:
cur = conn.cursor()
cur.execute(sql_query)
except Exception as e:
print_error(f"!!! ERROR: Failed create new table {new_table_name} based on {existing_table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed create new table {new_table_name} based on {existing_table_name} --> {e}")
return False
finally:
if conn:
Expand All @@ -58,22 +58,22 @@ def create_table_based_on_another_table_structure(existing_table_name, new_table
def drop_table(table_name):
conn = create_connection()
sql_query = f"DROP TABLE {table_name};"
print_info(sql_query)
print_message(type="info", message=sql_query)
try:
cur = conn.cursor()
cur.execute(sql_query)
conn.commit()
cur.close()
except Exception as e:
print_error(f"!!! ERROR: Failed to drop table {table_name}: {e}")
print_message(type="error", message=f"!!! ERROR: Failed to drop table {table_name}: {e}")
return False
finally:
if conn:
conn.close()


def get_column_names_from_table(table_name):
print_info(f"Getting the column names from table: {table_name}")
print_message(type="info", message=f"Getting the column names from table: {table_name}")

conn = create_connection()
sql_query = f"select * from {table_name}"
Expand All @@ -84,15 +84,15 @@ def get_column_names_from_table(table_name):
col_name_list = [res[0] for res in cur.description]
return col_name_list
except Exception as e:
print_error(f"!!! ERROR: Failed to get column names from table: {table_name}: {e}")
print_message(type="error", message=f"!!! ERROR: Failed to get column names from table: {table_name}: {e}")
return False
finally:
if conn:
conn.close()


def add_column_to_table(table_name, column_name, column_type):
print_info(f"Adding column {column_name} with type {column_type} to {table_name} table")
print_message(type="info", message=f"Adding column {column_name} with type {column_type} to {table_name} table")

conn = create_connection()
sql_query = f"alter table {table_name} add column {column_name} {column_type}"
Expand All @@ -101,15 +101,15 @@ def add_column_to_table(table_name, column_name, column_type):
cur = conn.cursor()
cur.execute(sql_query)
except Exception as e:
print_error(f"!!! ERROR: Failed to add {column_name} column into table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to add {column_name} column into table {table_name} --> {e}")
return False
finally:
if conn:
conn.close()


def add_single_value_into_db(table_name, col_names_list, col_values_list):
print_info(f"Adding 1 new entry into {table_name} table")
print_message(type="info", message=f"Adding 1 new entry into {table_name} table")
initial_rows_no = get_last_row_no(table_name)
col_names = ','.join(col_names_list)
col_spaces = ','.join(['%s'] * len(col_names_list))
Expand All @@ -122,23 +122,23 @@ def add_single_value_into_db(table_name, col_names_list, col_values_list):
conn.commit()
cur.close()
except Exception as e:
print_error(f" -- !!! ERROR: Failed to insert data into {table_name} table: {e}")
print_message(type="error", message=f" -- !!! ERROR: Failed to insert data into {table_name} table: {e}")
return False
finally:
if conn:
conn.close()
final_rows_no = get_last_row_no(table_name)

if final_rows_no > 0:
print_ok(f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
print_message(type="ok", message=f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
return True

print_error(f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
print_message(type="error", message=f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
return False


def add_bulk_values_into_db(table_name, col_names_list, col_values_list):
print_info(f"Adding {len(col_values_list)} entries into {table_name} table")
print_message(type="info", message=f"Adding {len(col_values_list)} entries into {table_name} table")
initial_rows_no = get_last_row_no(table_name)
col_names = ','.join(col_names_list)
col_spaces = ','.join(['%s'] * len(col_names_list))
Expand All @@ -151,22 +151,22 @@ def add_bulk_values_into_db(table_name, col_names_list, col_values_list):
conn.commit()
cur.close()
except Exception as e:
print_error(f" -- !!! ERROR: Failed to bulk insert data into {table_name} table: {e}")
print_message(type="error", message=f" -- !!! ERROR: Failed to bulk insert data into {table_name} table: {e}")
return False
finally:
if conn:
conn.close()
final_rows_no = get_last_row_no(table_name)
if final_rows_no > 0:
print_ok(f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
print_message(type="ok", message=f"Successfully added {final_rows_no - initial_rows_no} rows into table {table_name}")
return True

print_error(f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
print_message(type="error", message=f"Rows were NOT inserted ! final_rows_no: {final_rows_no}")
return False


def get_last_row_no(table_name):
print_info(f"Getting the no of rows from table: {table_name}")
print_message(type="info", message=f"Getting the no of rows from table: {table_name}")

conn = create_connection()
sql_query = f"SELECT count(*) FROM {table_name};"
Expand All @@ -177,15 +177,15 @@ def get_last_row_no(table_name):
last_row_no = cur.fetchone()[0]
return last_row_no
except Exception as e:
print_error(f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
return False
finally:
if conn:
conn.close()


def get_identifier_last_run_from_table(table_name):
print_info(f"Getting the Identifier value of the last run from table {table_name}")
print_message(type="info", message=f"Getting the Identifier value of the last run from table {table_name}")

if get_last_row_no(table_name) == 0:
return table_name + "_0"
Expand All @@ -200,15 +200,15 @@ def get_identifier_last_run_from_table(table_name):
last_identifier = cur.fetchone()[0]
return last_identifier
except Exception as e:
print_error(f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to get the no of rows from table {table_name} --> {e}")
return False
finally:
if conn:
conn.close()


def get_last_epoch_no_from_table(table_name):
print_info(f"Getting the last epoch no value from table {table_name}")
print_message(type="info", message=f"Getting the last epoch no value from table {table_name}")

if get_last_row_no(table_name) == 0:
return 0
Expand All @@ -222,15 +222,15 @@ def get_last_epoch_no_from_table(table_name):
last_identifier = cur.fetchone()[0]
return last_identifier
except Exception as e:
print_error(f"!!! ERROR: Failed to get last epoch no from table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to get last epoch no from table {table_name} --> {e}")
return False
finally:
if conn:
conn.close()


def get_column_values(table_name, column_name):
print_info(f"Getting {column_name} column values from table {table_name}")
print_message(type="info", message=f"Getting {column_name} column values from table {table_name}")

conn = create_connection()
sql_query = f"SELECT {column_name} FROM {table_name};"
Expand All @@ -239,15 +239,15 @@ def get_column_values(table_name, column_name):
cur.execute(sql_query)
return [el[0] for el in cur.fetchall()]
except Exception as e:
print_error(f"!!! ERROR: Failed to get {column_name} column values from table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to get {column_name} column values from table {table_name} --> {e}")
return False
finally:
if conn:
conn.close()


def delete_all_rows_from_table(table_name):
print_info(f"Deleting all entries from table: {table_name}")
print_message(type="info", message=f"Deleting all entries from table: {table_name}")
conn = create_connection()
sql_query = f"TRUNCATE TABLE {table_name}"
print(f" -- sql_query: {sql_query}")
Expand All @@ -258,7 +258,7 @@ def delete_all_rows_from_table(table_name):
conn.commit()
cur.close()
except Exception as e:
print_error(f"!!! ERROR: Failed to delete all records from table {table_name} --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to delete all records from table {table_name} --> {e}")
return False
finally:
if conn:
Expand All @@ -268,9 +268,9 @@ def delete_all_rows_from_table(table_name):


def delete_record(table_name, column_name, delete_value):
print_info(f"Deleting rows containing '{delete_value}' value inside the '{column_name}' column")
print_message(type="info", message=f"Deleting rows containing '{delete_value}' value inside the '{column_name}' column")
initial_rows_no = get_last_row_no(table_name)
print_info(f"Deleting {column_name} = {delete_value} from {table_name} table")
print_message(type="info", message=f"Deleting {column_name} = {delete_value} from {table_name} table")

conn = create_connection()
sql_query = f"DELETE from {table_name} where {column_name}=\"{delete_value}\""
Expand All @@ -281,7 +281,7 @@ def delete_record(table_name, column_name, delete_value):
conn.commit()
cur.close()
except Exception as e:
print_error(f"!!! ERROR: Failed to delete record {column_name} = {delete_value} from {table_name} table: --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to delete record {column_name} = {delete_value} from {table_name} table: --> {e}")
return False
finally:
if conn:
Expand All @@ -291,7 +291,7 @@ def delete_record(table_name, column_name, delete_value):


def update_record(table_name, column_name, old_value, new_value):
print_info(f"Updating {column_name} = {new_value} from {table_name} table")
print_message(type="info", message=f"Updating {column_name} = {new_value} from {table_name} table")

conn = create_connection()
sql_query = f"UPDATE {table_name} SET {column_name}=\"{new_value}\" where {column_name}=\"{old_value}\""
Expand All @@ -302,7 +302,7 @@ def update_record(table_name, column_name, old_value, new_value):
conn.commit()
cur.close()
except Exception as e:
print_error(f"!!! ERROR: Failed to update record {column_name} = {new_value} from {table_name} table: --> {e}")
print_message(type="error", message=f"!!! ERROR: Failed to update record {column_name} = {new_value} from {table_name} table: --> {e}")
return False
finally:
if conn:
Expand Down
Loading