From 215c8c7e90e43bd9cf8c6de6b127931fba416b66 Mon Sep 17 00:00:00 2001 From: Iskander Gaba Date: Mon, 7 Oct 2024 19:18:39 +0200 Subject: [PATCH] Improve Autoperiod performance even further! --- pyriodicity/detectors/autoperiod.py | 109 +++++++++++++++++++++------- 1 file changed, 81 insertions(+), 28 deletions(-) diff --git a/pyriodicity/detectors/autoperiod.py b/pyriodicity/detectors/autoperiod.py index 56c9db8..58ffebf 100644 --- a/pyriodicity/detectors/autoperiod.py +++ b/pyriodicity/detectors/autoperiod.py @@ -21,9 +21,9 @@ class Autoperiod: References ---------- .. [1] Vlachos, M., Yu, P., & Castelli, V. (2005). - On periodicity detection and Structural Periodic similarity. - Proceedings of the 2005 SIAM International Conference on Data Mining. - https://doi.org/10.1137/1.9781611972757.40 + On periodicity detection and Structural Periodic similarity. + Proceedings of the 2005 SIAM International Conference on Data Mining. + https://doi.org/10.1137/1.9781611972757.40 Examples -------- @@ -120,35 +120,82 @@ def fit( ] ) - # Compute the ACF + # Validate period hints + valid_hints = [ + h for h in hints if self._is_hint_valid(self.y, h, correlation_func) + ] + + # Return the closest ACF peak to each valid period hint length = len(self.y) - acf_arr = acf( - self.y, lag_start=0, lag_stop=length, correlation_func=correlation_func + hint_ranges = [ + np.arange( + np.floor((h + length / (length / h + 1)) / 2 - 1) - 1, + np.ceil((h + length / (length / h - 1)) / 2 + 1) + 1, + dtype=int, + ) + for h in valid_hints + ] + acf_arrays = [ + acf( + self.y, + lag_start=r[0], + lag_stop=r[-1], + correlation_func=correlation_func, + ) + for r in hint_ranges + ] + return np.array( + list( + { + r[0] + min(argrelmax(arr)[0], key=lambda x: abs(x - h)) + for h, r, arr in zip(valid_hints, hint_ranges, acf_arrays) + } + ) ) - # Validate period hints - valid_hints = [] - for p in hints: - q = length / p - start = np.floor((p + length / (q + 1)) / 2 - 1).astype(int) - end = np.ceil((p + length / (q - 1)) / 2 + 1).astype(int) - - splits = [ - self._split(np.arange(len(acf_arr)), acf_arr, start, end, i) - for i in range(start + 2, end) - ] - line1, line2, _ = splits[ - np.array([error for _, _, error in splits]).argmin() - ] + @staticmethod + def _is_hint_valid( + y: ArrayLike, + hint: float, + correlation_func: str, + ) -> bool: + """ + Validate the period hint. - if line1.coef[-1] > 0 > line2.coef[-1]: - valid_hints.append(p) + Parameters + ---------- + y : array_like + Data to be investigated. Must be squeezable to 1-d. + hint : float + The period hint to be validated. + correlation_func : str, default = 'pearson' + The correlation function to be used to calculate the ACF of the series + or the signal. Possible values are ['pearson', 'spearman', 'kendall']. - # Return the closest ACF peak to each valid period hint - local_argmax = argrelmax(acf_arr)[0] - return np.array( - list({min(local_argmax, key=lambda x: abs(x - p)) for p in valid_hints}) + Returns + ------- + bool + Whether the period hint is valid. + """ + length = len(y) + hint_range = np.arange( + np.floor((hint + length / (length / hint + 1)) / 2 - 1) - 1, + np.ceil((hint + length / (length / hint - 1)) / 2 + 1) + 1, + dtype=int, ) + acf_arr = acf( + y, + lag_start=hint_range[0], + lag_stop=hint_range[-1], + correlation_func=correlation_func, + ) + splits = [ + Autoperiod._split(hint_range, acf_arr, 0, len(hint_range), i) + for i in range(2, hint_range[-1] - hint_range[0]) + ] + + line1, line2, _ = splits[np.array([error for _, _, error in splits]).argmin()] + return line1.coef[-1] > 0 > line2.coef[-1] @staticmethod def _split(x: ArrayLike, y: ArrayLike, start: int, end: int, split: int) -> tuple: @@ -183,11 +230,17 @@ def _split(x: ArrayLike, y: ArrayLike, start: int, end: int, split: int) -> tupl float The approximation error. """ + if split - start < 2 or end - split < 2: + raise ValueError( + "Invalid start, split, and end values ({}, {}, {})".format( + start, split, end + ) + ) x1, y1, x2, y2 = ( x[start:split], y[start:split], - x[split : end + 1], - y[split : end + 1], + x[split:end], + y[split:end], ) line1, stats1 = np.polynomial.Polynomial.fit(x1, y1, deg=1, full=True) line2, stats2 = np.polynomial.Polynomial.fit(x2, y2, deg=1, full=True)