From dcc5927b7ae948fe736d83e4919efd1f16d30f66 Mon Sep 17 00:00:00 2001
From: James Mathews
Date: Tue, 30 Jul 2024 14:30:03 -0400
Subject: [PATCH] Intermittent bug fixes (#341)

* Fix up ADIFeaturesUploader to work with autoincrementing IDs; remove
  duplicate db connections by workers; add SIGALRM timeout for single-value
  computation.
* Add autocleanup in case a feature seems to never complete.
* Fix indefinite wait, check flag.
* Fix tuple typo.
* Fix possible race condition in notification processing.
* Fix timing of notifications block in workers.
* Generalize timeout; apply it also to single-sample computation.
---
 .../db/importance_score_transcriber.py        |   2 -
 .../db/source_file_parser_interface.py        |   2 +-
 .../ondemand/providers/pending_provider.py    |   2 +-
 .../ondemand/request_scheduling.py            | 114 +++++++----
 spatialprofilingtoolbox/ondemand/timeout.py   |  45 +++++
 spatialprofilingtoolbox/ondemand/worker.py    |  51 +++--
 .../workflow/common/export_features.py        | 181 ++++++++----------
 7 files changed, 242 insertions(+), 155 deletions(-)
 create mode 100644 spatialprofilingtoolbox/ondemand/timeout.py

diff --git a/spatialprofilingtoolbox/db/importance_score_transcriber.py b/spatialprofilingtoolbox/db/importance_score_transcriber.py
index a8a1108b..e9592426 100644
--- a/spatialprofilingtoolbox/db/importance_score_transcriber.py
+++ b/spatialprofilingtoolbox/db/importance_score_transcriber.py
@@ -133,8 +133,6 @@ def _upload(
             get_feature_description("gnn importance score"),
             len(importance_score_set_indexer),
         ),
-        impute_zeros=True,
-        upload_anyway=True,
     ) as feature_uploader:
         for histological_structure, row in df.iterrows():
             feature_uploader.stage_feature_value(
diff --git a/spatialprofilingtoolbox/db/source_file_parser_interface.py b/spatialprofilingtoolbox/db/source_file_parser_interface.py
index 5a42c1d4..e572186d 100644
--- a/spatialprofilingtoolbox/db/source_file_parser_interface.py
+++ b/spatialprofilingtoolbox/db/source_file_parser_interface.py
@@ -53,7 +53,7 @@ def get_field_names(self, tablename):
 
     def generate_basic_insert_query(self, tablename):
         fields_sorted = self.get_field_names(tablename)
-        if tablename == 'quantitative_feature_value':
+        if tablename in ('quantitative_feature_value', 'feature_specification'):
            fields_sorted = fields_sorted[1:]
         handle_duplicates = 'ON CONFLICT DO NOTHING '
         query = (
diff --git a/spatialprofilingtoolbox/ondemand/providers/pending_provider.py b/spatialprofilingtoolbox/ondemand/providers/pending_provider.py
index bafb9564..bc6592fa 100644
--- a/spatialprofilingtoolbox/ondemand/providers/pending_provider.py
+++ b/spatialprofilingtoolbox/ondemand/providers/pending_provider.py
@@ -94,7 +94,7 @@ def _warn_no_value(self) -> None:
         specification = str(self.job.feature_specification)
         study = self.job.study
         sample = self.job.sample
+        logger.warning(f'Feature {specification} ({sample}, {study}) could not be computed, worker generated None. May insert None.')
 
     def _insert_value(self, value: float | int) -> None:
         study = self.job.study
diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py
index 62b4245f..67adc022 100644
--- a/spatialprofilingtoolbox/ondemand/request_scheduling.py
+++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py
@@ -2,10 +2,10 @@
 from typing import cast
 from typing import Callable
+import signal
 
 from psycopg import Connection as PsycopgConnection
 
-from spatialprofilingtoolbox.ondemand.job_reference import ComputationJobReference
 from spatialprofilingtoolbox.db.database_connection import DBConnection
 from spatialprofilingtoolbox.db.database_connection import DBCursor
 from spatialprofilingtoolbox.ondemand.providers.counts_provider import CountsProvider
@@ -18,7 +18,8 @@
     CompositePhenotype,
     UnivariateMetricsComputationResult,
 )
-from spatialprofilingtoolbox.ondemand.relevant_specimens import relevant_specimens_query
+from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler
+from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError
 from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger
 
 Metrics1D = UnivariateMetricsComputationResult
@@ -40,6 +41,51 @@ def _nonempty(string: str) -> bool:
     return string != ''
 
 
+class FeatureComputationTimeoutHandler:
+    feature: str
+    study: str
+
+    def __init__(self, feature: str, study: str):
+        self.feature = feature
+        self.study = study
+
+    def handle(self) -> None:
+        message = f'Timed out waiting for the feature {self.feature} to complete. Aborting.'
+        logger.error(message)
+        if self._queue_size() == 0 and self._completed_size() < self._expected_size():
+            self._delete_feature()
+
+    def _queue_size(self) -> int:
+        with DBCursor(study=self.study) as cursor:
+            query = 'SELECT COUNT(*) FROM quantitative_feature_value_queue WHERE feature=%s ;'
+            cursor.execute(query, (self.feature,))
+            count = tuple(cursor.fetchall())[0][0]
+        return count
+
+    def _completed_size(self) -> int:
+        with DBCursor(study=self.study) as cursor:
+            query = 'SELECT COUNT(*) FROM quantitative_feature_value WHERE feature=%s ;'
+            cursor.execute(query, (self.feature,))
+            count = tuple(cursor.fetchall())[0][0]
+        return count
+
+    def _expected_size(self) -> int:
+        with DBCursor(study=self.study) as cursor:
+            query = 'SELECT COUNT(*) FROM specimen_data_measurement_process ;'
+            cursor.execute(query)
+            count = tuple(cursor.fetchall())[0][0]
+        return count
+
+    def _delete_feature(self) -> None:
+        logger.error('Also deleting the feature, since the queue was empty; we assume the remaining jobs failed.')
+        with DBCursor(study=self.study) as cursor:
+            param = (self.feature,)
+            cursor.execute('DELETE FROM quantitative_feature_value WHERE feature=%s ;', param)
+            cursor.execute('DELETE FROM feature_specifier WHERE feature_specification=%s ;', param)
+            cursor.execute('DELETE FROM feature_specification WHERE identifier=%s ;', param)
+
+
 class OnDemandRequester:
     """Entry point for requesting computation by the on-demand service."""
@@ -89,7 +135,7 @@ def _counts(
     ) -> tuple[str, Metrics1D, Metrics1D]:
         get = CountsProvider.get_metrics_or_schedule
 
-        def get_results() -> tuple[Metrics1D, str]:
+        def get_results1() -> tuple[Metrics1D, str]:
             counts, feature1 = get(
                 study_name,
                 phenotype=phenotype,
             )
             return (counts, feature1)
 
-        with DBConnection() as connection:
-            connection._set_autocommit(True)
-            connection.execute('LISTEN new_items_in_queue ;')
-            connection.execute('LISTEN one_job_complete ;')
-            cls._wait_for_wrappedup(connection, get_results)
-            counts, feature1 = get_results()
-
         def get_results2() -> tuple[Metrics1D, str]:
             counts_all, feature2 = get(
                 study_name,
@@ -114,44 +153,49 @@
         with DBConnection() as connection:
             connection._set_autocommit(True)
-            connection.execute('LISTEN new_items_in_queue ;')
-            connection.execute('LISTEN one_job_complete ;')
-            cls._wait_for_wrappedup(connection, get_results2)
-            counts_all, _ = get_results2()
-
-        cls._request_check_for_failed_jobs()
-        return (feature1, counts, counts_all)
+            cls._wait_for_wrappedup(connection, get_results1, study_name)
+            counts, feature1 = get_results1()
 
-    @classmethod
-    def _request_check_for_failed_jobs(cls) -> None:
         with DBConnection() as connection:
             connection._set_autocommit(True)
-            connection.execute('NOTIFY check_for_failed_jobs ;')
+            cls._wait_for_wrappedup(connection, get_results2, study_name)
+            counts_all, _ = get_results2()
+
+        return (feature1, counts, counts_all)
 
     @classmethod
     def _wait_for_wrappedup(
         cls,
         connection: PsycopgConnection,
         get_results: Callable[[], tuple[Metrics1D, str]],
+        study_name: str,
    ) -> None:
-        counts, feature = get_results()
-        if not get_results()[0].is_pending:
-            logger.debug(f'Feature {feature} already complete.')
-            return
-        logger.debug(f'Waiting for signal that feature {feature} may be ready.')
+        connection.execute('LISTEN new_items_in_queue ;')
+        connection.execute('LISTEN one_job_complete ;')
         notifications = connection.notifies()
-        for notification in notifications:
-            if not get_results()[0].is_pending:
-                logger.debug(f'Closing notification processing, {feature} ready.')
-                notifications.close()
-                break
-            channel = notification.channel
-            if channel == 'one_job_complete':
-                logger.debug(f'A job is complete, so {feature} may be ready.')
-                if not get_results()[0].is_pending:
-                    logger.debug(f'And {feature} was ready. Closing.')
+
+        counts, feature = get_results()
+        handler = FeatureComputationTimeoutHandler(feature, study_name)
+        generic_handler = create_timeout_handler(handler.handle)
+        try:
+            if not counts.is_pending:
+                logger.debug(f'Feature {feature} already complete.')
+                return
+            logger.debug(f'Waiting for signal that feature {feature} may be ready, because the result is not ready yet.')
+
+            for notification in notifications:
+                channel = notification.channel
+                if channel == 'one_job_complete':
+                    logger.debug(f'A job is complete, so {feature} may be ready. (PID: {notification.pid})')
+                    _result = get_results()
+                    if not _result[0].is_pending:
+                        logger.debug(f'Closing notification processing, {feature} ready.')
                         notifications.close()
                         break
+        except SPTTimeoutError:
+            pass
+        finally:
+            generic_handler.disalarm()
 
     @classmethod
     def get_proximity_metrics(
diff --git a/spatialprofilingtoolbox/ondemand/timeout.py b/spatialprofilingtoolbox/ondemand/timeout.py
new file mode 100644
index 00000000..8ee8b2ed
--- /dev/null
+++ b/spatialprofilingtoolbox/ondemand/timeout.py
@@ -0,0 +1,45 @@
+"""General-purpose one-time timeout functionality based on Unix signal alarm."""
+from typing import Callable
+import signal
+
+from spatialprofilingtoolbox.db.database_connection import DBCursor
+from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger
+
+logger = colorized_logger(__name__)
+
+TIMEOUT_SECONDS_DEFAULT = 300
+
+
+class SPTTimeoutError(RuntimeError):
+    def __init__(self, message: str):
+        super().__init__(message)
+        self.message = message
+
+
+class TimeoutHandler:
+    active: bool
+    callback: Callable
+    timeout: int
+
+    def __init__(self, callback: Callable, timeout: int):
+        self.active = True
+        self.callback = callback
+        self.timeout = timeout
+
+    def handle(self, signum, frame) -> None:
+        if self.active:
+            message = f'Waited {self.timeout} seconds, timed out.'
+            logger.error(message)
+            self.callback()
+            raise SPTTimeoutError(message)
+
+    def disalarm(self) -> None:
+        self.active = False
+
+
+def create_timeout_handler(callback: Callable, timeout_seconds: int = TIMEOUT_SECONDS_DEFAULT) -> TimeoutHandler:
+    handler = TimeoutHandler(callback, timeout_seconds)
+    signal.signal(signal.SIGALRM, handler.handle)
+    signal.alarm(timeout_seconds)
+    return handler
{" ".join(completed_pids)}.') def _one_job(self) -> tuple[bool, int]: - with DBConnection() as connection: - connection._set_autocommit(True) - self.connection = connection - pid = self.connection.info.backend_pid - job = self.queue.pop_uncomputed() - if job is None: - return (False, pid) - logger.info(f'{pid} doing job {job.feature_specification} {job.sample}.') - self._compute(job) - self._notify_complete(job) - return (True, pid) + pid = self.connection.info.backend_pid + job = self.queue.pop_uncomputed() + if job is None: + return (False, pid) + logger.info(f'{pid} doing job {job.feature_specification} {job.sample}.') + self._compute(job) + self._notify_complete(job) + return (True, pid) + + def _no_value_wrapup(self, job) -> None: + provider = self._get_provider(job) + provider._warn_no_value() + provider._insert_null() def _compute(self, job: Job) -> None: provider = self._get_provider(job) + generic_handler = create_timeout_handler( + lambda *arg: self._no_value_wrapup(job), + timeout_seconds=150, + ) try: provider.compute() + except SPTTimeoutError: + pass except Exception as error: logger.error(error) print_exception(type(error), error, error.__traceback__) - self._get_provider(job)._warn_no_value() + finally: + generic_handler.disalarm() def _notify_complete(self, job: Job) -> None: self.connection.execute('NOTIFY one_job_complete ;') diff --git a/spatialprofilingtoolbox/workflow/common/export_features.py b/spatialprofilingtoolbox/workflow/common/export_features.py index 24fb801d..4cab51f9 100644 --- a/spatialprofilingtoolbox/workflow/common/export_features.py +++ b/spatialprofilingtoolbox/workflow/common/export_features.py @@ -25,26 +25,18 @@ class ADIFeaturesUploader(SourceToADIParser): Upload sparse representation of feature values to tables quantitative_feature_value, feature_specification, feature_specifier. 
""" - - feature_value_identifier: int connection_provider: ConnectionProvider feature_values: list[tuple[tuple[str, ...], str , float | None]] - upload_anyway: bool - quiet: bool def __init__(self, connection: PsycopgConnection, - data_analysis_study, + data_analysis_study: str, derivation_and_number_specifiers, - impute_zeros=False, - upload_anyway: bool = False, - quiet: bool = False, - **kwargs + impute_zeros: bool = False, + **kwargs, ): derivation_method, specifier_number = derivation_and_number_specifiers - self.impute_zeros=impute_zeros - self.upload_anyway = upload_anyway - self.quiet = quiet + self.impute_zeros = impute_zeros with as_file(files('adiscstudies').joinpath('fields.tsv')) as path: fields = pd.read_csv(path, sep='\t', na_filter=False) SourceToADIParser.__init__(self, fields) @@ -72,7 +64,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): if self.connection_provider.is_connected(): - self.upload(upload_anyway=self.upload_anyway) + self.upload() def stage_feature_value(self, specifiers: tuple[str, ...], subject: str, value: float | None): self.validate_specifiers(specifiers) @@ -89,45 +81,27 @@ def validate_specifiers(self, specifiers): def get_connection(self): return self.connection_provider.get_connection() - def upload(self, upload_anyway: bool = False) -> None: + def upload(self) -> None: if self.check_nothing_to_upload(): return - if self.check_exact_feature_values_already_present() and not upload_anyway: - return self.test_subject_existence() self.test_study_existence() - if self.impute_zeros: self.add_imputed_zero_values() - cursor = self.get_connection().cursor() - get_next = SourceToADIParser.get_next_integer_identifier - next_identifier = get_next('feature_specification', cursor) specifiers_list = sorted(list(set(row[0] for row in self.feature_values))) - specifiers_by_id = { - next_identifier + i: specifiers - for i, specifiers in enumerate(specifiers_list) - } - - # self.get_feature_value_next_identifier(cursor) insert_notice = 'Inserting feature "%s" for study "%s".' logger.info(insert_notice, self.derivation_method, self.data_analysis_study) - for feature_identifier, specifiers in specifiers_by_id.items(): - cursor.execute( - self.insert_queries['feature_specification'], - (feature_identifier, self.derivation_method, self.data_analysis_study), + for specifiers in specifiers_list: + get_or_create = ADIFeaturesUploader._get_or_create_generic_feature_specification + feature_identifier, is_new = get_or_create( + cursor, self.data_analysis_study, specifiers, self.derivation_method + ) + feature_values = map( + lambda row: (row[1], row[2]), + filter(lambda row: row[0] == specifiers, self.feature_values), ) - self.insert_specifiers(cursor, specifiers, feature_identifier) - if not self.quiet: - logger.debug('Inserted feature specification, "%s".', specifiers) - feature_values = [ - [row[1], row[2]] for row in self.feature_values - if row[0] == specifiers - ] self.insert_feature_values(cursor, feature_identifier, feature_values) - if not self.quiet: - logger.debug('Inserted %s feature values.', len(feature_values)) - self.get_connection().commit() cursor.close() @@ -137,52 +111,6 @@ def check_nothing_to_upload(self): return True return False - def check_exact_feature_values_already_present(self): - count = self.count_known_feature_values_this_study() - if count == len(self.feature_values): - logger.info( - 'Exactly %s feature values already associated with study "%s" of ' - 'description "%s". 
@@ -137,52 +111,6 @@ def check_nothing_to_upload(self):
             return True
         return False
 
-    def check_exact_feature_values_already_present(self):
-        count = self.count_known_feature_values_this_study()
-        if count == len(self.feature_values):
-            logger.info(
-                'Exactly %s feature values already associated with study "%s" of '
-                'description "%s". This is the correct number; skipping upload '
-                'without error, unless "upload_anyway" is set.',
-                count,
-                self.data_analysis_study,
-                self.derivation_method,
-            )
-            return True
-        if count > 0:
-            message = f'Already have {count} features associated with study ' \
-                f'"{self.data_analysis_study}" of description "{self.derivation_method}". ' \
-                'May be an error.'
-            logger.warning(message)
-        if count == 0:
-            logger.info(
-                'No feature values yet associated with study "%s" of description "%s". '
-                'Proceeding with upload.',
-                self.data_analysis_study,
-                self.derivation_method,
-            )
-            return False
-        return None
-
-    def count_known_feature_values_this_study(self):
-        cursor = self.get_connection().cursor()
-        count_query = '''
-        SELECT COUNT(*)
-        FROM quantitative_feature_value qfv
-        JOIN feature_specification fs
-            ON fs.identifier = qfv.feature
-        WHERE fs.study = %s AND fs.derivation_method = %s
-        ;
-        '''
-        cursor.execute(
-            count_query,
-            (self.data_analysis_study, self.derivation_method),
-        )
-        rows = cursor.fetchall()
-        count = rows[0][0]
-        cursor.close()
-        return count
-
     def test_subject_existence(self):
         subject_ids = self.get_subject_identifiers()
         unknown_subjects = set(row[1] for row in self.feature_values).difference(subject_ids)
@@ -231,14 +159,6 @@ def add_imputed_zero_values(self):
         assignments = [(case[0], case[1], 0) for case in no_value_cases]
         self.feature_values = self.feature_values + assignments
 
-    def insert_specifiers(self, cursor: PsycopgCursor, specifiers, feature_identifier):
-        for i, specifier in enumerate(specifiers):
-            ordinality = i + 1
-            cursor.execute(
-                self.insert_queries['feature_specifier'],
-                (feature_identifier, specifier, ordinality),
-            )
-
     def insert_feature_values(self, cursor: PsycopgCursor, feature_identifier, feature_values):
         for subject, value in feature_values:
             cursor.execute(
@@ -246,13 +166,82 @@ def insert_feature_values(self, cursor: PsycopgCursor, feature_identifier, feature_values):
                 self.insert_queries['quantitative_feature_value'],
                 (feature_identifier, subject, value),
             )
 
+    @classmethod
+    def _get_or_create_generic_feature_specification(
+        cls,
+        cursor: PsycopgCursor,
+        data_analysis_study: str,
+        specifiers: tuple[str, ...],
+        derivation_method: str,
+    ) -> tuple[str, bool]:
+        specification = cls._get_feature_specification(cursor, specifiers, derivation_method)
+        if specification is not None:
+            return (specification, False)
+        logger.debug(f'Creating feature with specifiers: {specifiers}')
+        specification = cls._create_feature_specification(
+            cursor, data_analysis_study, specifiers, derivation_method,
+        )
+        return (specification, True)
+
+    @classmethod
+    def _get_feature_specification(cls,
+        cursor: PsycopgCursor,
+        specifiers: tuple[str, ...],
+        derivation_method: str,
+    ) -> str | None:
+        args = (
+            *specifiers,
+            derivation_method,
+        )
+        specifiers_portion = ' AND '.join(
+            f"( fs.specifier=%s AND fs.ordinality='{i+1}')"
+            for i in range(len(specifiers))
+        )
+        query = f'''
+        SELECT
+            fsn.identifier,
+            fs.specifier
+        FROM feature_specification fsn
+        JOIN feature_specifier fs ON fs.feature_specification=fsn.identifier
+        WHERE {specifiers_portion} AND fsn.derivation_method=%s
+        ;
+        '''
+        cursor.execute(query, args)
+        rows = tuple(cursor.fetchall())
+        feature_specifications: dict[str, list[str]] = {row[0]: [] for row in rows}
+        matches_list: list[str] = []
+        for row in rows:
+            feature_specifications[row[0]].append(row[1])
+        for key, _specifiers in feature_specifications.items():
+            if len(_specifiers) == len(specifiers):
+                matches_list.append(key)
+        matches = tuple(matches_list)
+        if len(matches) == 0:
+            return None
+        if len(matches) > 1:
+            text = 'Multiple features match the selected specification'
+            message = f'{text}: {matches} {specifiers}'
+            logger.warning(message)
+        return matches[0]
+
+    @classmethod
+    def _create_feature_specification(cls,
+        cursor: PsycopgCursor,
+        data_analysis_study: str,
+        specifiers: tuple[str, ...],
+        derivation_method: str,
+    ) -> str:
+        Uploader = ADIFeatureSpecificationUploader
+        add = Uploader.add_new_feature
+        feature_specification = add(specifiers, derivation_method, data_analysis_study, cursor)
+        return feature_specification
+
 
 class ADIFeatureSpecificationUploader:
     """Just upload a new feature specification."""
 
     @staticmethod
-    def add_new_feature(specifiers, derivation_method, measurement_study, cursor: PsycopgCursor):
+    def add_new_feature(specifiers, derivation_method, data_analysis_study, cursor: PsycopgCursor):
         FSU = ADIFeatureSpecificationUploader
-        data_analysis_study = FSU.get_data_analysis_study(measurement_study, cursor)
         identifier = FSU.insert_specification(derivation_method, data_analysis_study, cursor)
         FSU.insert_specifiers(identifier, specifiers, cursor)
         return identifier
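
Usage note: the create_timeout_handler/disalarm pair added in
spatialprofilingtoolbox/ondemand/timeout.py above is meant to wrap a
potentially long computation in a try/finally, as worker.py does. Below is a
minimal sketch of that pattern under the patch's own API; long_computation is
a hypothetical stand-in for work like provider.compute(), and since the
mechanism is SIGALRM-based it only applies in the main thread of a Unix
process.

    import logging

    from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError
    from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler

    logger = logging.getLogger(__name__)

    def long_computation() -> float:
        ...  # hypothetical placeholder for the real work
        return 0.0

    def compute_with_deadline() -> float | None:
        # The callback fires at most once, from the SIGALRM handler, if the
        # deadline passes before long_computation finishes.
        handler = create_timeout_handler(
            lambda: logger.warning('Computation timed out.'),
            timeout_seconds=150,
        )
        try:
            return long_computation()
        except SPTTimeoutError:
            return None
        finally:
            # Deactivate the handler so a still-pending alarm cannot fire
            # into unrelated code after this block exits.
            handler.disalarm()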