From 899c1553222dedbbae7fc8eedd8ffade539cd9f0 Mon Sep 17 00:00:00 2001 From: James Mathews Date: Mon, 29 Jul 2024 16:27:56 -0400 Subject: [PATCH 1/7] Fix up ADIFeaturesUploader to work with autoincrementing IDs; remove duplicate DB connections by workers; add SIGALRM timeout for single-value computation --- .../db/importance_score_transcriber.py | 2 - .../db/source_file_parser_interface.py | 2 +- .../ondemand/providers/pending_provider.py | 2 +- .../ondemand/request_scheduling.py | 45 ++++- spatialprofilingtoolbox/ondemand/worker.py | 34 ++-- .../workflow/common/export_features.py | 181 ++++++++---------- 6 files changed, 143 insertions(+), 123 deletions(-) diff --git a/spatialprofilingtoolbox/db/importance_score_transcriber.py b/spatialprofilingtoolbox/db/importance_score_transcriber.py index a8a1108b..e9592426 100644 --- a/spatialprofilingtoolbox/db/importance_score_transcriber.py +++ b/spatialprofilingtoolbox/db/importance_score_transcriber.py @@ -133,8 +133,6 @@ def _upload( get_feature_description("gnn importance score"), len(importance_score_set_indexer), ), - impute_zeros=True, - upload_anyway=True, ) as feature_uploader: for histological_structure, row in df.iterrows(): feature_uploader.stage_feature_value( diff --git a/spatialprofilingtoolbox/db/source_file_parser_interface.py b/spatialprofilingtoolbox/db/source_file_parser_interface.py index 5a42c1d4..e572186d 100644 --- a/spatialprofilingtoolbox/db/source_file_parser_interface.py +++ b/spatialprofilingtoolbox/db/source_file_parser_interface.py @@ -53,7 +53,7 @@ def get_field_names(self, tablename): def generate_basic_insert_query(self, tablename): fields_sorted = self.get_field_names(tablename) - if tablename == 'quantitative_feature_value': + if tablename in ('quantitative_feature_value', 'feature_specification'): fields_sorted = fields_sorted[1:] handle_duplicates = 'ON CONFLICT DO NOTHING ' query = ( diff --git a/spatialprofilingtoolbox/ondemand/providers/pending_provider.py b/spatialprofilingtoolbox/ondemand/providers/pending_provider.py index bafb9564..bc6592fa 100644 --- a/spatialprofilingtoolbox/ondemand/providers/pending_provider.py +++ b/spatialprofilingtoolbox/ondemand/providers/pending_provider.py @@ -94,7 +94,7 @@ def _warn_no_value(self) -> None: specification = str(self.job.feature_specification) study = self.job.study sample = self.job.sample - logger.warning(f'Feature {specification} ({sample}, {study}) could not be computed, worker generated None.') + logger.warning(f'Feature {specification} ({sample}, {study}) could not be computed, worker generated None.
May insert None.') def _insert_value(self, value: float | int) -> None: study = self.job.study diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 62b4245f..1cfd9f20 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -2,12 +2,11 @@ from typing import cast from typing import Callable +import signal from psycopg import Connection as PsycopgConnection -from spatialprofilingtoolbox.ondemand.job_reference import ComputationJobReference from spatialprofilingtoolbox.db.database_connection import DBConnection -from spatialprofilingtoolbox.db.database_connection import DBCursor from spatialprofilingtoolbox.ondemand.providers.counts_provider import CountsProvider from spatialprofilingtoolbox.ondemand.providers.proximity_provider import ProximityProvider from spatialprofilingtoolbox.ondemand.providers.squidpy_provider import SquidpyProvider @@ -18,7 +17,6 @@ CompositePhenotype, UnivariateMetricsComputationResult, ) -from spatialprofilingtoolbox.ondemand.relevant_specimens import relevant_specimens_query from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger Metrics1D = UnivariateMetricsComputationResult @@ -40,6 +38,23 @@ def _nonempty(string: str) -> bool: return string != '' +class TimeoutHandler: + active: bool + wait_for_results_timeout_seconds: int = 300 + + def __init__(self): + self.active = True + + def handle(self, signum, frame) -> None: + if self.active: + message = f'Waited {self.wait_for_results_timeout_seconds} seconds for the feature to complete. Aborting.' + logger.error(message) + raise RuntimeError(message) + + def disalarm(self) -> None: + self.active = False + + class OnDemandRequester: """Entry point for requesting computation by the on-demand service.""" @@ -97,6 +112,10 @@ def get_results() -> tuple[Metrics1D, str]: ) return (counts, feature1) + handler = TimeoutHandler() + signal.signal(signal.SIGALRM, handler.handle) + signal.alarm(handler.wait_for_results_timeout_seconds) + with DBConnection() as connection: connection._set_autocommit(True) connection.execute('LISTEN new_items_in_queue ;') @@ -104,6 +123,8 @@ def get_results() -> tuple[Metrics1D, str]: cls._wait_for_wrappedup(connection, get_results) counts, feature1 = get_results() + handler.disalarm() + def get_results2() -> tuple[Metrics1D, str]: counts_all, feature2 = get( study_name, @@ -112,6 +133,10 @@ def get_results2() -> tuple[Metrics1D, str]: ) return (counts_all, feature2) + handler2 = TimeoutHandler() + signal.signal(signal.SIGALRM, handler2.handle) + signal.alarm(handler2.wait_for_results_timeout_seconds) + with DBConnection() as connection: connection._set_autocommit(True) connection.execute('LISTEN new_items_in_queue ;') @@ -119,6 +144,8 @@ def get_results2() -> tuple[Metrics1D, str]: cls._wait_for_wrappedup(connection, get_results2) counts_all, _ = get_results2() + handler2.disalarm() + cls._request_check_for_failed_jobs() return (feature1, counts, counts_all) @@ -135,10 +162,12 @@ def _wait_for_wrappedup( get_results: Callable[[], tuple[Metrics1D, str]], ) -> None: counts, feature = get_results() - if not get_results()[0].is_pending: + if not counts.is_pending: logger.debug(f'Feature {feature} already complete.') return logger.debug(f'Waiting for signal that feature {feature} may be ready.') + counts.values = '(truncated)' + logger.debug(f'Because result ready yet: {counts}') notifications = connection.notifies() for 
notification in notifications: if not get_results()[0].is_pending: @@ -147,11 +176,15 @@ def _wait_for_wrappedup( break channel = notification.channel if channel == 'one_job_complete': - logger.debug(f'A job is complete, so {feature} may be ready.') - if not get_results()[0].is_pending: + logger.debug(f'A job is complete, so {feature} may be ready. (PID: {notification.pid})') + _result = get_results() + if not _result[0].is_pending: logger.debug(f'And {feature} was ready. Closing.') notifications.close() break + _result[0].values = '(truncated)' + logger.debug(f'Not ready yet: {_result}') + logger.debug(f'Notification: {notification}') @classmethod def get_proximity_metrics( diff --git a/spatialprofilingtoolbox/ondemand/worker.py b/spatialprofilingtoolbox/ondemand/worker.py index d11f962c..5418bf97 100644 --- a/spatialprofilingtoolbox/ondemand/worker.py +++ b/spatialprofilingtoolbox/ondemand/worker.py @@ -33,15 +33,16 @@ def start(self) -> None: def _listen_for_queue_activity(self) -> None: with DBConnection() as connection: - connection._set_autocommit(True) + self.connection = connection + self.connection._set_autocommit(True) while True: - self._wait_for_queue_activity_on(connection) + self._wait_for_queue_activity_on_connection() self._work_until_complete() - def _wait_for_queue_activity_on(self, connection: PsycopgConnection) -> None: - connection.execute('LISTEN new_items_in_queue ;') + def _wait_for_queue_activity_on_connection(self) -> None: + self.connection.execute('LISTEN new_items_in_queue ;') logger.info('Listening on new_items_in_queue channel.') - notifications = connection.notifies() + notifications = self.connection.notifies() for notification in notifications: notifications.close() logger.info('Received notice of new items in the job queue.') @@ -57,17 +58,14 @@ def _work_until_complete(self) -> None: logger.info(f'Finished jobs {" ".join(completed_pids)}.') def _one_job(self) -> tuple[bool, int]: - with DBConnection() as connection: - connection._set_autocommit(True) - self.connection = connection - pid = self.connection.info.backend_pid - job = self.queue.pop_uncomputed() - if job is None: - return (False, pid) - logger.info(f'{pid} doing job {job.feature_specification} {job.sample}.') - self._compute(job) - self._notify_complete(job) - return (True, pid) + pid = self.connection.info.backend_pid + job = self.queue.pop_uncomputed() + if job is None: + return (False, pid) + logger.info(f'{pid} doing job {job.feature_specification} {job.sample}.') + self._compute(job) + self._notify_complete(job) + return (True, pid) def _compute(self, job: Job) -> None: provider = self._get_provider(job) @@ -76,7 +74,9 @@ def _compute(self, job: Job) -> None: except Exception as error: logger.error(error) print_exception(type(error), error, error.__traceback__) - self._get_provider(job)._warn_no_value() + provider = self._get_provider(job) + provider._warn_no_value() + provider._insert_null() def _notify_complete(self, job: Job) -> None: self.connection.execute('NOTIFY one_job_complete ;') diff --git a/spatialprofilingtoolbox/workflow/common/export_features.py b/spatialprofilingtoolbox/workflow/common/export_features.py index 24fb801d..4cab51f9 100644 --- a/spatialprofilingtoolbox/workflow/common/export_features.py +++ b/spatialprofilingtoolbox/workflow/common/export_features.py @@ -25,26 +25,18 @@ class ADIFeaturesUploader(SourceToADIParser): Upload sparse representation of feature values to tables quantitative_feature_value, feature_specification, feature_specifier. 
""" - - feature_value_identifier: int connection_provider: ConnectionProvider feature_values: list[tuple[tuple[str, ...], str , float | None]] - upload_anyway: bool - quiet: bool def __init__(self, connection: PsycopgConnection, - data_analysis_study, + data_analysis_study: str, derivation_and_number_specifiers, - impute_zeros=False, - upload_anyway: bool = False, - quiet: bool = False, - **kwargs + impute_zeros: bool = False, + **kwargs, ): derivation_method, specifier_number = derivation_and_number_specifiers - self.impute_zeros=impute_zeros - self.upload_anyway = upload_anyway - self.quiet = quiet + self.impute_zeros = impute_zeros with as_file(files('adiscstudies').joinpath('fields.tsv')) as path: fields = pd.read_csv(path, sep='\t', na_filter=False) SourceToADIParser.__init__(self, fields) @@ -72,7 +64,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): if self.connection_provider.is_connected(): - self.upload(upload_anyway=self.upload_anyway) + self.upload() def stage_feature_value(self, specifiers: tuple[str, ...], subject: str, value: float | None): self.validate_specifiers(specifiers) @@ -89,45 +81,27 @@ def validate_specifiers(self, specifiers): def get_connection(self): return self.connection_provider.get_connection() - def upload(self, upload_anyway: bool = False) -> None: + def upload(self) -> None: if self.check_nothing_to_upload(): return - if self.check_exact_feature_values_already_present() and not upload_anyway: - return self.test_subject_existence() self.test_study_existence() - if self.impute_zeros: self.add_imputed_zero_values() - cursor = self.get_connection().cursor() - get_next = SourceToADIParser.get_next_integer_identifier - next_identifier = get_next('feature_specification', cursor) specifiers_list = sorted(list(set(row[0] for row in self.feature_values))) - specifiers_by_id = { - next_identifier + i: specifiers - for i, specifiers in enumerate(specifiers_list) - } - - # self.get_feature_value_next_identifier(cursor) insert_notice = 'Inserting feature "%s" for study "%s".' logger.info(insert_notice, self.derivation_method, self.data_analysis_study) - for feature_identifier, specifiers in specifiers_by_id.items(): - cursor.execute( - self.insert_queries['feature_specification'], - (feature_identifier, self.derivation_method, self.data_analysis_study), + for specifiers in specifiers_list: + get_or_create = ADIFeaturesUploader._get_or_create_generic_feature_specification + feature_identifier, is_new = get_or_create( + cursor, self.data_analysis_study, specifiers, self.derivation_method + ) + feature_values = map( + lambda row: (row[1], row[2]), + filter(lambda row: row[0] == specifiers, self.feature_values), ) - self.insert_specifiers(cursor, specifiers, feature_identifier) - if not self.quiet: - logger.debug('Inserted feature specification, "%s".', specifiers) - feature_values = [ - [row[1], row[2]] for row in self.feature_values - if row[0] == specifiers - ] self.insert_feature_values(cursor, feature_identifier, feature_values) - if not self.quiet: - logger.debug('Inserted %s feature values.', len(feature_values)) - self.get_connection().commit() cursor.close() @@ -137,52 +111,6 @@ def check_nothing_to_upload(self): return True return False - def check_exact_feature_values_already_present(self): - count = self.count_known_feature_values_this_study() - if count == len(self.feature_values): - logger.info( - 'Exactly %s feature values already associated with study "%s" of ' - 'description "%s". 
This is the correct number; skipping upload ' - 'without error, unless "upload_anyway" is set.', - count, - self.data_analysis_study, - self.derivation_method, - ) - return True - if count > 0: - message = f'Already have {count} features associated with study ' \ - f'"{self.data_analysis_study}" of description "{self.derivation_method}". ' \ - 'May be an error.' - logger.warning(message) - if count == 0: - logger.info( - 'No feature values yet associated with study "%s" of description "%s". ' - 'Proceeding with upload.', - self.data_analysis_study, - self.derivation_method, - ) - return False - return None - - def count_known_feature_values_this_study(self): - cursor = self.get_connection().cursor() - count_query = ''' - SELECT COUNT(*) - FROM quantitative_feature_value qfv - JOIN feature_specification fs - ON fs.identifier = qfv.feature - WHERE fs.study = %s AND fs.derivation_method = %s - ; - ''' - cursor.execute( - count_query, - (self.data_analysis_study, self.derivation_method), - ) - rows = cursor.fetchall() - count = rows[0][0] - cursor.close() - return count - def test_subject_existence(self): subject_ids = self.get_subject_identifiers() unknown_subjects = set(row[1] for row in self.feature_values).difference(subject_ids) @@ -231,14 +159,6 @@ def add_imputed_zero_values(self): assignments = [(case[0], case[1], 0) for case in no_value_cases] self.feature_values = self.feature_values + assignments - def insert_specifiers(self, cursor: PsycopgCursor, specifiers, feature_identifier): - for i, specifier in enumerate(specifiers): - ordinality = i + 1 - cursor.execute( - self.insert_queries['feature_specifier'], - (feature_identifier, specifier, ordinality), - ) - def insert_feature_values(self, cursor: PsycopgCursor, feature_identifier, feature_values): for subject, value in feature_values: cursor.execute( @@ -246,13 +166,82 @@ def insert_feature_values(self, cursor: PsycopgCursor, feature_identifier, featu (feature_identifier, subject, value), ) + @classmethod + def _get_or_create_generic_feature_specification( + cls, + cursor: PsycopgCursor, + data_analysis_study: str, + specifiers: tuple[str, ...], + derivation_method: str, + ) -> tuple[str, bool]: + specification = cls._get_feature_specification(cursor, specifiers, derivation_method) + if specification is not None: + return (specification, False) + logger.debug(f'Creating feature with specifiers: {specifiers}') + specification = cls._create_feature_specification( + cursor, data_analysis_study, specifiers, derivation_method, + ) + return (specification, True) + + @classmethod + def _get_feature_specification(cls, + cursor: PsycopgCursor, + specifiers: tuple[str, ...], + derivation_method: str, + ) -> str | None: + args = ( + *specifiers, + derivation_method, + ) + specifiers_portion = ' OR '.join( + f"( fs.specifier=%s AND fs.ordinality='{i+1}')" + for i in range(len(specifiers)) + ) + query = f''' + SELECT + fsn.identifier, + fs.specifier + FROM feature_specification fsn + JOIN feature_specifier fs ON fs.feature_specification=fsn.identifier + WHERE ( {specifiers_portion} ) AND fsn.derivation_method=%s + ; + ''' + cursor.execute(query, args) + rows = tuple(cursor.fetchall()) + feature_specifications: dict[str, list[str]] = {row[0]: [] for row in rows} + matches_list: list[str] = [] + for row in rows: + feature_specifications[row[0]].append(row[1]) + for key, _specifiers in feature_specifications.items(): + if len(_specifiers) == len(specifiers): + matches_list.append(key) + matches = tuple(matches_list) + if len(matches) == 0: + return
None + if len(matches) > 1: + text = 'Multiple features match the selected specification' + message = f'{text}: {matches} {specifiers}' + logger.warning(message) + return matches[0] + + @classmethod + def _create_feature_specification(cls, + cursor: PsycopgCursor, + data_analysis_study: str, + specifiers: tuple[str, ...], + derivation_method: str, + ) -> str: + Uploader = ADIFeatureSpecificationUploader + add = Uploader.add_new_feature + feature_specification = add(specifiers, derivation_method, data_analysis_study, cursor) + return feature_specification + class ADIFeatureSpecificationUploader: """Just upload a new feature specification.""" @staticmethod - def add_new_feature(specifiers, derivation_method, measurement_study, cursor: PsycopgCursor): + def add_new_feature(specifiers, derivation_method, data_analysis_study, cursor: PsycopgCursor): FSU = ADIFeatureSpecificationUploader - data_analysis_study = FSU.get_data_analysis_study(measurement_study, cursor) identifier = FSU.insert_specification(derivation_method, data_analysis_study, cursor) FSU.insert_specifiers(identifier, specifiers, cursor) return identifier From 84d61d8d4705e153babb5583d8b00f18d3f59a9f Mon Sep 17 00:00:00 2001 From: James Mathews Date: Mon, 29 Jul 2024 17:21:43 -0400 Subject: [PATCH 2/7] Add autocleanup in case feature seems to never complete. --- .../ondemand/request_scheduling.py | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 1cfd9f20..23a725b9 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -7,6 +7,7 @@ from psycopg import Connection as PsycopgConnection from spatialprofilingtoolbox.db.database_connection import DBConnection +from spatialprofilingtoolbox.db.database_connection import DBCursor from spatialprofilingtoolbox.ondemand.providers.counts_provider import CountsProvider from spatialprofilingtoolbox.ondemand.providers.proximity_provider import ProximityProvider from spatialprofilingtoolbox.ondemand.providers.squidpy_provider import SquidpyProvider @@ -41,19 +42,41 @@ def _nonempty(string: str) -> bool: class TimeoutHandler: active: bool wait_for_results_timeout_seconds: int = 300 + feature: str + study: str - def __init__(self): + def __init__(self, feature: str, study: str): self.active = True + self.feature = feature + self.study = study def handle(self, signum, frame) -> None: if self.active: - message = f'Waited {self.wait_for_results_timeout_seconds} seconds for the feature to complete. Aborting.' + timeout = self.wait_for_results_timeout_seconds + message = f'Waited {timeout} seconds for the feature {self.feature} to complete. Aborting.' 
logger.error(message) + if self._queue_size() == 0: + self._delete_feature() raise RuntimeError(message) def disalarm(self) -> None: self.active = False + def _queue_size(self) -> int: + with DBCursor(study=self.study) as cursor: + query = 'SELECT COUNT(*) FROM quantitative_feature_value_queue WHERE feature=%s ;' + cursor.execute(query, self.feature) + count = tuple(cursor.fetchall())[0][0] + return count + + def _delete_feature(self) -> None: + logger.error('Also deleting the feature, since the queue was empty; we assume the remaining jobs failed.') + with DBCursor(study=self.study) as cursor: + param = (self.feature,) + cursor.execute('DELETE FROM quantitative_feature_value WHERE feature=%s ;', param) + cursor.execute('DELETE FROM feature_specifier WHERE feature_specification=%s ;', param) + cursor.execute('DELETE FROM feature_specification WHERE identifier=%s ;', param) + class OnDemandRequester: """Entry point for requesting computation by the on-demand service.""" @@ -112,9 +135,10 @@ def get_results() -> tuple[Metrics1D, str]: ) return (counts, feature1) - handler = TimeoutHandler() - signal.signal(signal.SIGALRM, handler.handle) - signal.alarm(handler.wait_for_results_timeout_seconds) + _, feature1 = get_results() + handler1 = TimeoutHandler(feature1, study_name) + signal.signal(signal.SIGALRM, handler1.handle) + signal.alarm(handler1.wait_for_results_timeout_seconds) with DBConnection() as connection: connection._set_autocommit(True) @@ -123,7 +147,7 @@ def get_results() -> tuple[Metrics1D, str]: cls._wait_for_wrappedup(connection, get_results) counts, feature1 = get_results() - handler.disalarm() + handler1.disalarm() def get_results2() -> tuple[Metrics1D, str]: counts_all, feature2 = get( @@ -133,7 +157,8 @@ def get_results2() -> tuple[Metrics1D, str]: ) return (counts_all, feature2) - handler2 = TimeoutHandler() + _, feature2 = get_results2() + handler2 = TimeoutHandler(feature2, study_name) signal.signal(signal.SIGALRM, handler2.handle) signal.alarm(handler2.wait_for_results_timeout_seconds) @@ -167,7 +192,7 @@ def _wait_for_wrappedup( return logger.debug(f'Waiting for signal that feature {feature} may be ready.') counts.values = '(truncated)' - logger.debug(f'Because result ready yet: {counts}') + logger.debug(f'Because result not ready yet: {counts}') notifications = connection.notifies() for notification in notifications: if not get_results()[0].is_pending: From 08cc2eeca63282db53374cf7d4b55a7f1610fbe3 Mon Sep 17 00:00:00 2001 From: James Mathews Date: Mon, 29 Jul 2024 17:38:29 -0400 Subject: [PATCH 3/7] Fix indefinite wait, check flag. 
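The timeout alarm and the LISTEN subscription are now set up only when the result is actually pending, so a feature that is already complete returns immediately without arming SIGALRM. The shape of the change, as a condensed sketch (not verbatim; see the diff below):

    counts, feature1 = get_results()
    if counts.is_pending:
        handler1 = TimeoutHandler(feature1, study_name)
        signal.signal(signal.SIGALRM, handler1.handle)
        signal.alarm(handler1.wait_for_results_timeout_seconds)
        with DBConnection() as connection:
            connection._set_autocommit(True)
            connection.execute('LISTEN new_items_in_queue ;')
            connection.execute('LISTEN one_job_complete ;')
            cls._wait_for_wrappedup(connection, get_results)
            counts, feature1 = get_results()
        handler1.disalarm()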
--- .../ondemand/request_scheduling.py | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 23a725b9..30509c1a 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -135,19 +135,20 @@ def get_results() -> tuple[Metrics1D, str]: ) return (counts, feature1) - _, feature1 = get_results() - handler1 = TimeoutHandler(feature1, study_name) - signal.signal(signal.SIGALRM, handler1.handle) - signal.alarm(handler1.wait_for_results_timeout_seconds) + counts, feature1 = get_results() + if counts.is_pending: + handler1 = TimeoutHandler(feature1, study_name) + signal.signal(signal.SIGALRM, handler1.handle) + signal.alarm(handler1.wait_for_results_timeout_seconds) - with DBConnection() as connection: - connection._set_autocommit(True) - connection.execute('LISTEN new_items_in_queue ;') - connection.execute('LISTEN one_job_complete ;') - cls._wait_for_wrappedup(connection, get_results) - counts, feature1 = get_results() + with DBConnection() as connection: + connection._set_autocommit(True) + connection.execute('LISTEN new_items_in_queue ;') + connection.execute('LISTEN one_job_complete ;') + cls._wait_for_wrappedup(connection, get_results) + counts, feature1 = get_results() - handler1.disalarm() + handler1.disalarm() def get_results2() -> tuple[Metrics1D, str]: counts_all, feature2 = get( @@ -157,19 +158,20 @@ def get_results2() -> tuple[Metrics1D, str]: ) return (counts_all, feature2) - _, feature2 = get_results2() - handler2 = TimeoutHandler(feature2, study_name) - signal.signal(signal.SIGALRM, handler2.handle) - signal.alarm(handler2.wait_for_results_timeout_seconds) + counts_all, feature2 = get_results2() + if counts_all.is_pending: + handler2 = TimeoutHandler(feature2, study_name) + signal.signal(signal.SIGALRM, handler2.handle) + signal.alarm(handler2.wait_for_results_timeout_seconds) - with DBConnection() as connection: - connection._set_autocommit(True) - connection.execute('LISTEN new_items_in_queue ;') - connection.execute('LISTEN one_job_complete ;') - cls._wait_for_wrappedup(connection, get_results2) - counts_all, _ = get_results2() + with DBConnection() as connection: + connection._set_autocommit(True) + connection.execute('LISTEN new_items_in_queue ;') + connection.execute('LISTEN one_job_complete ;') + cls._wait_for_wrappedup(connection, get_results2) + counts_all, _ = get_results2() - handler2.disalarm() + handler2.disalarm() cls._request_check_for_failed_jobs() return (feature1, counts, counts_all) From 07222b2049cc7dbcf879e3c4c7e482e31b3d7d21 Mon Sep 17 00:00:00 2001 From: James Mathews Date: Mon, 29 Jul 2024 17:54:01 -0400 Subject: [PATCH 4/7] Fix tuple typo. 
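psycopg expects the parameters argument of execute() to be a sequence or mapping, so a bare string is not a valid single parameter and the count query fails at runtime; a one-element tuple is required. Minimal illustration (hypothetical table name):

    # Wrong: the string itself is taken as the parameter collection.
    cursor.execute('SELECT COUNT(*) FROM t WHERE feature=%s ;', feature)
    # Right: wrap a single parameter in a one-element tuple.
    cursor.execute('SELECT COUNT(*) FROM t WHERE feature=%s ;', (feature,))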
--- spatialprofilingtoolbox/ondemand/request_scheduling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 30509c1a..3ad5c4ab 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -65,7 +65,7 @@ def disalarm(self) -> None: def _queue_size(self) -> int: with DBCursor(study=self.study) as cursor: query = 'SELECT COUNT(*) FROM quantitative_feature_value_queue WHERE feature=%s ;' - cursor.execute(query, self.feature) + cursor.execute(query, (self.feature,)) count = tuple(cursor.fetchall())[0][0] return count From 25934690955c18eb3f5c8a1b8441ce0b3cb7fe9d Mon Sep 17 00:00:00 2001 From: James Mathews Date: Tue, 30 Jul 2024 13:15:41 -0400 Subject: [PATCH 5/7] Fix possible race condition in notification processing --- .../ondemand/request_scheduling.py | 79 +++++++------------ 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 3ad5c4ab..58baf2c2 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -127,7 +127,7 @@ def _counts( ) -> tuple[str, Metrics1D, Metrics1D]: get = CountsProvider.get_metrics_or_schedule - def get_results() -> tuple[Metrics1D, str]: + def get_results1() -> tuple[Metrics1D, str]: counts, feature1 = get( study_name, phenotype=phenotype, @@ -135,21 +135,6 @@ def get_results() -> tuple[Metrics1D, str]: ) return (counts, feature1) - counts, feature1 = get_results() - if counts.is_pending: - handler1 = TimeoutHandler(feature1, study_name) - signal.signal(signal.SIGALRM, handler1.handle) - signal.alarm(handler1.wait_for_results_timeout_seconds) - - with DBConnection() as connection: - connection._set_autocommit(True) - connection.execute('LISTEN new_items_in_queue ;') - connection.execute('LISTEN one_job_complete ;') - cls._wait_for_wrappedup(connection, get_results) - counts, feature1 = get_results() - - handler1.disalarm() - def get_results2() -> tuple[Metrics1D, str]: counts_all, feature2 = get( study_name, @@ -158,60 +143,50 @@ def get_results2() -> tuple[Metrics1D, str]: ) return (counts_all, feature2) - counts_all, feature2 = get_results2() - if counts_all.is_pending: - handler2 = TimeoutHandler(feature2, study_name) - signal.signal(signal.SIGALRM, handler2.handle) - signal.alarm(handler2.wait_for_results_timeout_seconds) - - with DBConnection() as connection: - connection._set_autocommit(True) - connection.execute('LISTEN new_items_in_queue ;') - connection.execute('LISTEN one_job_complete ;') - cls._wait_for_wrappedup(connection, get_results2) - counts_all, _ = get_results2() - - handler2.disalarm() - - cls._request_check_for_failed_jobs() - return (feature1, counts, counts_all) + with DBConnection() as connection: + connection._set_autocommit(True) + cls._wait_for_wrappedup(connection, get_results1, study_name) + counts, feature1 = get_results1() - @classmethod - def _request_check_for_failed_jobs(cls) -> None: with DBConnection() as connection: connection._set_autocommit(True) - connection.execute('NOTIFY check_for_failed_jobs ;') + cls._wait_for_wrappedup(connection, get_results2, study_name) + counts_all, _ = get_results2() + + return (feature1, counts, counts_all) @classmethod def _wait_for_wrappedup( cls, connection: PsycopgConnection, get_results: 
Callable[[], tuple[Metrics1D, str]], + study_name: str, ) -> None: + connection.execute('LISTEN new_items_in_queue ;') + connection.execute('LISTEN one_job_complete ;') + notifications = connection.notifies() + counts, feature = get_results() + handler = TimeoutHandler(feature, study_name) + signal.signal(signal.SIGALRM, handler.handle) + signal.alarm(handler.wait_for_results_timeout_seconds) + if not counts.is_pending: logger.debug(f'Feature {feature} already complete.') + handler.disalarm() return - logger.debug(f'Waiting for signal that feature {feature} may be ready.') - counts.values = '(truncated)' - logger.debug(f'Because result not ready yet: {counts}') - notifications = connection.notifies() + logger.debug(f'Waiting for signal that feature {feature} may be ready, because the result is not ready yet.') + for notification in notifications: - if not get_results()[0].is_pending: - logger.debug(f'Closing notification processing, {feature} ready.') - notifications.close() - break channel = notification.channel if channel == 'one_job_complete': logger.debug(f'A job is complete, so {feature} may be ready. (PID: {notification.pid})') - _result = get_results() - if not _result[0].is_pending: - logger.debug(f'And {feature} was ready. Closing.') - notifications.close() - break - _result[0].values = '(truncated)' - logger.debug(f'Not ready yet: {_result}') - logger.debug(f'Notification: {notification}') + _result = get_results() + if not _result[0].is_pending: + logger.debug(f'Closing notification processing, {feature} ready.') + notifications.close() + break + handler.disalarm() @classmethod def get_proximity_metrics( From 5b895c6bda6ee0f618f389be3bc3b5999fc40c82 Mon Sep 17 00:00:00 2001 From: James Mathews Date: Tue, 30 Jul 2024 13:50:46 -0400 Subject: [PATCH 6/7] Fix timing of notifications block in workers --- .../ondemand/request_scheduling.py | 57 +++++++++++++------ spatialprofilingtoolbox/ondemand/worker.py | 9 ++- 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 58baf2c2..970c8182 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -39,6 +39,12 @@ def _nonempty(string: str) -> bool: return string != '' +class FeatureComputationTimeoutError(RuntimeError): + def __init__(self, message: str): + super().__init__(message) + self.message = message + + class TimeoutHandler: active: bool wait_for_results_timeout_seconds: int = 300 @@ -55,9 +61,9 @@ def handle(self, signum, frame) -> None: timeout = self.wait_for_results_timeout_seconds message = f'Waited {timeout} seconds for the feature {self.feature} to complete. Aborting.' 
logger.error(message) - if self._queue_size() == 0: + if self._queue_size() == 0 and self._completed_size() < self._expected_size(): self._delete_feature() - raise RuntimeError(message) + raise FeatureComputationTimeoutError(message) def disalarm(self) -> None: self.active = False @@ -69,6 +75,20 @@ def _queue_size(self) -> int: count = tuple(cursor.fetchall())[0][0] return count + def _completed_size(self) -> int: + with DBCursor(study=self.study) as cursor: + query = 'SELECT COUNT(*) FROM quantitative_feature_value WHERE feature=%s ;' + cursor.execute(query, (self.feature,)) + count = tuple(cursor.fetchall())[0][0] + return count + + def _expected_size(self) -> int: + with DBCursor(study=self.study) as cursor: + query = 'SELECT COUNT(*) FROM specimen_data_measurement_process ;' + cursor.execute(query) + count = tuple(cursor.fetchall())[0][0] + return count + def _delete_feature(self) -> None: logger.error('Also deleting the feature, since the queue was empty; we assume the remaining jobs failed.') with DBCursor(study=self.study) as cursor: @@ -171,22 +191,25 @@ def _wait_for_wrappedup( signal.signal(signal.SIGALRM, handler.handle) signal.alarm(handler.wait_for_results_timeout_seconds) - if not counts.is_pending: - logger.debug(f'Feature {feature} already complete.') + try: + if not counts.is_pending: + logger.debug(f'Feature {feature} already complete.') + return + logger.debug(f'Waiting for signal that feature {feature} may be ready, because the result is not ready yet.') + + for notification in notifications: + channel = notification.channel + if channel == 'one_job_complete': + logger.debug(f'A job is complete, so {feature} may be ready. (PID: {notification.pid})') + _result = get_results() + if not _result[0].is_pending: + logger.debug(f'Closing notification processing, {feature} ready.') + notifications.close() + break + except FeatureComputationTimeoutError: + pass + finally: handler.disalarm() - return - logger.debug(f'Waiting for signal that feature {feature} may be ready, because the result is not ready yet.') - - for notification in notifications: - channel = notification.channel - if channel == 'one_job_complete': - logger.debug(f'A job is complete, so {feature} may be ready. 
(PID: {notification.pid})') - _result = get_results() - if not _result[0].is_pending: - logger.debug(f'Closing notification processing, {feature} ready.') - notifications.close() - break - handler.disalarm() @classmethod def get_proximity_metrics( diff --git a/spatialprofilingtoolbox/ondemand/worker.py b/spatialprofilingtoolbox/ondemand/worker.py index 5418bf97..398468ff 100644 --- a/spatialprofilingtoolbox/ondemand/worker.py +++ b/spatialprofilingtoolbox/ondemand/worker.py @@ -35,16 +35,15 @@ def _listen_for_queue_activity(self) -> None: with DBConnection() as connection: self.connection = connection self.connection._set_autocommit(True) + self.connection.execute('LISTEN new_items_in_queue ;') + logger.info('Listening on new_items_in_queue channel.') + self.notifications = self.connection.notifies() while True: self._wait_for_queue_activity_on_connection() self._work_until_complete() def _wait_for_queue_activity_on_connection(self) -> None: - self.connection.execute('LISTEN new_items_in_queue ;') - logger.info('Listening on new_items_in_queue channel.') - notifications = self.connection.notifies() - for notification in notifications: - notifications.close() + for _ in self.notifications: logger.info('Received notice of new items in the job queue.') break From 0082d495e240a219512386a36460ea7c02047c61 Mon Sep 17 00:00:00 2001 From: James Mathews Date: Tue, 30 Jul 2024 14:21:47 -0400 Subject: [PATCH 7/7] Generalizes timeout, applies it also to single sample computation. --- .../ondemand/request_scheduling.py | 40 ++++++----------- spatialprofilingtoolbox/ondemand/timeout.py | 45 +++++++++++++++++++ spatialprofilingtoolbox/ondemand/worker.py | 18 ++++++-- 3 files changed, 73 insertions(+), 30 deletions(-) create mode 100644 spatialprofilingtoolbox/ondemand/timeout.py diff --git a/spatialprofilingtoolbox/ondemand/request_scheduling.py b/spatialprofilingtoolbox/ondemand/request_scheduling.py index 970c8182..67adc022 100644 --- a/spatialprofilingtoolbox/ondemand/request_scheduling.py +++ b/spatialprofilingtoolbox/ondemand/request_scheduling.py @@ -18,6 +18,8 @@ CompositePhenotype, UnivariateMetricsComputationResult, ) +from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler +from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger Metrics1D = UnivariateMetricsComputationResult @@ -39,34 +41,19 @@ def _nonempty(string: str) -> bool: return string != '' -class FeatureComputationTimeoutError(RuntimeError): - def __init__(self, message: str): - super().__init__(message) - self.message = message - - -class TimeoutHandler: - active: bool - wait_for_results_timeout_seconds: int = 300 +class FeatureComputationTimeoutHandler: feature: str study: str def __init__(self, feature: str, study: str): - self.active = True self.feature = feature self.study = study - def handle(self, signum, frame) -> None: - if self.active: - timeout = self.wait_for_results_timeout_seconds - message = f'Waited {timeout} seconds for the feature {self.feature} to complete. Aborting.' - logger.error(message) - if self._queue_size() == 0 and self._completed_size() < self._expected_size(): - self._delete_feature() - raise FeatureComputationTimeoutError(message) - - def disalarm(self) -> None: - self.active = False + def handle(self) -> None: + message = f'Timed out waiting for the feature {self.feature} to complete. Aborting.' 
+ logger.error(message) + if self._queue_size() == 0 and self._completed_size() < self._expected_size(): + self._delete_feature() def _queue_size(self) -> int: with DBCursor(study=self.study) as cursor: @@ -98,6 +85,7 @@ def _delete_feature(self) -> None: cursor.execute('DELETE FROM feature_specification WHERE identifier=%s ;', param) + class OnDemandRequester: """Entry point for requesting computation by the on-demand service.""" @@ -187,10 +175,8 @@ def _wait_for_wrappedup( notifications = connection.notifies() counts, feature = get_results() - handler = TimeoutHandler(feature, study_name) - signal.signal(signal.SIGALRM, handler.handle) - signal.alarm(handler.wait_for_results_timeout_seconds) - + handler = FeatureComputationTimeoutHandler(feature, study_name) + generic_handler = create_timeout_handler(handler.handle) try: if not counts.is_pending: logger.debug(f'Feature {feature} already complete.') @@ -206,10 +192,10 @@ def _wait_for_wrappedup( logger.debug(f'Closing notification processing, {feature} ready.') notifications.close() break - except FeatureComputationTimeoutError: + except SPTTimeoutError: pass finally: - handler.disalarm() + generic_handler.disalarm() @classmethod def get_proximity_metrics( diff --git a/spatialprofilingtoolbox/ondemand/timeout.py b/spatialprofilingtoolbox/ondemand/timeout.py new file mode 100644 index 00000000..8ee8b2ed --- /dev/null +++ b/spatialprofilingtoolbox/ondemand/timeout.py @@ -0,0 +1,45 @@ +"""General-purpose one-time timeout functionality based on Unix signal alarm.""" +from typing import Callable +import signal + +from spatialprofilingtoolbox.db.database_connection import DBCursor +from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger + +logger = colorized_logger(__name__) + + +TIMEOUT_SECONDS_DEFAULT = 300 + + +class SPTTimeoutError(RuntimeError): + def __init__(self, message: str): + super().__init__(message) + self.message = message + + +class TimeoutHandler: + active: bool + callback: Callable + timeout: int + + def __init__(self, callback: Callable, timeout: int): + self.active = True + self.callback = callback + self.timeout = timeout + + def handle(self, signum, frame) -> None: + if self.active: + message = f'Waited {self.timeout} seconds, timed out.' 
+ logger.error(message) + self.callback() + raise SPTTimeoutError(message) + + def disalarm(self) -> None: + self.active = False + + +def create_timeout_handler(callback: Callable, timeout_seconds: int = TIMEOUT_SECONDS_DEFAULT) -> TimeoutHandler: + handler = TimeoutHandler(callback, timeout_seconds) + signal.signal(signal.SIGALRM, handler.handle) + signal.alarm(timeout_seconds) + return handler diff --git a/spatialprofilingtoolbox/ondemand/worker.py b/spatialprofilingtoolbox/ondemand/worker.py index 398468ff..a34833ea 100644 --- a/spatialprofilingtoolbox/ondemand/worker.py +++ b/spatialprofilingtoolbox/ondemand/worker.py @@ -14,6 +14,8 @@ from spatialprofilingtoolbox.db.describe_features import get_handle from spatialprofilingtoolbox.ondemand.job_reference import ComputationJobReference from spatialprofilingtoolbox.ondemand.scheduler import MetricComputationScheduler +from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler +from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger Job = ComputationJobReference @@ -66,16 +68,26 @@ def _one_job(self) -> tuple[bool, int]: self._notify_complete(job) return (True, pid) + def _no_value_wrapup(self, job) -> None: + provider = self._get_provider(job) + provider._warn_no_value() + provider._insert_null() + def _compute(self, job: Job) -> None: provider = self._get_provider(job) + generic_handler = create_timeout_handler( + lambda *arg: self._no_value_wrapup(job), + timeout_seconds=150, + ) try: provider.compute() + except SPTTimeoutError: + pass except Exception as error: logger.error(error) print_exception(type(error), error, error.__traceback__) - provider = self._get_provider(job) - provider._warn_no_value() - provider._insert_null() + finally: + generic_handler.disalarm() def _notify_complete(self, job: Job) -> None: self.connection.execute('NOTIFY one_job_complete ;')
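A closing note on the notification pattern that patches 5 and 6 converge on: the race came from checking for pending work before subscribing, which can drop a notification arriving in between; subscribing first closes that window. A minimal sketch of the ordering, with still_pending as a hypothetical stand-in for the get_results pending check:

    connection.execute('LISTEN one_job_complete ;')
    notifications = connection.notifies()  # subscribe before the status check
    if still_pending():
        for notification in notifications:
            # A notification sent between the check above and this loop is
            # still delivered, because the subscription predates the check.
            if not still_pending():
                notifications.close()
                break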
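With the final patch, the generic utility in spatialprofilingtoolbox/ondemand/timeout.py serves both the request path and the worker in the same way. A minimal usage sketch under the final state of the series (run_with_deadline and its arguments are hypothetical; SIGALRM-based, so main-thread only):

    from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError, create_timeout_handler

    def run_with_deadline(compute, cleanup, seconds=150):
        # Arms a one-shot SIGALRM; on expiry the handler logs, runs cleanup, then raises.
        handler = create_timeout_handler(cleanup, timeout_seconds=seconds)
        try:
            compute()
        except SPTTimeoutError:
            pass  # cleanup already ran inside the signal handler
        finally:
            handler.disalarm()  # deactivate so a late alarm becomes a no-op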