Skip to content

Commit

Permalink
media magic preview
Browse files Browse the repository at this point in the history
  • Loading branch information
e3rd committed Nov 26, 2024
1 parent 9a832a4 commit 7ee56cf
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 21 deletions.
9 changes: 8 additions & 1 deletion deduplidog/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

def main():
with run(Deduplidog, interface=None) as m:
# with run(Deduplidog, interface="tui") as m:
# m = run(Deduplidog, interface="gui")
# if 1:
# m.facet._layout # TODO
try:
while True:
print("")
Expand All @@ -18,7 +22,7 @@ def main():
# [setattr(deduplidog, f.name, f.convert()) for f in dog_fields]
# deduplidog.perform()
# else:
m.env.start()
m.env.start(m)
except Exception as e:
print("-"*100)
print(e)
Expand All @@ -28,6 +32,9 @@ def main():
except KeyboardInterrupt:
print("")
sys.exit()
except Exception as e:
import ipdb
ipdb.post_mortem() # TODO


if __name__ == "__main__":
Expand Down
77 changes: 57 additions & 20 deletions deduplidog/deduplidog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
from functools import cache, partial
from pathlib import Path
from time import sleep
import traceback
from typing import Optional

from humanize import naturaldelta, naturalsize
from PIL import Image
from mininterface import Mininterface
from mininterface.facet import Image as FacetImage
from pillow_heif import register_heif_opener
from tqdm.autonotebook import tqdm
from tyro.conf import OmitArgPrefixes
Expand Down Expand Up @@ -76,6 +79,9 @@ class Execution:
neglect_warning: bool = False
"By default, when a file with bigger size or older date should be affected, just warning is generated. Turn this to suppress it."

confirm_one_by_one: bool = True
""" Instead of executing changes all at once, confirm one by one. So that you may decide whether the media similarity detection works. """


@dataclass
class Match:
Expand Down Expand Up @@ -119,17 +125,18 @@ class Match:
class Media:

