diff --git a/README.md b/README.md index 52c3917..a1cea38 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,8 @@ ## About Pyriodicity Pyriodicity provides intuitive and easy-to-use Python implementation for periodicity (seasonality) detection in univariate time series. Pyriodicity supports the following detection methods: - [Autocorrelation Function (ACF)](https://otexts.com/fpp3/acf.html) -- [Autoperiod]( https://doi.org/10.1137/1.9781611972757.40) +- [Autoperiod](https://doi.org/10.1137/1.9781611972757.40) +- [CFD-Autoperiod](https://doi.org/10.1007/978-3-030-39098-3_4) - [Fast Fourier Transform (FFT)](https://otexts.com/fpp3/useful-predictors.html#fourier-series) ## Installation @@ -23,35 +24,28 @@ pip install pyriodicity ``` ## Example -Start by loading a the `co2` timeseries emissions sample data from [`statsmodels`](https://www.statsmodels.org) +Start by loading a the `co2` emissions sample time series data from [`statsmodels`](https://www.statsmodels.org) ```python -from statsmodels.datasets import co2 -data = co2.load().data +>>> from statsmodels.datasets import co2 +>>> data = co2.load().data ``` You can then resample the data to whatever frequency you want. In this example, we downsample the data to a monthly frequency ```python -data = data.resample("ME").mean().ffill() +>>> data = data.resample("ME").mean().ffill() ``` Use `Autoperiod` to find the list of periods based in this data (if any). ```python -from pyriodicity import Autoperiod -autoperiod = Autoperiod(data) -periods = autoperiod.fit() +>>> from pyriodicity import Autoperiod +>>> autoperiod = Autoperiod(data) +>>> autoperiod.fit() +array([12]) ``` -There are multiple parameters you can play with should you wish to. For example, you can specify a lower percentile value for a more lenient detection -```python -autoperiod.fit(percentile=90) -``` - -Or increase the number of random data permutations for a better power threshold estimation -```python -autoperiod.fit(k=300) -``` +The detected periodicity length is 12 which suggests a strong yearly seasonality given that the data has a monthly frequency. -Alternatively, you can use other periodicity detection methods such as `ACFPeriodicityDetector` and `FFTPeriodicityDetector` and compare results and performances. +You can also use `CFDAutoperiod` variant of `Autoperiod` or any other supported periodicity detection method such as `ACFPeriodicityDetector` and `FFTPeriodicityDetector` and compare results and performances. ## Development Environment Setup This project is built and published using [Poetry](https://python-poetry.org). To setup a development environment for this project you can follow these steps: @@ -85,3 +79,4 @@ poetry export --with test --output requirements-dev.txt ## References - [1] Hyndman, R.J., & Athanasopoulos, G. (2021) Forecasting: principles and practice, 3rd edition, OTexts: Melbourne, Australia. [OTexts.com/fpp3](https://otexts.com/fpp3). Accessed on 09-15-2024. - [2] Vlachos, M., Yu, P., & Castelli, V. (2005). On periodicity detection and Structural Periodic similarity. Proceedings of the 2005 SIAM International Conference on Data Mining. [doi.org/10.1137/1.9781611972757.40](https://doi.org/10.1137/1.9781611972757.40). +- [3] Puech, T., Boussard, M., D'Amato, A., & Millerand, G. (2020). A fully automated periodicity detection in time series. In Advanced Analytics and Learning on Temporal Data: 4th ECML PKDD Workshop, AALTD 2019, Würzburg, Germany, September 20, 2019, Revised Selected Papers 4 (pp. 43-54). Springer International Publishing. [doi.org/10.1007/978-3-030-39098-3_4](https://doi.org/10.1007/978-3-030-39098-3_4). diff --git a/pyriodicity/__init__.py b/pyriodicity/__init__.py index 018d74a..b74a6ac 100644 --- a/pyriodicity/__init__.py +++ b/pyriodicity/__init__.py @@ -1,7 +1,13 @@ from .detectors import ( ACFPeriodicityDetector, Autoperiod, + CFDAutoperiod, FFTPeriodicityDetector, ) -__all__ = ["ACFPeriodicityDetector", "Autoperiod", "FFTPeriodicityDetector"] +__all__ = [ + "ACFPeriodicityDetector", + "Autoperiod", + "CFDAutoperiod", + "FFTPeriodicityDetector", +] diff --git a/pyriodicity/detectors/__init__.py b/pyriodicity/detectors/__init__.py index 1449dbc..2e2b52d 100644 --- a/pyriodicity/detectors/__init__.py +++ b/pyriodicity/detectors/__init__.py @@ -1,9 +1,11 @@ from .acf import ACFPeriodicityDetector from .autoperiod import Autoperiod +from .cfd_autoperiod import CFDAutoperiod from .fft import FFTPeriodicityDetector __all__ = [ "ACFPeriodicityDetector", "Autoperiod", + "CFDAutoperiod", "FFTPeriodicityDetector", ] diff --git a/pyriodicity/detectors/acf.py b/pyriodicity/detectors/acf.py index 7254443..585a1b8 100644 --- a/pyriodicity/detectors/acf.py +++ b/pyriodicity/detectors/acf.py @@ -1,9 +1,9 @@ -from typing import Callable, Optional, Union +from typing import Optional, Union from numpy.typing import ArrayLike, NDArray -from scipy.signal import argrelmax +from scipy.signal import argrelmax, detrend -from pyriodicity.tools import acf, apply_window, detrend, to_1d_array +from pyriodicity.tools import acf, apply_window, to_1d_array class ACFPeriodicityDetector: @@ -55,7 +55,7 @@ def __init__(self, endog: ArrayLike): def fit( self, max_period_count: Optional[int] = None, - detrend_func: Optional[Union[str, Callable[[ArrayLike], NDArray]]] = "linear", + detrend_func: Optional[str] = "linear", window_func: Optional[Union[str, float, tuple]] = None, correlation_func: Optional[str] = "pearson", ) -> NDArray: @@ -66,10 +66,9 @@ def fit( ---------- max_period_count : int, optional, default = None Maximum number of periods to look for. - detrend_func : str, callable, default = None - The kind of detrending to be applied on the series. It can either be - 'linear' or 'constant' if it the parameter is of 'str' type, or a - custom function that returns a detrended series. + detrend_func : str, default = 'linear' + The kind of detrending to be applied on the signal. It can either be + 'linear' or 'constant'. window_func : float, str, tuple optional, default = None Window function to be applied to the time series. Check 'window' parameter documentation for scipy.signal.get_window @@ -99,13 +98,18 @@ def fit( List of detected periods. """ # Detrend data - self.y = self.y if detrend_func is None else detrend(self.y, detrend_func) + self.y = self.y if detrend_func is None else detrend(self.y, type=detrend_func) # Apply window on data self.y = self.y if window_func is None else apply_window(self.y, window_func) # Compute the ACF - acf_arr = acf(self.y, len(self.y) // 2, correlation_func) + acf_arr = acf( + self.y, + lag_start=0, + lag_stop=len(self.y) // 2, + correlation_func=correlation_func, + ) # Find the local argmax of the first half of the ACF array local_argmax = argrelmax(acf_arr)[0] diff --git a/pyriodicity/detectors/autoperiod.py b/pyriodicity/detectors/autoperiod.py index e39f3e1..3396915 100644 --- a/pyriodicity/detectors/autoperiod.py +++ b/pyriodicity/detectors/autoperiod.py @@ -1,11 +1,10 @@ -from typing import Callable, Optional, Union +from typing import Optional, Union import numpy as np from numpy.typing import ArrayLike, NDArray -from scipy.signal import argrelmax, periodogram -from scipy.stats import linregress +from scipy.signal import argrelmax, detrend, periodogram -from pyriodicity.tools import acf, apply_window, detrend, to_1d_array +from pyriodicity.tools import acf, apply_window, power_threshold, to_1d_array class Autoperiod: @@ -22,9 +21,9 @@ class Autoperiod: References ---------- .. [1] Vlachos, M., Yu, P., & Castelli, V. (2005). - On periodicity detection and Structural Periodic similarity. - Proceedings of the 2005 SIAM International Conference on Data Mining. - https://doi.org/10.1137/1.9781611972757.40 + On periodicity detection and Structural Periodic similarity. + Proceedings of the 2005 SIAM International Conference on Data Mining. + https://doi.org/10.1137/1.9781611972757.40 Examples -------- @@ -58,7 +57,7 @@ def fit( self, k: int = 100, percentile: int = 95, - detrend_func: Optional[Union[str, Callable[[ArrayLike], NDArray]]] = "linear", + detrend_func: Optional[str] = "linear", window_func: Optional[Union[str, float, tuple]] = None, correlation_func: Optional[str] = "pearson", ) -> NDArray: @@ -73,10 +72,9 @@ def fit( percentile : int, optional, default = 95 Percentage for the percentile parameter used in computing the power threshold. Value must be between 0 and 100 inclusive. - detrend_func : str, callable, default = None - The kind of detrending to be applied on the series. It can either be - 'linear' or 'constant' if it the parameter is of 'str' type, or a - custom function that returns a detrended series. + detrend_func : str, default = 'linear' + The kind of detrending to be applied on the signal. It can either be + 'linear' or 'constant'. window_func : float, str, tuple optional, default = None Window function to be applied to the time series. Check 'window' parameter documentation for scipy.signal.get_window @@ -105,16 +103,16 @@ def fit( List of detected periods. """ # Detrend data - self.y = self.y if detrend_func is None else detrend(self.y, detrend_func) + self.y = self.y if detrend_func is None else detrend(self.y, type=detrend_func) # Apply window on data self.y = self.y if window_func is None else apply_window(self.y, window_func) # Compute the power threshold - p_threshold = self._power_threshold(self.y, k, percentile) + p_threshold = power_threshold(self.y, detrend_func, k, percentile) # Find period hints - freq, power = periodogram(self.y, window=None, detrend=None) - period_hints = np.array( + freq, power = periodogram(self.y, window=None, detrend=False) + hints = np.array( [ 1 / f for f, p in zip(freq, power) @@ -122,84 +120,87 @@ def fit( ] ) - # Compute the ACF - length = len(self.y) - acf_arr = acf(self.y, nlags=length, correlation_func=correlation_func) - # Validate period hints - period_hints_valid = [] - for p in period_hints: - q = length / p - start = np.floor((p + length / (q + 1)) / 2 - 1).astype(int) - end = np.ceil((p + length / (q - 1)) / 2 + 1).astype(int) - - splits = [ - self._split(np.arange(len(acf_arr)), acf_arr, start, end, i) - for i in range(start + 2, end) - ] - line1, line2, _ = splits[ - np.array([error for _, _, error in splits]).argmin() - ] - - if line1.slope > 0 > line2.slope: - period_hints_valid.append(p) + valid_hints = [ + h for h in hints if self._is_hint_valid(self.y, h, correlation_func) + ] - period_hints_valid = np.array(period_hints_valid) - - # Return the closest ACF peak for each valid period hint - local_argmax = argrelmax(acf_arr)[0] + # Return the closest ACF peak to each valid period hint + length = len(self.y) + hint_ranges = [ + np.arange( + np.floor((h + length / (length / h + 1)) / 2 - 1), + np.ceil((h + length / (length / h - 1)) / 2 + 1), + dtype=int, + ) + for h in valid_hints + ] + acf_arrays = [ + acf( + self.y, + lag_start=r[0], + lag_stop=r[-1], + correlation_func=correlation_func, + ) + for r in hint_ranges + ] return np.array( list( { - min(local_argmax, key=lambda x: abs(x - p)) - for p in period_hints_valid + r[0] + min(argrelmax(arr)[0], key=lambda x: abs(x - h)) + for h, r, arr in zip(valid_hints, hint_ranges, acf_arrays) } ) ) @staticmethod - def _power_threshold(y: ArrayLike, k: int, p: int) -> float: + def _is_hint_valid( + y: ArrayLike, + hint: float, + correlation_func: str, + ) -> bool: """ - Compute the power threshold as the p-th percentile of the maximum - power values of the periodogram of k permutations of the data. + Validate the period hint. Parameters ---------- y : array_like Data to be investigated. Must be squeezable to 1-d. - k : int - The number of times the data is randomly permuted to compute - the maximum power values. - p : int - The percentile value used to compute the power threshold. - It determines the cutoff point in the sorted list of the maximum - power values from the periodograms of the permuted data. - Value must be between 0 and 100 inclusive. - - See Also - -------- - scipy.signal.periodogram - Estimate power spectral density using a periodogram. + hint : float + The period hint to be validated. + correlation_func : str, default = 'pearson' + The correlation function to be used to calculate the ACF of the series + or the signal. Possible values are ['pearson', 'spearman', 'kendall']. Returns ------- - float - Power threshold of the target data. + bool + Whether the period hint is valid. """ - max_powers = [] - while len(max_powers) < k: - _, power_p = periodogram( - np.random.permutation(y), window=None, detrend=None - ) - max_powers.append(power_p.max()) - max_powers.sort() - return np.percentile(max_powers, p) + length = len(y) + hint_range = np.arange( + np.floor((hint + length / (length / hint + 1)) / 2 - 1), + np.ceil((hint + length / (length / hint - 1)) / 2 + 1), + dtype=int, + ) + acf_arr = acf( + y, + lag_start=hint_range[0], + lag_stop=hint_range[-1], + correlation_func=correlation_func, + ) + splits = [ + Autoperiod._split(hint_range, acf_arr, 0, len(hint_range), i) + for i in range(1, len(hint_range) - 1) + ] + line1, line2, _ = splits[np.array([error for _, _, error in splits]).argmin()] + return line1.coef[-1] > 0 > line2.coef[-1] @staticmethod def _split(x: ArrayLike, y: ArrayLike, start: int, end: int, split: int) -> tuple: """ Approximate a function at [start, end] with two line segments at - [start, split - 1] and [split, end]. + [start, split] and [split, end]. Parameters ---------- @@ -221,22 +222,27 @@ def _split(x: ArrayLike, y: ArrayLike, start: int, end: int, split: int) -> tupl Returns ------- - linregress + numpy.polynomial.Polynomial The first line segment. - linregress + numpy.polynomial.Polynomial The second line segment. float - The error of the approximation. + The approximation error. """ + if not start < split < end: + raise ValueError( + "Invalid start, split, and end values ({}, {}, {})".format( + start, split, end + ) + ) x1, y1, x2, y2 = ( - x[start:split], - y[start:split], - x[split : end + 1], - y[split : end + 1], - ) - line1 = linregress(x1, y1) - line2 = linregress(x2, y2) - error = np.sum(np.abs(y1 - (line1.intercept + line1.slope * x1))) + np.sum( - np.abs(y2 - (line2.intercept + line2.slope * x2)) + x[start : split + 1], + y[start : split + 1], + x[split:end], + y[split:end], ) - return line1, line2, error + line1, stats1 = np.polynomial.Polynomial.fit(x1, y1, deg=1, full=True) + line2, stats2 = np.polynomial.Polynomial.fit(x2, y2, deg=1, full=True) + resid1 = 0 if len(stats1[0]) == 0 else stats1[0][0] + resid2 = 0 if len(stats2[0]) == 0 else stats2[0][0] + return line1.convert(), line2.convert(), resid1 + resid2 diff --git a/pyriodicity/detectors/cfd_autoperiod.py b/pyriodicity/detectors/cfd_autoperiod.py new file mode 100644 index 0000000..f64b098 --- /dev/null +++ b/pyriodicity/detectors/cfd_autoperiod.py @@ -0,0 +1,235 @@ +from typing import Optional, Union + +import numpy as np +from numpy.typing import ArrayLike, NDArray +from scipy.signal import argrelmax, butter, detrend, periodogram, sosfiltfilt + +from pyriodicity.tools import acf, apply_window, power_threshold, to_1d_array + + +class CFDAutoperiod: + """ + CFDAutoperiod periodicity detector. + + Find the periods in a given signal or series using CFDAutoperiod [1]_. + + Parameters + ---------- + endog : array_like + Data to be investigated. Must be squeezable to 1-d. + + References + ---------- + .. [1] Puech, T., Boussard, M., D'Amato, A., & Millerand, G. (2020). + A fully automated periodicity detection in time series. In Advanced + Analytics and Learning on Temporal Data: 4th ECML PKDD Workshop, AALTD 2019, + Würzburg, Germany, September 20, 2019, Revised Selected Papers 4 (pp. 43-54). + Springer International Publishing. https://doi.org/10.1007/978-3-030-39098-3_4 + + Examples + -------- + Start by loading a timeseries datasets and resampling to an appropriate + frequency. + + >>> from statsmodels.datasets import co2 + >>> data = co2.load().data + >>> data = data.resample("ME").mean().ffill() + + Use ``CFDAutoperiod`` to find the list of periods in the data. + + >>> from pyriodicity import CFDAutoperiod + >>> cfd_autoperiod = CFDAutoperiod(data) + >>> cfd_autoperiod.fit() + array([12]) + + You can specify a lower percentile value should you wish for + a more lenient detection + + >>> cfd_autoperiod.fit(percentile=90) + array([12]) + + You can also increase the number of random data permutations + for a more robust power threshold estimation + + >>> cfd_autoperiod.fit(k=300) + array([12]) + + ``CFDAutoperiod`` is considered a more robust variant of ``Autoperiod``. + The detection algorithm found exactly one periodicity of 12, suggesting + a strong yearly periodicity. + """ + + def __init__(self, endog: ArrayLike): + self.y = to_1d_array(endog) + + def fit( + self, + k: int = 100, + percentile: int = 99, + detrend_func: Optional[str] = "linear", + window_func: Optional[Union[str, float, tuple]] = None, + correlation_func: Optional[str] = "pearson", + ) -> NDArray: + """ + Find periods in the given series. + + Parameters + ---------- + k : int, optional, default = 100 + The number of times the data is randomly permuted while estimating the + power threshold. + percentile : int, optional, default = 99 + Percentage for the percentile parameter used in computing the power + threshold. Value must be between 0 and 100 inclusive. + detrend_func : str, default = 'linear' + The kind of detrending to be applied on the signal. It can either be + 'linear' or 'constant'. + window_func : float, str, tuple optional, default = None + Window function to be applied to the time series. Check + 'window' parameter documentation for ``scipy.signal.get_window`` + function for more information on the accepted formats of this + parameter. + correlation_func : str, default = 'pearson' + The correlation function to be used to calculate the ACF of the time + series. Possible values are ['pearson', 'spearman', 'kendall']. + + Returns + ------- + NDArray + List of detected periods. + + See Also + -------- + scipy.signal.detrend + Remove linear trend along axis from data. + scipy.signal.get_window + Return a window of a given length and type. + scipy.stats.kendalltau + Calculate Kendall's tau, a correlation measure for ordinal data. + scipy.stats.pearsonr + Pearson correlation coefficient and p-value for testing non-correlation. + scipy.stats.spearmanr + Calculate a Spearman correlation coefficient with associated p-value. + + """ + # Detrend data + self.y = self.y if detrend_func is None else detrend(self.y, type=detrend_func) + # Apply window on data + self.y = self.y if window_func is None else apply_window(self.y, window_func) + + # Compute the power threshold + p_threshold = power_threshold(self.y, detrend_func, k, percentile) + + # Find period hints + freq, power = periodogram(self.y, detrend=detrend_func) + hints = np.array( + [ + 1 / f + for f, p in zip(freq, power) + if f >= 1 / len(freq) and p >= p_threshold + ] + ) + + # Replace period hints with their density clustering centroids + hints = self._cluster_period_hints(hints, len(self.y)) + + # Validate period hints + valid_hints = [] + length = len(self.y) + y_filtered = np.array(self.y) + for h in hints: + if self._is_hint_valid(y_filtered, h, detrend_func, correlation_func): + # Apply a low pass filter with an adapted cutoff frequency for the next hint + f_cuttoff = 1 / (length / (length / h + 1) - 1) + y_filtered = sosfiltfilt( + butter(N=5, Wn=f_cuttoff, output="sos"), y_filtered + ) + valid_hints.append(h) + + # Calculate only the needed part of the ACF array for each hint + hint_ranges = [ + np.arange(h // 2, 1 + h + h // 2, dtype=int) for h in valid_hints + ] + acf_arrays = [ + acf( + self.y, + lag_start=r[0], + lag_stop=r[-1], + correlation_func=correlation_func, + ) + for r in hint_ranges + ] + + # Return the closest ACF peak to each valid period hint + return np.array( + [ + r[0] + min(argrelmax(arr)[0], key=lambda x: abs(x - h)) + for h, r, arr in zip(valid_hints, hint_ranges, acf_arrays) + ] + ) + + @staticmethod + def _cluster_period_hints(period_hints: ArrayLike, n: int) -> NDArray: + """ + Find the centroids of the period hint density clusters. + + Parameters + ---------- + period_hints : array_like + List of period hints. + n : int + Length of the data. + + Returns + ------- + NDArray + List of period hint density cluster centroids. + """ + hints = np.sort(period_hints) + eps = [ + hints[i] if i == 0 else 1 + n / (n / hints[i - 1] - 1) + for i in range(len(hints)) + ] + clusters = np.split(hints, np.argwhere(hints > eps).flatten()) + return np.array([c.mean() for c in clusters if len(c) > 0]) + + @staticmethod + def _is_hint_valid( + y: ArrayLike, + hint: float, + detrend_func: Union[str], + correlation_func: str, + ) -> bool: + """ + Validate the period hint. + + Parameters + ---------- + y : array_like + Data to be investigated. Must be squeezable to 1-d. + hint : float + The period hint to be validated. + detrend_func : str + The kind of detrending to be applied on the signal. It can either be + 'linear' or 'constant'. + correlation_func : str + The correlation function to be used to calculate the ACF of the series + or the signal. Possible values are ['pearson', 'spearman', 'kendall']. + + Returns + ------- + bool + Whether the period hint is valid. + """ + hint_range = np.arange(hint // 2, 1 + hint + hint // 2, dtype=int) + acf_arr = acf( + y, + lag_start=hint_range[0], + lag_stop=hint_range[-1], + correlation_func=correlation_func, + ) + polynomial = np.polynomial.Polynomial.fit( + hint_range, detrend(acf_arr, type=detrend_func), deg=2 + ).convert() + derivative = polynomial.deriv() + return polynomial.coef[-1] < 0 and int(derivative.roots()[0]) in hint_range diff --git a/pyriodicity/detectors/fft.py b/pyriodicity/detectors/fft.py index c9280d2..657f666 100644 --- a/pyriodicity/detectors/fft.py +++ b/pyriodicity/detectors/fft.py @@ -1,9 +1,10 @@ -from typing import Callable, Optional, Union +from typing import Optional, Union import numpy as np from numpy.typing import ArrayLike, NDArray +from scipy.signal import detrend -from pyriodicity.tools import apply_window, detrend, to_1d_array +from pyriodicity.tools import apply_window, to_1d_array class FFTPeriodicityDetector: @@ -51,7 +52,7 @@ def __init__(self, endog: ArrayLike): def fit( self, max_period_count: Optional[int] = None, - detrend_func: Optional[Union[str, Callable[[ArrayLike], NDArray]]] = "linear", + detrend_func: Optional[str] = "linear", window_func: Optional[Union[float, str, tuple]] = None, ) -> NDArray: """ @@ -61,10 +62,9 @@ def fit( ---------- max_period_count : int, optional, default = None Maximum number of periods to look for. - detrend_func : str, callable, default = None - The kind of detrending to be applied on the series. It can either be - 'linear' or 'constant' if it the parameter is of 'str' type, or a - custom function that returns a detrended series. + detrend_func : str, default = 'linear' + The kind of detrending to be applied on the signal. It can either be + 'linear' or 'constant'. window_func : float, str, tuple optional, default = None Window function to be applied to the time series. Check 'window' parameter documentation for scipy.signal.get_window @@ -86,7 +86,7 @@ def fit( List of detected periods. """ # Detrend data - self.y = self.y if detrend_func is None else detrend(self.y, detrend_func) + self.y = self.y if detrend_func is None else detrend(self.y, type=detrend_func) # Apply the window function on the data self.y = ( diff --git a/pyriodicity/tools/__init__.py b/pyriodicity/tools/__init__.py index bdd219a..bedad90 100644 --- a/pyriodicity/tools/__init__.py +++ b/pyriodicity/tools/__init__.py @@ -1,17 +1,13 @@ from ._tools import ( acf, apply_window, - detrend, - remove_overloaded_kwargs, - seasonality_strength, + power_threshold, to_1d_array, ) __all__ = [ "acf", "apply_window", - "detrend", - "remove_overloaded_kwargs", - "seasonality_strength", + "power_threshold", "to_1d_array", ] diff --git a/pyriodicity/tools/_tools.py b/pyriodicity/tools/_tools.py index ba8dcbf..24e46c2 100644 --- a/pyriodicity/tools/_tools.py +++ b/pyriodicity/tools/_tools.py @@ -1,9 +1,8 @@ -from typing import Callable, Dict, List, Optional, Union +from typing import Optional, Union import numpy as np from numpy.typing import ArrayLike, NDArray -from scipy.signal import detrend as _detrend -from scipy.signal import get_window +from scipy.signal import get_window, periodogram from scipy.stats import kendalltau, pearsonr, spearmanr @@ -15,43 +14,72 @@ def to_1d_array(x: ArrayLike) -> NDArray: return y -@staticmethod -def remove_overloaded_kwargs(kwargs: Dict, args: List) -> Dict: - for arg in args: - kwargs.pop(arg, None) - return kwargs - - -@staticmethod -def seasonality_strength(seasonal: ArrayLike, resid: ArrayLike) -> float: - return max(0, 1 - np.var(resid) / np.var(seasonal + resid)) - - @staticmethod def apply_window(x: ArrayLike, window_func: Union[str, float, tuple]) -> NDArray: return x * get_window(window=window_func, Nx=len(x)) -@staticmethod -def detrend( - x: ArrayLike, - method: Union[str, Callable[[ArrayLike], NDArray]], -) -> NDArray: - if isinstance(method, str): - return _detrend(x, type=method) - return method(x) - - @staticmethod def acf( x: ArrayLike, - nlags: int, + lag_start: int, + lag_stop: int, correlation_func: Optional[str] = "pearson", ) -> NDArray: - if not 0 < nlags <= len(x): - raise ValueError("nlags must be a postive integer less than the data length") + if not 0 <= lag_start < lag_stop <= len(x): + raise ValueError( + "Invalid lag values range ({}, {})".format(lag_start, lag_stop) + ) + lag_values = np.arange(lag_start, lag_stop + 1, dtype=int) if correlation_func == "spearman": - return np.array([spearmanr(x, np.roll(x, l)).statistic for l in range(nlags)]) + return np.array([spearmanr(x, np.roll(x, l)).statistic for l in lag_values]) elif correlation_func == "kendall": - return np.array([kendalltau(x, np.roll(x, l)).statistic for l in range(nlags)]) - return np.array([pearsonr(x, np.roll(x, l)).statistic for l in range(nlags)]) + return np.array([kendalltau(x, np.roll(x, l)).statistic for l in lag_values]) + return np.array([pearsonr(x, np.roll(x, l)).statistic for l in lag_values]) + + +@staticmethod +def power_threshold( + y: ArrayLike, + detrend_func: str, + k: int, + p: int, +) -> float: + """ + Compute the power threshold as the p-th percentile of the maximum + power values of the periodogram of k permutations of the data. + + Parameters + ---------- + y : array_like + Data to be investigated. Must be squeezable to 1-d. + detrend_func : str, default = 'linear' + The kind of detrending to be applied on the series. It can either be + 'linear' or 'constant'. + k : int + The number of times the data is randomly permuted to compute + the maximum power values. + p : int + The percentile value used to compute the power threshold. + It determines the cutoff point in the sorted list of the maximum + power values from the periodograms of the permuted data. + Value must be between 0 and 100 inclusive. + + See Also + -------- + scipy.signal.periodogram + Estimate power spectral density using a periodogram. + + Returns + ------- + float + Power threshold of the target data. + """ + if detrend_func is None: + detrend_func = False + max_powers = [] + while len(max_powers) < k: + _, power_p = periodogram(np.random.permutation(y), detrend=detrend_func) + max_powers.append(power_p.max()) + max_powers.sort() + return np.percentile(max_powers, p) diff --git a/tests/detectors/conftest.py b/tests/detectors/conftest.py new file mode 100644 index 0000000..657d789 --- /dev/null +++ b/tests/detectors/conftest.py @@ -0,0 +1,17 @@ +import pytest +from statsmodels.datasets import co2 + + +@pytest.fixture(scope="module") +def co2_daily(): + return co2.load().data.resample("D").mean().ffill() + + +@pytest.fixture(scope="module") +def co2_weekly(): + return co2.load().data.resample("W").mean().ffill() + + +@pytest.fixture(scope="module") +def co2_monthly(): + return co2.load().data.resample("ME").mean().ffill() diff --git a/tests/detectors/test_acf.py b/tests/detectors/test_acf.py index 30538eb..0d58b21 100644 --- a/tests/detectors/test_acf.py +++ b/tests/detectors/test_acf.py @@ -1,96 +1,118 @@ -from statsmodels.datasets import co2 - from pyriodicity import ACFPeriodicityDetector -def test_co2_daily_acf_default(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_acf_default(co2_daily): + data = co2_daily acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit() assert len(periods) > 0 -def test_co2_weekly_acf_default(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_acf_default(co2_weekly): + data = co2_weekly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit() assert len(periods) > 0 -def test_co2_monthly_acf_default(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_acf_default(co2_monthly): + data = co2_monthly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit() assert len(periods) > 0 -def test_co2_daily_acf_max_period_count_one(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_acf_max_period_count_one(co2_daily): + data = co2_daily acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 364 + assert 364 in periods -def test_co2_weekly_acf_max_period_count_one(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_acf_max_period_count_one(co2_weekly): + data = co2_weekly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_acf_max_period_count_one(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_acf_max_period_count_one(co2_monthly): + data = co2_monthly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_acf_max_period_count_one_correlation_func_spearman(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_acf_max_period_count_one_correlation_func_spearman(co2_daily): + data = co2_daily acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, correlation_func="spearman") assert len(periods) == 1 - assert periods[0] == 364 + assert 364 in periods -def test_co2_weekly_acf_max_period_count_one_correlation_func_spearman(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_acf_max_period_count_one_correlation_func_spearman(co2_weekly): + data = co2_weekly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, correlation_func="spearman") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_acf_max_period_count_one_correlation_func_spearman(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_acf_max_period_count_one_correlation_func_spearman(co2_monthly): + data = co2_monthly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, correlation_func="spearman") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods + + +def test_co2_daily_acf_max_period_count_one_correlation_func_kendall(co2_daily): + data = co2_daily + acf_detector = ACFPeriodicityDetector(data) + periods = acf_detector.fit(max_period_count=1, correlation_func="kendall") + assert len(periods) == 1 + assert 364 in periods + + +def test_co2_weekly_acf_max_period_count_one_correlation_func_kendall(co2_weekly): + data = co2_weekly + acf_detector = ACFPeriodicityDetector(data) + periods = acf_detector.fit(max_period_count=1, correlation_func="kendall") + assert len(periods) == 1 + assert 52 in periods + + +def test_co2_monthly_acf_max_period_count_one_correlation_func_kendall(co2_monthly): + data = co2_monthly + acf_detector = ACFPeriodicityDetector(data) + periods = acf_detector.fit(max_period_count=1, correlation_func="kendall") + assert len(periods) == 1 + assert 12 in periods -def test_co2_daily_acf_max_period_count_one_window_func_blackman(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_acf_max_period_count_one_window_func_blackman(co2_daily): + data = co2_daily acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 364 + assert 364 in periods -def test_co2_weekly_acf_max_period_count_one_window_func_blackman(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_acf_max_period_count_one_window_func_blackman(co2_weekly): + data = co2_weekly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_acf_max_period_count_one_window_func_blackman(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_acf_max_period_count_one_window_func_blackman(co2_monthly): + data = co2_monthly acf_detector = ACFPeriodicityDetector(data) periods = acf_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods diff --git a/tests/detectors/test_autoperiod.py b/tests/detectors/test_autoperiod.py index 582403d..529e1d5 100644 --- a/tests/detectors/test_autoperiod.py +++ b/tests/detectors/test_autoperiod.py @@ -1,103 +1,91 @@ -import numpy as np -from statsmodels.datasets import co2 - from pyriodicity import Autoperiod -def test_co2_daily_autoperiod_default(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_autoperiod_default(co2_daily): + data = co2_daily autoperiod = Autoperiod(data) periods = autoperiod.fit() - assert len(periods) == 1 - assert periods[0] == 364 + assert len(periods) > 0 + assert 364 in periods -def test_co2_weekly_autoperiod_default(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_autoperiod_default(co2_weekly): + data = co2_weekly autoperiod = Autoperiod(data) periods = autoperiod.fit() - assert len(periods) == 1 - assert periods[0] == 52 + assert len(periods) > 0 + assert 52 in periods -def test_co2_monthly_autoperiod_default(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_autoperiod_default(co2_monthly): + data = co2_monthly autoperiod = Autoperiod(data) periods = autoperiod.fit() - assert len(periods) == 1 - assert periods[0] == 12 + assert len(periods) > 0 + assert 12 in periods -def test_co2_daily_autoperiod_detrend_func_constant(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_autoperiod_detrend_func_none(co2_daily): + data = co2_daily autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant") + periods = autoperiod.fit(detrend_func=None) assert len(periods) == 0 -def test_co2_weekly_autoperiod_detrend_func_constant(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_autoperiod_detrend_func_none(co2_weekly): + data = co2_weekly autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant") + periods = autoperiod.fit(detrend_func=None) assert len(periods) == 0 -def test_co2_monthly_autoperiod_detrend_func_constant(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_autoperiod_detrend_func_none(co2_monthly): + data = co2_monthly autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant") + periods = autoperiod.fit(detrend_func=None) assert len(periods) == 0 -def test_co2_daily_autoperiod_detrend_func_constant_window_func_blackman(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_autoperiod_detrend_func_constant(co2_daily): + data = co2_daily autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant", window_func="blackman") - assert len(periods) == 1 - assert periods[0] == 364 + periods = autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 -def test_co2_weekly_autoperiod_detrend_func_constant_window_func_blackman(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_weekly_autoperiod_detrend_func_constant(co2_weekly): + data = co2_weekly autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant", window_func="blackman") - assert len(periods) == 1 - assert periods[0] == 12 + periods = autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 -def test_co2_monthly_autoperiod_detrend_func_constant_window_func_blackman(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_autoperiod_detrend_func_constant(co2_monthly): + data = co2_monthly autoperiod = Autoperiod(data) - periods = autoperiod.fit(detrend_func="constant", window_func="blackman") - assert len(periods) == 1 - assert periods[0] == 12 + periods = autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 -def test_co2_daily_autoperiod_detrend_func_custom_window_func_blackmanharris(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_autoperiod_detrend_func_constant_window_func_blackman(co2_daily): + data = co2_daily autoperiod = Autoperiod(data) - periods = autoperiod.fit( - detrend_func=lambda x: x - np.median(x), window_func="blackmanharris" - ) - assert len(periods) == 1 - assert periods[0] == 364 + periods = autoperiod.fit(detrend_func="constant", window_func="blackman") + assert len(periods) > 0 + assert 364 in periods -def test_co2_weekly_autoperiod_detrend_func_custom_window_func_blackmanharris(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_autoperiod_detrend_func_constant_window_func_blackman(co2_weekly): + data = co2_weekly autoperiod = Autoperiod(data) - periods = autoperiod.fit( - detrend_func=lambda x: x - np.median(x), window_func="blackmanharris" - ) - assert len(periods) == 1 - assert periods[0] == 52 + periods = autoperiod.fit(detrend_func="constant", window_func="blackman") + assert len(periods) > 0 + assert 52 in periods -def test_co2_monthly_autoperiod_detrend_func_custom_window_func_blackmanharris(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_autoperiod_detrend_func_constant_window_func_blackman(co2_monthly): + data = co2_monthly autoperiod = Autoperiod(data) - periods = autoperiod.fit( - detrend_func=lambda x: x - np.median(x), window_func="blackmanharris" - ) - assert len(periods) == 1 - assert periods[0] == 12 + periods = autoperiod.fit(detrend_func="constant", window_func="blackman") + assert len(periods) > 0 + assert 12 in periods diff --git a/tests/detectors/test_cfd_autoperiod.py b/tests/detectors/test_cfd_autoperiod.py new file mode 100644 index 0000000..7d8f097 --- /dev/null +++ b/tests/detectors/test_cfd_autoperiod.py @@ -0,0 +1,91 @@ +from pyriodicity import CFDAutoperiod + + +def test_co2_daily_cfd_autoperiod_default(co2_daily): + data = co2_daily + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit() + assert len(periods) > 0 + assert 364 in periods + + +def test_co2_weekly_cfd_autoperiod_default(co2_weekly): + data = co2_weekly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit() + assert len(periods) > 0 + assert 52 in periods + + +def test_co2_monthly_cfd_autoperiod_default(co2_monthly): + data = co2_monthly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit() + assert len(periods) > 0 + assert 12 in periods + + +def test_co2_daily_cfd_autoperiod_detrend_func_none(co2_daily): + data = co2_daily + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func=None) + assert len(periods) == 0 + + +def test_co2_weekly_cfd_autoperiod_detrend_func_none(co2_weekly): + data = co2_weekly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func=None) + assert len(periods) == 0 + + +def test_co2_monthly_cfd_autoperiod_detrend_func_none(co2_monthly): + data = co2_monthly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func=None) + assert len(periods) == 0 + + +def test_co2_daily_cfd_autoperiod_detrend_func_constant(co2_daily): + data = co2_daily + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 + + +def test_co2_weekly_cfd_autoperiod_detrend_func_constant(co2_weekly): + data = co2_weekly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 + + +def test_co2_monthly_cfd_autoperiod_detrend_func_constant(co2_monthly): + data = co2_monthly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(detrend_func="constant") + assert len(periods) == 0 + + +def test_co2_daily_cfd_autoperiod_window_func_blackman(co2_daily): + data = co2_daily + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(window_func="blackman") + assert len(periods) > 0 + assert 364 in periods + + +def test_co2_weekly_cfd_autoperiod_window_func_blackman(co2_weekly): + data = co2_weekly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(window_func="blackman") + assert len(periods) > 0 + assert 52 in periods + + +def test_co2_monthly_cfd_autoperiod_window_func_blackman(co2_monthly): + data = co2_monthly + cfd_autoperiod = CFDAutoperiod(data) + periods = cfd_autoperiod.fit(window_func="blackman") + assert len(periods) > 0 + assert 12 in periods diff --git a/tests/detectors/test_fft.py b/tests/detectors/test_fft.py index f658b1f..6ee847e 100644 --- a/tests/detectors/test_fft.py +++ b/tests/detectors/test_fft.py @@ -1,233 +1,231 @@ -from statsmodels.datasets import co2 - from pyriodicity import FFTPeriodicityDetector -def test_co2_monthly_fft_find_all_periods(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_all_periods(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit() - assert len(periods) != 0 + assert len(periods) > 0 -def test_co2_monthly_fft_find_first_two_periods(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_first_two_periods(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=2) assert len(periods) == 2 -def test_co2_daily_fft_find_strongest_period(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1) assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_barthann(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_barthann(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="barthann") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_barthann(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_barthann(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="barthann") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_barthann(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_barthann(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="barthann") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_bartlett(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_bartlett(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="bartlett") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_bartlett(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_bartlett(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="bartlett") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_bartlett(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_bartlett(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="bartlett") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_blackman(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_blackman(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_blackman(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_blackman(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_blackman(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_blackman(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackman") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_blackmanharris(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_blackmanharris(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackmanharris") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_blackmanharris(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_blackmanharris(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackmanharris") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_blackmanharris(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_blackmanharris(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="blackmanharris") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_boxcar(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_boxcar(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="boxcar") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_boxcar(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_boxcar(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="boxcar") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_boxcar(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_boxcar(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="boxcar") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_hamming(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_hamming(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hamming") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_hamming(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_hamming(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hamming") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_hamming(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_hamming(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hamming") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_hann(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_hann(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hann") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_hann(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_hann(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hann") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_hann(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_hann(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="hann") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods -def test_co2_daily_fft_find_strongest_period_window_func_tukey(): - data = co2.load().data.resample("D").mean().ffill() +def test_co2_daily_fft_find_strongest_period_window_func_tukey(co2_daily): + data = co2_daily fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="tukey") assert len(periods) == 1 - assert periods[0] == 363 + assert 363 in periods -def test_co2_weekly_fft_find_strongest_period_window_func_tukey(): - data = co2.load().data.resample("W").mean().ffill() +def test_co2_weekly_fft_find_strongest_period_window_func_tukey(co2_weekly): + data = co2_weekly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="tukey") assert len(periods) == 1 - assert periods[0] == 52 + assert 52 in periods -def test_co2_monthly_fft_find_strongest_period_window_func_tukey(): - data = co2.load().data.resample("ME").mean().ffill() +def test_co2_monthly_fft_find_strongest_period_window_func_tukey(co2_monthly): + data = co2_monthly fft_detector = FFTPeriodicityDetector(data) periods = fft_detector.fit(max_period_count=1, window_func="tukey") assert len(periods) == 1 - assert periods[0] == 12 + assert 12 in periods