Skip to content

Commit

Permalink
Merge pull request #220 from rwth-i6/bene_update_cleaning
Browse files Browse the repository at this point in the history
Add Filtering, Alias printing and Typing to Cleaning
  • Loading branch information
Atticus1806 authored Nov 28, 2024
2 parents 442032d + 4b5a66c commit d08616b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 20 deletions.
109 changes: 89 additions & 20 deletions sisyphus/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import sys
import tempfile

from typing import Dict, List, Optional, Set, Union

from sisyphus import graph
from sisyphus.job import Job
from sisyphus.job_path import Path
import sisyphus.global_settings as gs


Expand All @@ -31,7 +35,7 @@
JOB_WITHOUT_KEEP_VALUE = -1


def extract_keep_values_from_graph():
def extract_keep_values_from_graph() -> Dict[str, int]:
"""Go through loaded graph and create dict with all jobs and keep values
:return:
Expand Down Expand Up @@ -80,11 +84,17 @@ def extract_keep_values_from_graph():
return job_dirs


def find_too_low_keep_value(job_dirs, min_keep_value):
def find_too_low_keep_value(
job_dirs: Union[str, Dict[Union[str, Path], int]],
min_keep_value: int,
filter_removed_jobs: Optional[List[Union[str, Path]]] = None,
) -> Set[Union[Path, str]]:
"""Check all given job if they can be removed and have a keep value lower min_keep_value.
:param job_dirs: dict with all keep values, can be created with extract_keep_values_from_graph
:param min_keep_value: minimal keep value
:param filter_removed_jobs: Only Jobs matching the substring will be deleted
:return:
"""
if isinstance(job_dirs, str):
Expand All @@ -96,12 +106,12 @@ def find_too_low_keep_value(job_dirs, min_keep_value):
continue
if keep_value == JOB_WITHOUT_KEEP_VALUE:
keep_value = gs.JOB_DEFAULT_KEEP_VALUE
if keep_value < min_keep_value:
if keep_value < min_keep_value and (filter_removed_jobs is None or any(x in path for x in filter_removed_jobs)):
to_remove.add(path)
return to_remove


def list_all_graph_directories():
def list_all_graph_directories() -> Dict[str, int]:
"""Create dict containing all filesystem directories used by jobs inside the loaded graph
:return: dict
Expand All @@ -117,7 +127,7 @@ def list_all_graph_directories():
return job_dirs


def save_used_paths(outfile=None, job_dirs=None):
def save_used_paths(outfile: Union[str, Path] = None, job_dirs: Dict[Union[str, Path], int] = None):
"""Write dict of directories in the graph to file
:param outfile: Filename of output file, if not given write to stdout
Expand All @@ -133,7 +143,7 @@ def save_used_paths(outfile=None, job_dirs=None):
out.close()


def load_used_paths(infile):
def load_used_paths(infile: Union[str, Path]) -> Dict[str, int]:
"""Load list save with save_used_paths
:param infile: Filename to load from
Expand All @@ -147,7 +157,7 @@ def load_used_paths(infile):
return job_dirs


def save_remove_list(to_remove, outfile):
def save_remove_list(to_remove: List[Union[str, Path]], outfile: Union[str, Path]):
"""Write list of files that should be removed to file
:param to_remove: List of directories
:param outfile: Filename of output file
Expand All @@ -158,7 +168,7 @@ def save_remove_list(to_remove, outfile):
f.write(i + "\n")


def load_remove_list(infile):
def load_remove_list(infile: Union[str, Path]) -> List[str]:
"""Load list save with save_remove_list
:param infile: Filename to load from
Expand All @@ -171,12 +181,19 @@ def load_remove_list(infile):
return out


def search_for_unused(job_dirs, current=gs.WORK_DIR, verbose=True):
def search_for_unused(
job_dirs: Union[str, Dict[Union[str, Path], int]],
current: str = gs.WORK_DIR,
verbose: bool = True,
filter_unused: Optional[List[str]] = None,
) -> Set[str]:
"""Check work directory and list all subdirectories which do not belong to the given list of directories.
:param job_dirs: dict with all used directories, can be created with list_all_graph_directories.
:param current: current work directory
:param verbose: make it verbose
:param filter_unused: Only Jobs matching the substring will be deleted
:return: List with all unused directories
"""

Expand All @@ -192,11 +209,11 @@ def search_for_unused(job_dirs, current=gs.WORK_DIR, verbose=True):
path = os.path.join(current, short_path)
status = job_dirs.get(path)

if status is None:
if status is None and (filter_unused is None or any(x in path for x in filter_unused)):
unused.add(path)
elif status == DIR_IN_GRAPH:
# directory has sub directories used by current graph
found = search_for_unused(job_dirs, path, verbose)
found = search_for_unused(job_dirs, path, verbose, filter_unused=filter_unused)
unused.update(found)
if verbose:
logging.info("found %s unused directories in %s (total so far: %s)" % (len(found), path, len(unused)))
Expand All @@ -206,7 +223,14 @@ def search_for_unused(job_dirs, current=gs.WORK_DIR, verbose=True):
return unused


