diff --git a/deduplidog/__main__.py b/deduplidog/__main__.py index d7ee1d0..4696716 100644 --- a/deduplidog/__main__.py +++ b/deduplidog/__main__.py @@ -7,6 +7,10 @@ def main(): with run(Deduplidog, interface=None) as m: + # with run(Deduplidog, interface="tui") as m: + # m = run(Deduplidog, interface="gui") + # if 1: + # m.facet._layout # TODO try: while True: print("") @@ -18,7 +22,7 @@ def main(): # [setattr(deduplidog, f.name, f.convert()) for f in dog_fields] # deduplidog.perform() # else: - m.env.start() + m.env.start(m) except Exception as e: print("-"*100) print(e) @@ -28,6 +32,9 @@ def main(): except KeyboardInterrupt: print("") sys.exit() + except Exception as e: + import ipdb + ipdb.post_mortem() # TODO if __name__ == "__main__": diff --git a/deduplidog/deduplidog.py b/deduplidog/deduplidog.py index be9cea9..98d18aa 100644 --- a/deduplidog/deduplidog.py +++ b/deduplidog/deduplidog.py @@ -10,10 +10,13 @@ from functools import cache, partial from pathlib import Path from time import sleep +import traceback from typing import Optional from humanize import naturaldelta, naturalsize from PIL import Image +from mininterface import Mininterface +from mininterface.facet import Image as FacetImage from pillow_heif import register_heif_opener from tqdm.autonotebook import tqdm from tyro.conf import OmitArgPrefixes @@ -76,6 +79,9 @@ class Execution: neglect_warning: bool = False "By default, when a file with bigger size or older date should be affected, just warning is generated. Turn this to suppress it." + confirm_one_by_one: bool = True + """ Instead of executing changes all at once, confirm one by one. So that you may decide whether the media similarity detection works. """ + @dataclass class Match: @@ -119,17 +125,18 @@ class Match: class Media: media_magic: bool = False - """Nor the size or date is compared for files with media suffixes. + """ Media files similarity detection. + Nor the size or date is compared for files with media suffixes. 
A video is considered a duplicate if it has the same name and a similar number of frames, even if it has a different extension. An image is considered a duplicate if it has the same name and a similar image hash, even if the files are of different sizes. (This mode is considerably slower.) """ accepted_frame_delta: int = 1 - "Used only when media_magic is True" + "Number of frames for which two videos are considered equal." accepted_img_hash_diff: int = 1 - "Used only when media_magic is True" + "Hash difference between images so that they are considered equal, see https://github.com/JohannesBuchner/imagehash" img_compare_date: bool = False "If True and `media_magic=True`, the work file date or the work file EXIF date must match the original file date (has to be no more than an hour around)." @@ -233,6 +240,8 @@ def __post_init__(self): self.having_multiple_candidates: dict[Path, list[Path]] = {} "What unsuccessful candidates did work files have?" + self.m = Mininterface() + def reset(self): self.size_affected = 0 self.affected_count = 0 @@ -240,7 +249,9 @@ def reset(self): self.ignored_count = 0 self.having_multiple_candidates.clear() - def start(self): + def start(self, interface=None): + if interface: + self.m = interface self.reset() self.check() self.perform() @@ -295,7 +306,7 @@ def perform(self): f" {self.affected_count}/{len(self.work_files) - self.ignored_count}", end="") if self.ignored_count: print(f" ({self.ignored_count} ignored)", end="") - print("\nAffected size:", naturalsize(self.size_affected)) + print(f"\n{'Affected' if self.action.execute else 'Affectable'} size:", naturalsize(self.size_affected)) if self.warning_count: print(f"Warnings: {self.warning_count}") if self.having_multiple_candidates: @@ -386,24 +397,27 @@ def check(self): action = "will be" if self.action.execute else f"would be (if execute were True)" print(f"{which.capitalize()}{small}{nonzero} {action} ", end="") - match self.action.rename, self.action.replace_with_original, 
def _get_action(self, passive=False):
    """Describe the single selected action as a short English phrase.

    The phrase carries no trailing period, so callers can append punctuation
    or capitalize it as they need.

    :param passive: If truthy, use the past participle ("renamed", "deleted", "left intact"),
        otherwise the imperative form ("rename", "delete", "leave intact").
    :return: The phrase for the selected action, or the "intact" phrase when no action flag is set.
    :raises AssertionError: If more than one action flag is set.
    """
    # Flags and `passive` are judged by truthiness only, so a truthy non-bool value
    # (1, 2, "yes") is neither counted as several actions nor repeated in the suffix.
    suffix = "d" if passive else ""
    phrases = (
        (self.action.rename, f"rename{suffix} (prefixed with ✓)"),
        (self.action.replace_with_original, f"replace{suffix} with the original"),
        (self.action.delete, f"delete{suffix}"),
        (self.action.replace_with_symlink, f"replace{suffix} with the symlink"),
    )
    selected = [phrase for flag, phrase in phrases if flag]

    if not selected:
        return f"{'left' if passive else 'leave'} intact (because no action is selected)"
    if len(selected) > 1:
        # The exception type is kept as is for backward compatibility.
        raise AssertionError("Choose only one execute action (like only rename).")
    return selected[0]
def _confirm(self, affected_file, other_file, change: Change):
    """Show both files in the interface and ask whether the action should go ahead.

    For each file the layout receives a caption, the path, an image preview when the
    suffix is listed in IMAGE_SUFFIXES, and every note recorded for it in `change`.

    :param affected_file: The file the selected action is going to touch.
    :param other_file: Its counterpart, captioned "Original".
    :param change: Notes per file; any non-empty note list switches the dialog from `is_yes` to `is_no`.
    :return: Whatever the interface's `is_yes` / `is_no` call returns.
    """
    layout = []
    has_notes = False

    for caption, path in (("Going to affect", affected_file), ("Original", other_file)):
        layout += [caption, path]
        if path.suffix.lower() in IMAGE_SUFFIXES:
            layout.append(FacetImage(path))
        notes = change[path]
        if notes:
            layout.extend(notes)
            has_notes = True

    self.m.facet._layout(layout)

    # NOTE(review): presumably `is_no` only changes the pre-selected button and still returns
    # True on an affirmative answer – confirm against the mininterface documentation.
    ask = self.m.is_no if has_notes else self.m.is_yes
    return ask(self._get_action().capitalize())
mode 100644 index 0000000..a487653 --- /dev/null +++ b/tests/setup.py @@ -0,0 +1,15 @@ +from deduplidog import Deduplidog +from deduplidog.deduplidog import Action, Execution, Match, Media, Helper + + +def drun(action=None, execution=None, match=None, media=None, helper=None, **kw): + def _(l: list | dict): + if isinstance(l, list): + return {k: True for k in l} + return l + return Deduplidog(Action(**_(action or [])), + Execution(**_(execution or [])), + Match(**_(match or [])), + Media(**_(media or [])), + Helper(**_(helper or [])), + **kw).start() diff --git a/tests/test_deduplidog.py b/tests/test_deduplidog.py new file mode 100644 index 0000000..8912be5 --- /dev/null +++ b/tests/test_deduplidog.py @@ -0,0 +1,170 @@ + +from collections.abc import Mapping +from dataclasses import dataclass +from itertools import chain +import os +from pathlib import Path +from tempfile import TemporaryDirectory, mkdtemp +from typing import Self +from unittest import TestCase, main +import random +import string + +from tests.setup import drun + + +@dataclass +class FileRepresentation: + path: Path + mtime: int = 0 + "relative mtime" + text_seed: int = 1 + + def __post_init__(self): + self._mtime = round(self.path.parent.parent.stat().st_mtime + self.mtime) + + def write(self): + "Writes the representation to the disk." + self.path.write_text(self.get_text()) + os.utime(self.path, (self._mtime,)*2) + return self + + def check(self, test: TestCase): + "Checks the disk whether it contains the file represented." 
+ test.assertTrue(self.path.exists(), msg=self.path) + test.assertEqual(self.get_text(), self.path.read_text(), msg=self.path) + test.assertEqual(self._mtime, self.path.stat().st_mtime, msg=self.path) + + def get_text(self): + random.seed(self.text_seed) + return ''.join(random.choices(string.ascii_letters + string.digits, k=10+self.text_seed*10)) + + def prefixed(self): + self.path = self.path.with_name("✓" + self.path.name) + + def suck(self, other: Self): + "Use the other file. Use its name, however stays in the current directory." + self.path = self.path.with_name(other.path.name) + self._mtime = other._mtime + self.text_seed = other.text_seed + + +@dataclass +class FolderState(Mapping): + test_case: TestCase + _work_dir: Path + _original_dir: Path + work_files: dict[str, FileRepresentation] + originals: dict[str, FileRepresentation] + + def __iter__(self): + yield from ('work_dir', 'original_dir') + + def __len__(self): + return 2 + + def __getitem__(self, key): + if key == 'work_dir': + return self._work_dir + elif key == 'original_dir': + return self._original_dir + else: + raise KeyError(key) + + def check(self, prefixed: tuple[int] = None, suck: tuple[int] = None): + """Checks the file changes + + :param prefixed: These files in the work dir are expected to be prefixed + :param suck: These files in the work dir are expected to be sucked from the originals + """ + [self.work_files[f"file_{i}"].prefixed() for i in prefixed or ()] + [self.work_files[f"file_{i}"].suck(self.originals[f"file_{i}"]) for i in suck or ()] + [f.check(self.test_case) for f in chain(self.work_files.values(), self.originals.values())] + + +class TestDeduplidog(TestCase): + + def prepare(self, testing_dir: str = None): + self.temp = mkdtemp() # TemporaryDirectory() NOTE + # temp = Path(testing_dir) if testing_dir else self.temp.name NOTE + temp = str(self.temp) + originals = Path(temp, "originals") + work_dir = Path(temp, "work_dir") + if not testing_dir: + originals.mkdir() + 
class TestDeduplidog(TestCase):
    """Runs Deduplidog (through `drun`) over a generated folder pair and checks the files on the disk."""

    def prepare(self, testing_dir: str = None):
        """Creates the `originals` and `work_dir` folders, fills them and returns their description.

        The originals are `file_0` … `file_11`, all with the default mtime and content.
        The work dir lacks `file_0` and `file_3`; `file_1` and `file_2` differ in content,
        `file_4` … `file_10` differ in mtime only and `file_11` is an exact counterpart.

        :param testing_dir: Use this directory as the root (it is kept afterwards).
            When omitted, a temporary directory is used and removed when the test finishes.
        :return: FolderState describing both folders and the files expected there.
        """
        if testing_dir:
            root = Path(testing_dir)
        else:
            # Every call used to leave an mkdtemp() directory behind;
            # the cleanup is now registered with the test case.
            tmp = TemporaryDirectory()
            self.addCleanup(tmp.cleanup)
            root = Path(tmp.name)
        self.temp = str(root)

        originals = root / "originals"
        work_dir = root / "work_dir"
        originals.mkdir(parents=True, exist_ok=True)
        work_dir.mkdir(parents=True, exist_ok=True)

        original_files = {name: FileRepresentation(originals / name).write()
                          for name in (f"file_{i}" for i in range(12))}
        # (name, relative mtime in seconds, text seed)
        work_files = {name: FileRepresentation(work_dir / name, *rest).write() for name, *rest in (
            ("file_1", 0, 2),
            ("file_2", 0, 3),
            ("file_4", 3600),
            ("file_5", 7200),
            ("file_6", 3601),
            ("file_7", 3599),
            ("file_8", -3600),
            ("file_9", -10),
            ("file_10", -3600*24*365),
            ("file_11", 0),
        )}

        return FolderState(self, work_dir, originals, work_files, original_files)

    def test_simple_prefix(self):
        "With the default matching, only the exact counterpart gets renamed."
        state = self.prepare()
        drun(["rename", "execute"], **state)
        state.check(prefixed=(11,))

    def test_date(self):
        "Date matching options, each tried with and without `neglect_warning`."
        # drun positional arguments: action, execution, match
        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], ["ignore_date"], **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 10, 11))
        state = self.prepare()
        drun(["rename", "execute"], match=["ignore_date"], **state)
        state.check(prefixed=(4, 5, 6, 7, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 1}, **state)
        state.check(prefixed=(4, 7, 8, 9, 11))
        state = self.prepare()
        drun(["rename", "execute"], match={"tolerate_hour": 1}, **state)
        state.check(prefixed=(4, 7, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))
        state = self.prepare()
        drun(["rename", "execute"], match={"tolerate_hour": 2}, **state)
        state.check(prefixed=(4, 5, 6, 7, 11))

    def test_replace_with_original(self):
        "Matched work files take over the name, content and mtime of their original."
        state = self.prepare()
        drun(["replace_with_original", "execute"], ["neglect_warning"], **state)
        state.work_files["file_11"].suck(state.originals["file_11"])
        state.check()

        state = self.prepare()
        drun(["replace_with_original", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
        state.check(suck=(4, 5, 6, 7, 8, 9, 11))

    def test_invert_selection(self):
        "`invert_selection` is refused with `replace_with_original`; with `rename` it affects the unmatched files."
        state = self.prepare()
        with self.assertRaises(AssertionError):
            drun(["replace_with_original", "execute"], match={"tolerate_hour": 2, "invert_selection": True}, **state)
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": False}, **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": True}, **state)
        state.check(prefixed=(1, 2, 10))

    # No media file in the test case.
    # def test_skip_bigger(self):
    #     state = self.prepare()
    #     Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, media_magic=True)
    #     state.check()


if __name__ == '__main__':
    main()