Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallel intra strided-rolling #102

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
537 changes: 296 additions & 241 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pycatch22 = "^0.4.2"
# antropy = "^0.1.5"
antropy = { git = "https://github.com/raphaelvallat/antropy.git", branch = "master" } # replace with pip when > 0.1.5 release available
nolds = "^0.5.2"
pyentrp = [
{ version = "^0.7.1", python = "<3.8" },
{ version = "^0.8.2", python = ">=3.8" }
]
# Linting
ruff = "^0.0.264"
black = "^22.12.0"
Expand Down
133 changes: 132 additions & 1 deletion tests/test_features_feature_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,7 @@ def sum_mean(x, axis):
s = "EDA__"
p = "__w=1000"
assert np.all(res[s + "sum" + p].values == res[s + "sum_vect" + p].values)
assert np.all(res[s + "mean" + p].values == res[s + "mean_vect" + p].values)
assert np.allclose(res[s + "mean" + p].values, res[s + "mean_vect" + p].values)


def test_multiple_inputs_vectorized_features(dummy_data):
Expand Down Expand Up @@ -1759,6 +1759,137 @@ def windowed_diff(x1, x2):
assert np.all(res["EDA|TMP__windowed_diff" + p].values == manual_diff)


### Test parallel features


def test_basic_parallel_features_different_fc(dummy_data):
    """The parallel FuncWrapper variant of ``np.max``, computed in a separate
    FeatureCollection, must yield exactly the same index and values as the
    plain (non-parallel) variant."""
    fs = 4  # The sample frequency in Hz
    window, stride = 250 * fs, 75 * fs
    fc_serial = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.max, "EDA", window, stride),
        ]
    )
    fc_parallel = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(
                FuncWrapper(np.max, output_names="max_", parallel=True),
                "EDA",
                window,
                stride,
            ),
        ]
    )
    out_serial = fc_serial.calculate(dummy_data)
    out_parallel = fc_parallel.calculate(dummy_data)

    assert len(out_serial) == 1
    assert len(out_parallel) == 1
    df_serial, df_parallel = out_serial[0], out_parallel[0]
    assert len(df_serial) > 1 and len(df_parallel) > 1
    assert np.all(df_serial.index == df_parallel.index)
    assert np.all(df_serial.values == df_parallel.values)


def test_basic_parallel_features_same_fc(dummy_data):
    """A non-parallel and a parallel variant of ``np.max`` inside the same
    FeatureCollection must produce identical outputs."""
    fs = 4  # The sample frequency in Hz
    window, stride = 250 * fs, 75 * fs
    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.max, "EDA", window, stride),
            FeatureDescriptor(
                FuncWrapper(np.max, output_names="max_", parallel=True),
                "EDA",
                window,
                stride,
            ),
        ]
    )
    out = fc.calculate(dummy_data)

    assert len(out) == 2
    first, second = out
    assert len(first) > 1 and len(second) > 1
    assert np.all(first.index == second.index)
    assert np.all(first.values == second.values)


def test_time_based_parallel_features(dummy_data):
    """Time-based (string) windows: the parallel ``np.max`` variant must match
    the non-parallel one."""
    window, stride = "5min", "3min"
    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.max, "EDA", window, stride),
            FeatureDescriptor(
                FuncWrapper(np.max, output_names="max_", parallel=True),
                "EDA",
                window,
                stride,
            ),
        ]
    )
    out = fc.calculate(dummy_data)

    assert len(out) == 2
    first, second = out
    assert len(first) > 1 and len(second) > 1
    assert np.all(first.index == second.index)
    assert np.all(first.values == second.values)


def test_multiple_outputs_parallel_features(dummy_data):
    """A parallel function with two outputs must match the separately computed
    non-parallel ``np.sum`` and ``np.mean`` features."""

    def sum_mean(x):
        # Single-input, two-output feature function.
        total = np.sum(x)
        return total, total / x.shape[0]

    fs = 4  # The sample frequency in Hz
    window, stride = 250 * fs, 75 * fs
    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.sum, "EDA", window, stride),
            FeatureDescriptor(np.mean, "EDA", window, stride),
            FeatureDescriptor(
                FuncWrapper(
                    sum_mean,
                    output_names=["sum_par", "mean_par"],
                    parallel=True,
                ),
                "EDA",
                window,
                stride,
            ),
        ]
    )

    res = fc.calculate(dummy_data, return_df=True)

    assert res.shape[1] == 4
    prefix, suffix = "EDA__", "__w=1000"
    assert np.all(
        res[prefix + "sum" + suffix].values == res[prefix + "sum_par" + suffix].values
    )
    # The mean is accumulated differently (sum / len) -> allow float tolerance.
    assert np.allclose(
        res[prefix + "mean" + suffix].values,
        res[prefix + "mean_par" + suffix].values,
    )


