Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #12 from zas/optim
Browse files Browse the repository at this point in the history
Optimization and add support for multiple extensions per format
  • Loading branch information
mayhem authored Jan 10, 2024
2 parents 264fbbf + a4f6e0e commit 31f2623
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions lb_content_resolver/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import abstractmethod
from collections import namedtuple
import os
import datetime
from pathlib import Path
import sys
from time import time
from uuid import UUID
Expand All @@ -14,7 +16,26 @@
from lb_content_resolver.model.tag import Tag, RecordingTag
from lb_content_resolver.formats import mp3, m4a, flac, ogg_opus, ogg_vorbis, wma

SUPPORTED_FORMATS = ["flac", "ogg", "opus", "mp3", "m4a", "wma"]
SupportedFormat = namedtuple('SupportedFormat', ('extensions', 'handler'))
SUPPORTED_FORMATS = (
SupportedFormat({'.flac'}, flac),
SupportedFormat({'.ogg'}, ogg_vorbis),
SupportedFormat({'.opus'}, ogg_opus),
SupportedFormat({'.mp3', '.mp2', '.m2a'}, mp3),
SupportedFormat({'.m4a', '.m4b', '.m4p', '.m4v', '.m4r', '.mp4'}, m4a),
SupportedFormat({'.wma'}, wma),
)

ALL_EXTENSIONS = set()
EXTENSION_HANDLER = dict()
for fmt in SUPPORTED_FORMATS:
ALL_EXTENSIONS.update(fmt.extensions)
for ext in fmt.extensions:
EXTENSION_HANDLER[ext] = fmt.handler


def match_extensions(filepath, extensions):
return Path(filepath).suffix.lower() in extensions


class Database:
Expand Down Expand Up @@ -96,21 +117,17 @@ def traverse(self, relative_path, dry_run=False):
fullpath = os.path.join(self.music_dir, relative_path)

for f in sorted(os.listdir(fullpath)):
if f in ['.', '..'] or f.lower().endswith("jpg"):
if f in {'.', '..'}:
continue

new_relative_path = os.path.join(relative_path, f)
new_full_path = os.path.join(self.music_dir, new_relative_path)
if os.path.isfile(new_full_path):
if os.path.isfile(new_full_path) and match_extensions(new_full_path, ALL_EXTENSIONS):
if not dry_run:
self.add(new_relative_path)
else:
for f in SUPPORTED_FORMATS:
if new_full_path.endswith(f):
self.track_count_estimate += 1
break

if os.path.isdir(new_full_path):
self.track_count_estimate += 1
elif os.path.isdir(new_full_path):
if not self.traverse(new_relative_path, dry_run):
return False

Expand Down Expand Up @@ -180,7 +197,7 @@ def add_or_update_recording(self, mdata):
recording.save()
return "updated", details

def read_metadata_and_add(self, relative_path, format, mtime, update):
def read_metadata_and_add(self, relative_path, extension, mtime, update):
"""
Read the metadata from supported files and then add the
recording to the DB.
Expand All @@ -189,18 +206,7 @@ def read_metadata_and_add(self, relative_path, format, mtime, update):
file_path = os.path.join(self.music_dir, relative_path)

# We've never seen this before, or it was updated since we last saw it.
if format == "mp3":
mdata = mp3.read(file_path)
elif format == "flac":
mdata = flac.read(file_path)
elif format == "ogg":
mdata = ogg_vorbis.read(file_path)
elif format == "opus":
mdata = ogg_opus.read(file_path)
elif format == "m4a":
mdata = m4a.read(file_path)
elif format == "wma":
mdata = wma.read(file_path)
mdata = EXTENSION_HANDLER[extension].read(file_path)

# TODO: In the future we should attempt to read basic metadata from
# the filename here. But, if you have untagged files, this tool
Expand Down Expand Up @@ -238,38 +244,35 @@ def add(self, relative_path):
"""

fullpath = os.path.join(self.music_dir, relative_path)
self.total += 1

# Check to see if the file in question has changed since the last time
# we looked at it. If not, skip it for speed
stats = os.stat(fullpath)
ts = datetime.datetime.fromtimestamp(stats[8])

base, ext = os.path.splitext(relative_path)
ext = ext.lower()[1:]
base = os.path.basename(relative_path)
if ext not in SUPPORTED_FORMATS:

if not match_extensions(relative_path, ALL_EXTENSIONS):
print(" unknown %s" % base)
self.skipped += 1
return

self.total += 1

exists = False
try:
recording = Recording.get(Recording.file_path == fullpath)
except peewee.DoesNotExist as err:
recording = None

# Check to see if the file in question has changed since the last time
# we looked at it. If not, skip it for speed
stats = os.stat(fullpath)
ts = datetime.datetime.fromtimestamp(stats[8])

if recording:
exists = True
if recording.mtime == ts:
self.not_changed += 1
print("unchanged %s" % base)
return

# read the file's last modified time to avoid re-reading tags
stats = os.stat(fullpath)
ts = datetime.datetime.fromtimestamp(stats[8])

status, details = self.read_metadata_and_add(relative_path, ext, ts, exists)
if status == "updated":
print(" update %s" % details)
Expand Down

0 comments on commit 31f2623

Please sign in to comment.