media_magic: bool = False
"""Nor the size or date is compared for files with media suffixes.
""" Media files similarity detection.
Nor the size or date is compared for files with media suffixes.
A video is considered a duplicate if it has the same name and a similar number of frames, even if it has a different extension.
An image is considered a duplicate if it has the same name and a similar image hash, even if the files are of different sizes.
(This mode is considerably slower.)
"""

accepted_frame_delta: int = 1
"Used only when media_magic is True"
"Number of frames for which two videos are considered equal."

accepted_img_hash_diff: int = 1
"Used only when media_magic is True"
"Hash difference between images so that they are considered equal, see https://github.com/JohannesBuchner/imagehash"

img_compare_date: bool = False
"If True and `media_magic=True`, the work file date or the work file EXIF date must match the original file date (has to be no more than an hour around)."
Expand Down Expand Up @@ -233,14 +240,18 @@ def __post_init__(self):
self.having_multiple_candidates: dict[Path, list[Path]] = {}
"What unsuccessful candidates did work files have?"

self.m = Mininterface()

def reset(self):
    """Return the run statistics to their initial state.

    All four counters are set to zero and the record of work files
    that had several candidates is emptied in place.
    """
    # Chained assignment binds the targets left to right, one shared zero.
    self.size_affected = self.affected_count = self.warning_count = self.ignored_count = 0
    self.having_multiple_candidates.clear()

def start(self):
def start(self, interface=None):
if interface:
self.m = interface
self.reset()
self.check()
self.perform()
Expand Down Expand Up @@ -295,7 +306,7 @@ def perform(self):
f" {self.affected_count}/{len(self.work_files) - self.ignored_count}", end="")
if self.ignored_count:
print(f" ({self.ignored_count} ignored)", end="")
print("\nAffected size:", naturalsize(self.size_affected))
print(f"\n{'Affected' if self.action.execute else 'Affectable'} size:", naturalsize(self.size_affected))
if self.warning_count:
print(f"Warnings: {self.warning_count}")
if self.having_multiple_candidates:
Expand Down Expand Up @@ -386,24 +397,27 @@ def check(self):
action = "will be" if self.action.execute else f"would be (if execute were True)"
print(f"{which.capitalize()}{small}{nonzero} {action} ", end="")

match self.action.rename, self.action.replace_with_original, self.action.delete, self.action.replace_with_symlink:
case False, False, False, False:
print("left intact (because no action is selected).")
case True, False, False, False:
print("renamed (prefixed with ✓).")
case False, True, False, False:
print("replaced with the original.")
case False, False, True, False:
print("deleted.")
case False, False, False, True:
print("replaced with the symlink.")
case _:
raise AssertionError("Choose only one execute action (like only rename).")
print(self._get_action(passive=True) + ".")

if self.execution.set_both_to_older_date:
print("Original file mtime date might be set backwards to the duplicate file.")
print("") # sometimes, this line is consumed

def _get_action(self, passive=False):
action = self.action.rename, self.action.replace_with_original, self.action.delete, self.action.replace_with_symlink
if not sum(action):
return f"{'left' if passive else 'leave'} intact (because no action is selected)"
elif sum(action) > 1:
raise AssertionError("Choose only one execute action (like only rename).")
elif self.action.rename:
return f"rename{'d' * passive} (prefixed with ✓)"
elif self.action.replace_with_original:
return f"replace{'d' * passive} with the original"
elif self.action.delete:
return f"delete{'d' * passive}"
elif self.action.replace_with_symlink:
return f"replace{'d' * passive} with the symlink"

def _loop_files(self):
skip = self.skip
work_files = self.work_files
Expand All @@ -425,7 +439,8 @@ def _loop_files(self):
raise
else:
sleep(1 * attempt)
print("Repeating on exception", work_file, e)
tb = traceback.format_tb(e.__traceback__)
print("Repeating on exception", work_file, e, tb[-1])
continue
except KeyboardInterrupt:
print(f"Interrupted. You may proceed where you left with the skip={skip+bar.n} parameter.")
Expand Down Expand Up @@ -532,6 +547,9 @@ def _affect(self, work_file: Path, original: Path):
change[other_file].append(f"DATE WARNING + {naturaldelta(other_date-affected_date)}")
warning = other_file

if self.execution.confirm_one_by_one and not self._confirm(affected_file, other_file, change):
# NOTE we can resolve the warning in the dialog too
return
if warning and not self.execution.neglect_warning:
change[warning].append("🛟skipped on warning")
else:
Expand Down Expand Up @@ -561,6 +579,25 @@ def _affect(self, work_file: Path, original: Path):
with redirect_stdout(self._output):
self._print_change(change)

def _confirm(self, affected_file, other_file, change: Change):
    """Show both files in the interface and ask whether to proceed.

    The layout lists, for the affected file and then for the other file:
    a caption, the path, an image preview when the suffix is among
    IMAGE_SUFFIXES, and every note recorded for that path in `change`.

    When any note exists, the question is asked through `is_no`,
    otherwise through `is_yes`. The prompt is the capitalized action
    description.

    :return: Whatever the chosen dialog method returns.
    """
    layout = []
    has_notes = False

    for caption, path in (("Going to affect", affected_file), ("Original", other_file)):
        layout += [caption, path]
        if path.suffix.lower() in IMAGE_SUFFIXES:
            layout.append(FacetImage(path))
        notes = change[path]
        if notes:
            layout.extend(notes)
            has_notes = True

    self.m.facet._layout(layout)
    ask = self.m.is_no if has_notes else self.m.is_yes
    return ask(self._get_action().capitalize())

def _rename(self, change: Change, affected_file: Path):
msg = "renamable"
if self.action.execute or self.action.inspect:
Expand Down Expand Up @@ -662,7 +699,7 @@ def _find_similar(self, work_file: Path, candidates: list[Path]):
if (self.match.ignore_date
or wst.st_mtime == ost.st_mtime
or self.match.tolerate_hour and self.match.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.match.tolerate_hour[1]
) and (self.match.ignore_size or wst.st_size == ost.st_size and (not self.match.checksum or crc(original) == crc(work_file))):
) and (self.match.ignore_size or wst.st_size == ost.st_size and (not self.match.checksum or crc(original) == crc(work_file))):
return original

def _find_similar_media(self, work_file: Path, comparing_image: bool, candidates: list[Path]):
Expand Down
15 changes: 15 additions & 0 deletions tests/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from deduplidog import Deduplidog
from deduplidog.deduplidog import Action, Execution, Match, Media, Helper


def drun(action=None, execution=None, match=None, media=None, helper=None, **kw):
    """Build a Deduplidog from shorthand option groups and start it.

    Every group (action, execution, match, media, helper) may be given as
    a dict of field values, or as a list of field names, in which case each
    listed field is set to True. An omitted group is built without arguments.
    Remaining keyword arguments are handed to Deduplidog as they are.

    :return: The result of ``Deduplidog(...).start()``.
    """
    def as_kwargs(spec: list | dict):
        # A list is a shorthand for "turn all these flags on".
        if isinstance(spec, list):
            return dict.fromkeys(spec, True)
        return spec

    groups = (
        (Action, action),
        (Execution, execution),
        (Match, match),
        (Media, media),
        (Helper, helper),
    )
    sections = [factory(**as_kwargs(spec or [])) for factory, spec in groups]
    return Deduplidog(*sections, **kw).start()
170 changes: 170 additions & 0 deletions tests/test_deduplidog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@

from collections.abc import Mapping
from dataclasses import dataclass
from itertools import chain
import os
from pathlib import Path
import random
import shutil
import string
from tempfile import TemporaryDirectory, mkdtemp
from typing import Self
from unittest import TestCase, main

from tests.setup import drun


@dataclass
class FileRepresentation:
path: Path
mtime: int = 0
"relative mtime"
text_seed: int = 1

def __post_init__(self):
self._mtime = round(self.path.parent.parent.stat().st_mtime + self.mtime)

def write(self):
"Writes the representation to the disk."
self.path.write_text(self.get_text())
os.utime(self.path, (self._mtime,)*2)
return self

def check(self, test: TestCase):
"Checks the disk whether it contains the file represented."
test.assertTrue(self.path.exists(), msg=self.path)
test.assertEqual(self.get_text(), self.path.read_text(), msg=self.path)
test.assertEqual(self._mtime, self.path.stat().st_mtime, msg=self.path)

def get_text(self):
random.seed(self.text_seed)
return ''.join(random.choices(string.ascii_letters + string.digits, k=10+self.text_seed*10))

def prefixed(self):
self.path = self.path.with_name("✓" + self.path.name)

def suck(self, other: Self):
"Use the other file. Use its name, however stays in the current directory."
self.path = self.path.with_name(other.path.name)
self._mtime = other._mtime
self.text_seed = other.text_seed


@dataclass
class FolderState(Mapping):
    """Expected state of the work and original directories of a test.

    As a mapping, it exposes exactly the keys 'work_dir' and 'original_dir',
    so that it can be unpacked as keyword arguments (``**state``).
    """
    test_case: TestCase
    _work_dir: Path
    _original_dir: Path
    work_files: dict[str, FileRepresentation]
    originals: dict[str, FileRepresentation]

    def __iter__(self):
        yield from ('work_dir', 'original_dir')

    def __len__(self):
        return 2

    def __getitem__(self, key):
        if key == 'work_dir':
            return self._work_dir
        elif key == 'original_dir':
            return self._original_dir
        else:
            raise KeyError(key)

    def check(self, prefixed: tuple[int, ...] | None = None, suck: tuple[int, ...] | None = None):
        """Checks the file changes
        :param prefixed: These files in the work dir are expected to be prefixed
        :param suck: These files in the work dir are expected to be sucked from the originals
        """
        # Update the expectations first, then verify every file against the disk.
        for i in prefixed or ():
            self.work_files[f"file_{i}"].prefixed()
        for i in suck or ():
            self.work_files[f"file_{i}"].suck(self.originals[f"file_{i}"])
        for f in chain(self.work_files.values(), self.originals.values()):
            f.check(self.test_case)


class TestDeduplidog(TestCase):
    """Runs Deduplidog against freshly generated folders and checks the resulting files."""

    def prepare(self, testing_dir: str | None = None):
        """Creates a temporary folder with `originals` and `work_dir` and fills them.

        The originals are file_0 … file_11 with the default mtime and content.
        The work files differ from them in the relative mtime and/or the content seed.
        The temporary folder is removed when the test finishes.

        :param testing_dir: When set, the two sub-directories are not created.
            NOTE(review): the value is not used otherwise, this mode looks unfinished.
        :return: The FolderState describing both directories.
        """
        self.temp = mkdtemp()  # TemporaryDirectory() NOTE
        # `prepare` may be called several times per test, every call gets its own cleanup.
        self.addCleanup(shutil.rmtree, self.temp, ignore_errors=True)
        # temp = Path(testing_dir) if testing_dir else self.temp.name NOTE
        temp = str(self.temp)
        originals = Path(temp, "originals")
        work_dir = Path(temp, "work_dir")
        if not testing_dir:
            originals.mkdir()
            work_dir.mkdir()

        original_files = {name: FileRepresentation(originals / name).write()
                          for name in (f"file_{i}" for i in range(12))}
        # (name, relative mtime in seconds, content seed)
        work_files = {name: FileRepresentation(work_dir / name, *rest).write() for name, *rest in (
            ("file_1", 0, 2),
            ("file_2", 0, 3),
            ("file_4", 3600),
            ("file_5", 7200),
            ("file_6", 3601),
            ("file_7", 3599),
            ("file_8", -3600),
            ("file_9", -10),
            ("file_10", -3600*24*365),
            ("file_11", 0),
        )}

        return FolderState(self, work_dir, originals, work_files, original_files)

    def test_simple_prefix(self):
        state = self.prepare()
        drun(["rename", "execute"], **state)
        state.check(prefixed=(11,))

    def test_date(self):
        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], ["ignore_date"], **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 10, 11))
        state = self.prepare()
        drun(["rename", "execute"], match=["ignore_date"], **state)
        state.check(prefixed=(4, 5, 6, 7, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 1}, **state)
        state.check(prefixed=(4, 7, 8, 9, 11))
        state = self.prepare()
        drun(["rename", "execute"], match={"tolerate_hour": 1}, **state)
        state.check(prefixed=(4, 7, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))
        state = self.prepare()
        drun(["rename", "execute"], match={"tolerate_hour": 2}, **state)
        state.check(prefixed=(4, 5, 6, 7, 11))

    def test_replace_with_original(self):
        state = self.prepare()
        drun(["replace_with_original", "execute"], ["neglect_warning"], **state)
        state.work_files["file_11"].suck(state.originals["file_11"])
        state.check()

        state = self.prepare()
        drun(["replace_with_original", "execute"], ["neglect_warning"], {"tolerate_hour": 2}, **state)
        state.check(suck=(4, 5, 6, 7, 8, 9, 11))

    def test_invert_selection(self):
        state = self.prepare()
        with self.assertRaises(AssertionError):
            drun(["replace_with_original", "execute"], match={"tolerate_hour": 2, "invert_selection": True}, **state)
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": False}, **state)
        state.check(prefixed=(4, 5, 6, 7, 8, 9, 11))

        state = self.prepare()
        drun(["rename", "execute"], ["neglect_warning"], {"tolerate_hour": 2, "invert_selection": True}, **state)
        state.check(prefixed=(1, 2, 10))

    # No media file in the test case.
    # def test_skip_bigger(self):
    #     state = self.prepare()
    #     Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, `media_magic=True`)
    #     state.check()


if __name__ == '__main__':
main()

0 comments on commit 7ee56cf

Please sign in to comment.