def test_multiple_inputs_parallel_features(dummy_data):
    """A parallel two-input function must match the difference of the
    separately computed non-parallel sums."""

    def windowed_diff(x1, x2):
        # Two-input, single-output feature function.
        return np.sum(x1) - np.sum(x2)

    window, stride = "5min", "2.5min"
    fc = FeatureCollection(
        feature_descriptors=[
            FeatureDescriptor(np.sum, "EDA", window, stride),
            FeatureDescriptor(np.sum, "TMP", window, stride),
            FeatureDescriptor(
                FuncWrapper(windowed_diff, parallel=True),
                ("EDA", "TMP"),
                window,
                stride,
            ),
        ]
    )

    res = fc.calculate(dummy_data, return_df=True)

    assert res.shape[1] == 3
    assert res.shape[0] > 1
    suffix = "__w=5m"
    expected = res["EDA__sum" + suffix].values - res["TMP__sum" + suffix].values
    assert np.all(res["EDA|TMP__windowed_diff" + suffix].values == expected)


### Test feature extraction length


Expand Down
131 changes: 101 additions & 30 deletions tsflex/features/feature_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,31 @@ def get_required_series(self) -> List[str]:
set(flatten([fr_key[0] for fr_key in self._feature_desc_dict.keys()]))
)

def _get_feature_desc_dict(
self, parallel: bool
) -> Dict[
Tuple[Tuple[str, ...], Union[float, pd.Timedelta]], List[FeatureDescriptor]
]:
"""Return the feature descriptor dictionary.

Parameters
----------
parallel : bool
Whether to return the feature descriptors for the parallel functions or not.

Returns
-------
Dict[Tuple[Tuple[str, ...], Union[float, pd.Timedelta]], List[FeatureDescriptor]]
The feature descriptor dictionary.

"""
res = {}
for k, fd_list in self._feature_desc_dict.items():
fd_list = [fd for fd in fd_list if fd.function.parallel == parallel]
if fd_list:
res[k] = fd_list
return res