def remove_directories(dirs, message, move_postfix=".cleanup", mode="remove", force=False):
def remove_directories(
dirs: Union[str, Dict[Union[str, Path], int], Set],
message: str,
move_postfix: str = ".cleanup",
mode: str = "remove",
force: bool = False,
filter_printed: Optional[List[str]] = None,
):
"""list all directories that will be deleted and add a security check"""
if isinstance(dirs, str):
dirs = load_remove_list(dirs)
Expand All @@ -228,7 +252,19 @@ def remove_directories(dirs, message, move_postfix=".cleanup", mode="remove", fo
if input_var.lower() != "n":
logging.info("Affected directories:")
for i in tmp:
logging.info(i)
if os.path.exists(i + "/info") and gs.CLEANER_PRINT_ALIAS:
with open(i + "/info") as f:
lines = f.readlines()
if lines[-1].strip().startswith("ALIAS"):
s = lines[-1].strip()
s.replace("ALIAS:", "ALIAS AT CREATION:")
else:
s = ""
else:
s = ""
if filter_printed is None or any(x in i for x in filter_printed):
logging.info(i + " " + s)

else:
with tempfile.NamedTemporaryFile(mode="w") as tmp_file:
for directory in dirs:
Expand Down Expand Up @@ -280,28 +316,55 @@ def cleanup_jobs():
job._sis_cleanup()


def cleanup_keep_value(min_keep_value, load_from: str = "", mode: str = "remove"):
"""Go through all jobs in the current graph to remove all jobs with a lower keep value that the given minimum"""
def cleanup_keep_value(
min_keep_value: int,
load_from: str = "",
mode: str = "remove",
filter_removed_jobs: Optional[List[str]] = None,
filter_printed: Optional[List[str]] = None,
):
"""Go through all jobs in the current graph to remove all jobs with a lower keep value that the given minimum
:param min_keep_value: Remove jobs with lower keep value than this
:param load_from: File name to load list with used directories
:param mode: Cleanup mode ('remove', 'move', or 'dryrun')
:param filter_removed_jobs: Only Jobs matching the substring will be deleted
:param filter_printed: Defines what substrings should be printed when listing affected directories
"""
if min_keep_value <= 0:
logging.error("Keep value must be larger than 0")
if load_from:
job_dirs = load_used_paths(load_from)
else:
job_dirs = extract_keep_values_from_graph()

to_remove = find_too_low_keep_value(job_dirs, min_keep_value)
to_remove = find_too_low_keep_value(job_dirs, min_keep_value, filter_removed_jobs=filter_removed_jobs)
remove_directories(
to_remove, "Remove jobs with lower keep value than min", move_postfix=".cleanup", mode=mode, force=False
to_remove,
"Remove jobs with lower keep value than min",
move_postfix=".cleanup",
mode=mode,
force=False,
filter_printed=filter_printed,
)


def cleanup_unused(load_from: str = "", job_dirs=None, mode="remove"):
def cleanup_unused(
load_from: str = "",
job_dirs: List[Job] = None,
mode: str = "remove",
filter_unused: Optional[List[str]] = None,
filter_printed: Optional[List[str]] = None,
):
"""Check work directory and remove all subdirectories which do not belong to the given list of directories.
If no input is given it removes everything that is not in the current graph
:param load_from: File name to load list with used directories
:param job_dirs: Already loaded list of used directories
:param mode: Cleanup mode ('remove', 'move', or 'dryrun')
:param filter_unused: Only Jobs matching the substring will be deleted
:param filter_printed: Defines what substrings should be printed when listing affected directories
:return:
"""
if job_dirs:
Expand All @@ -310,5 +373,11 @@ def cleanup_unused(load_from: str = "", job_dirs=None, mode="remove"):
job_dirs = load_used_paths(load_from)
else:
job_dirs = list_all_graph_directories()
to_remove = search_for_unused(job_dirs, verbose=True)
remove_directories(to_remove, "Not used in graph", mode=mode, force=False)
to_remove = search_for_unused(job_dirs, verbose=True, filter_unused=filter_unused)
remove_directories(
to_remove,
"Not used in graph",
mode=mode,
force=False,
filter_printed=filter_printed,
)
3 changes: 3 additions & 0 deletions sisyphus/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ def file_caching(path):
JOB_CLEANUP_KEEP_INPUT = True
#: Default value for job used by tk.cleaner to determine if a job should be removed or not
JOB_DEFAULT_KEEP_VALUE = 50
#:
CLEANER_PRINT_ALIAS = True

#: How many threads should update the graph in parallel, useful if the filesystem has a high latency
GRAPH_WORKER = 16

Expand Down

0 comments on commit d08616b

Please sign in to comment.