Skip to content

Commit

Permalink
Generalizes timeout, applies it also to single sample computation.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmymathews committed Jul 30, 2024
1 parent 5b895c6 commit 0082d49
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 30 deletions.
40 changes: 13 additions & 27 deletions spatialprofilingtoolbox/ondemand/request_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
CompositePhenotype,
UnivariateMetricsComputationResult,
)
from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler
from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger
Metrics1D = UnivariateMetricsComputationResult

Expand All @@ -39,34 +41,19 @@ def _nonempty(string: str) -> bool:
return string != ''


class FeatureComputationTimeoutError(RuntimeError):
def __init__(self, message: str):
super().__init__(message)
self.message = message


class TimeoutHandler:
active: bool
wait_for_results_timeout_seconds: int = 300
class FeatureComputationTimeoutHandler:
feature: str
study: str

def __init__(self, feature: str, study: str):
self.active = True
self.feature = feature
self.study = study

def handle(self, signum, frame) -> None:
if self.active:
timeout = self.wait_for_results_timeout_seconds
message = f'Waited {timeout} seconds for the feature {self.feature} to complete. Aborting.'
logger.error(message)
if self._queue_size() == 0 and self._completed_size() < self._expected_size():
self._delete_feature()
raise FeatureComputationTimeoutError(message)

def disalarm(self) -> None:
self.active = False
def handle(self) -> None:
message = f'Timed out waiting for the feature {self.feature} to complete. Aborting.'
logger.error(message)
if self._queue_size() == 0 and self._completed_size() < self._expected_size():
self._delete_feature()

def _queue_size(self) -> int:
with DBCursor(study=self.study) as cursor:
Expand Down Expand Up @@ -98,6 +85,7 @@ def _delete_feature(self) -> None:
cursor.execute('DELETE FROM feature_specification WHERE identifier=%s ;', param)



class OnDemandRequester:
"""Entry point for requesting computation by the on-demand service."""

Expand Down Expand Up @@ -187,10 +175,8 @@ def _wait_for_wrappedup(
notifications = connection.notifies()

counts, feature = get_results()
handler = TimeoutHandler(feature, study_name)
signal.signal(signal.SIGALRM, handler.handle)
signal.alarm(handler.wait_for_results_timeout_seconds)

handler = FeatureComputationTimeoutHandler(feature, study_name)
generic_handler = create_timeout_handler(handler.handle)
try:
if not counts.is_pending:
logger.debug(f'Feature {feature} already complete.')
Expand All @@ -206,10 +192,10 @@ def _wait_for_wrappedup(
logger.debug(f'Closing notification processing, {feature} ready.')
notifications.close()
break
except FeatureComputationTimeoutError:
except SPTTimeoutError:
pass
finally:
handler.disalarm()
generic_handler.disalarm()

@classmethod
def get_proximity_metrics(
Expand Down
45 changes: 45 additions & 0 deletions spatialprofilingtoolbox/ondemand/timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""General-purpose one-time timeout functionality based on Unix signal alarm."""
from typing import Callable
import signal

from spatialprofilingtoolbox.db.database_connection import DBCursor
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger

logger = colorized_logger(__name__)


TIMEOUT_SECONDS_DEFAULT = 300


class SPTTimeoutError(RuntimeError):
def __init__(self, message: str):
super().__init__(message)
self.message = message


class TimeoutHandler:
active: bool
callback: Callable
timeout: int

def __init__(self, callback: Callable, timeout: int):
self.active = True
self.callback = callback
self.timeout = timeout

def handle(self, signum, frame) -> None:
if self.active:
message = f'Waited {self.timeout} seconds, timed out.'
logger.error(message)
self.callback()
raise TimeoutError(message)

def disalarm(self) -> None:
self.active = False


def create_timeout_handler(callback: Callable, timeout_seconds: int = TIMEOUT_SECONDS_DEFAULT) -> TimeoutHandler:
handler = TimeoutHandler(callback, timeout_seconds)
signal.signal(signal.SIGALRM, handler.handle)
signal.alarm(timeout_seconds)
return handler
18 changes: 15 additions & 3 deletions spatialprofilingtoolbox/ondemand/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from spatialprofilingtoolbox.db.describe_features import get_handle
from spatialprofilingtoolbox.ondemand.job_reference import ComputationJobReference
from spatialprofilingtoolbox.ondemand.scheduler import MetricComputationScheduler
from spatialprofilingtoolbox.ondemand.timeout import create_timeout_handler
from spatialprofilingtoolbox.ondemand.timeout import SPTTimeoutError
from spatialprofilingtoolbox.standalone_utilities.log_formats import colorized_logger
Job = ComputationJobReference

Expand Down Expand Up @@ -66,16 +68,26 @@ def _one_job(self) -> tuple[bool, int]:
self._notify_complete(job)
return (True, pid)

def _no_value_wrapup(self, job) -> None:
provider = self._get_provider(job)
provider._warn_no_value()
provider._insert_null()

def _compute(self, job: Job) -> None:
provider = self._get_provider(job)
generic_handler = create_timeout_handler(
lambda *arg: self._no_value_wrapup(job),
timeout_seconds=150,
)
try:
provider.compute()
except SPTTimeoutError:
pass
except Exception as error:
logger.error(error)
print_exception(type(error), error, error.__traceback__)
provider = self._get_provider(job)
provider._warn_no_value()
provider._insert_null()
finally:
generic_handler.disalarm()

def _notify_complete(self, job: Job) -> None:
self.connection.execute('NOTIFY one_job_complete ;')
Expand Down

0 comments on commit 0082d49

Please sign in to comment.