def get_nb_output_features(self) -> int:
"""Return the number of output features in this feature collection.

Expand Down Expand Up @@ -260,11 +285,23 @@ def add(
self._check_feature_descriptors(skip_none=True)

@staticmethod
def _executor(idx: int):
# global get_stroll_func
stroll, function = get_stroll_func(idx)
def _non_parallel_func_executor(idx: int):
    """Compute the ``idx``-th non-parallel (strided-rolling, function) pair.

    The pair is looked up via the module-global ``get_stroll_non_parallel_func``
    generator, and the function is applied over the strided-rolling segments.
    This executor can be called in a multiprocessing context.
    """
    segmenter, func_wrapper = get_stroll_non_parallel_func(idx)
    return segmenter.apply_func(func_wrapper)

@staticmethod
def _parallel_func_executor(idx: int, n_jobs: int):
    """Compute the ``idx``-th parallel (strided-rolling, function) pair.

    The pair is looked up via the module-global ``get_stroll_parallel_func``
    generator; ``n_jobs`` is forwarded so the function itself is parallelized
    over the strided windows.
    This executor canNOT be called in a multiprocessing context.
    """
    segmenter, func_wrapper = get_stroll_parallel_func(idx)
    return segmenter.apply_func(func_wrapper, n_jobs)

# def _get_stroll(self, kwargs):
# return StridedRollingFactory.get_segmenter(**kwargs)

Expand All @@ -279,20 +316,21 @@ def _stroll_feat_generator(
window_idx: str,
include_final_window: bool,
approve_sparsity: bool,
parallel: bool,
) -> Callable[[int], Tuple[StridedRolling, FuncWrapper]]:
# --- Future work ---
# We could also make the StridedRolling creation multithreaded
# Very low priority because the STROLL __init__ is rather efficient!
keys_wins_strides = list(self._feature_desc_dict.keys())
lengths = np.cumsum(
[len(self._feature_desc_dict[k]) for k in keys_wins_strides]
)
feature_desc_dict = self._get_feature_desc_dict(parallel)

keys_wins_strides = list(feature_desc_dict)
lengths = np.cumsum([len(feature_desc_dict[k]) for k in keys_wins_strides])

def get_stroll_function(idx) -> Tuple[StridedRolling, FuncWrapper]:
key_idx = np.searchsorted(lengths, idx, "right") # right bc idx starts at 0
key, win = keys_wins_strides[key_idx]

feature = self._feature_desc_dict[keys_wins_strides[key_idx]][
feature = feature_desc_dict[keys_wins_strides[key_idx]][
idx - lengths[key_idx]
]
stride = feature.stride if calc_stride is None else calc_stride
Expand All @@ -316,10 +354,9 @@ def get_stroll_function(idx) -> Tuple[StridedRolling, FuncWrapper]:

return get_stroll_function

def _get_stroll_feat_length(self) -> int:
return sum(
len(self._feature_desc_dict[k]) for k in self._feature_desc_dict.keys()
)
def _get_stroll_feat_length(self, parallel: bool) -> int:
feat_desc_dict = self._get_feature_desc_dict(parallel)
return sum(len(feat_desc_dict[k]) for k in feat_desc_dict.keys())

def _check_no_multiple_windows(self):
assert (
Expand Down Expand Up @@ -599,11 +636,18 @@ def calculate(
for n, s, in series_dict.items()
}

if (
os.name == "nt"
): # On Windows no multiprocessing is supported, see https://github.com/predict-idlab/tsflex/issues/51
n_jobs = 1
elif n_jobs is None:
n_jobs = os.cpu_count()

# Note: this variable has a global scope so this is shared in multiprocessing
# TODO: try to make this more efficient (but is not really the bottleneck)
global get_stroll_func
get_stroll_func = self._stroll_feat_generator(
series_dict,
global get_stroll_non_parallel_func # is not applied in parallel on the stroll
global get_stroll_parallel_func # is applied in parallel on the stroll
kwargs = dict(
calc_stride=stride,
segment_start_idxs=segment_start_idxs,
segment_end_idxs=segment_end_idxs,
Expand All @@ -613,30 +657,42 @@ def calculate(
include_final_window=include_final_window,
approve_sparsity=approve_sparsity,
)
nb_stroll_funcs = self._get_stroll_feat_length()

if (
os.name == "nt"
): # On Windows no multiprocessing is supported, see https://github.com/predict-idlab/tsflex/issues/51
n_jobs = 1
elif n_jobs is None:
n_jobs = os.cpu_count()
n_jobs = min(n_jobs, nb_stroll_funcs)
get_stroll_non_parallel_func = self._stroll_feat_generator(
series_dict,
**kwargs,
parallel=False,
)
nb_stroll_non_parallel_func = self._get_stroll_feat_length(parallel=False)
n_jobs_non_parallel_func = min(n_jobs, nb_stroll_non_parallel_func)
get_stroll_parallel_func = self._stroll_feat_generator(
series_dict,
**kwargs,
parallel=True,
)
nb_stroll_parallel_func = self._get_stroll_feat_length(parallel=True)

calculated_feature_list = None
jvdd marked this conversation as resolved.
Show resolved Hide resolved
if n_jobs in [0, 1]:
idxs = range(nb_stroll_funcs)

# 1. Calculate the features for the non-parallel functions
# -> these functions are calculated (possibly in parallel) over the functions
if n_jobs_non_parallel_func in [0, 1]:
idxs = range(nb_stroll_non_parallel_func)
if show_progress:
idxs = tqdm(idxs)
try:
calculated_feature_list = [self._executor(idx) for idx in idxs]
# 1. Calculate the features for the non-parallel functions
calculated_feature_list = [
self._non_parallel_func_executor(i) for i in idxs
]
except Exception:
traceback.print_exc()
else:
with Pool(processes=n_jobs) as pool:
results = pool.imap_unordered(self._executor, range(nb_stroll_funcs))
with Pool(processes=n_jobs_non_parallel_func) as pool:
results = pool.imap_unordered(
self._non_parallel_func_executor, range(nb_stroll_non_parallel_func)
)
if show_progress:
results = tqdm(results, total=nb_stroll_funcs)
results = tqdm(results, total=nb_stroll_non_parallel_func)
try:
calculated_feature_list = [f for f in results]
except Exception:
Expand All @@ -647,6 +703,21 @@ def calculate(
pool.close()
pool.join()

# 2. Calculate the features for the parallel functions
# -> these functions are calculated (possibly in parallel) within the functions
idxs = range(nb_stroll_parallel_func)
if show_progress:
idxs = tqdm(idxs)
try:
# We collect one by one the results of the parallel functions
# (because the parallelization is done within the function , i.e. over the
# strided windows)
calculated_feature_list += [
self._parallel_func_executor(i, n_jobs) for i in idxs
]
except Exception:
traceback.print_exc()

# Close the file handler (this avoids PermissionError: [WinError 32])
if logging_file_path:
f_handler.close()
Expand Down
7 changes: 6 additions & 1 deletion tsflex/features/function_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
output_names: Optional[Union[List[str], str]] = None,
input_type: Optional[Union[np.array, pd.Series]] = np.array,
vectorized: bool = False,
parallel: bool = False,
**kwargs,
):
"""Create FuncWrapper instance."""
Expand All @@ -83,10 +84,14 @@ def __init__(

assert input_type in SUPPORTED_STROLL_TYPES, "Invalid input_type!"
assert not (
vectorized & (input_type is not np.array)
vectorized and (input_type is not np.array)
), "The input_type must be np.array if vectorized is True!"
assert not (
vectorized and parallel
), "vectorized and parallel cannot be both True!"
self.input_type = input_type
self.vectorized = vectorized
self.parallel = parallel

self._freeze()

Expand Down
Loading