-
Notifications
You must be signed in to change notification settings - Fork 39
/
replaygain.py
executable file
·235 lines (193 loc) · 8.61 KB
/
replaygain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#! /usr/bin/env python3
import sys
import os
import argparse
import subprocess
import asyncio
from concurrent.futures import ThreadPoolExecutor
import taglib
from pythonscripts.cpu import cores_count
from pythonscripts.logger import Logger
class ReplayGain:
    """Compute and write ReplayGain tags for one group of mp3 files.

    All files handed to one instance are considered to belong to one album:
    ``mp3gain`` is invoked once for the whole group and the per-track values
    plus the album summary it prints are written into every file's tags.
    """

    # Lower-cased replaygain_* tag names a file must carry (and no other
    # replaygain_* tag) to be considered already tagged.
    _REQUIRED_TAGS = frozenset((
        "replaygain_track_gain",
        "replaygain_album_gain",
        "replaygain_track_peak",
        "replaygain_album_peak",
    ))

    def __init__(self, logger, options, files):
        """Store the logger, the force* options and the list of file paths.

        ``options`` must provide the attributes ``force``, ``force_album`` and
        ``force_track``; ``files`` is the list of paths forming one album.
        """
        # logger
        self.log = logger
        self.log.filename = None

        # internals
        self.raw_lines = []
        self.data_files = []
        self.data_album = {}

        # options
        self.force = options.force
        self.force_album = options.force_album
        self.force_track = options.force_track
        self.files = files

    def run(self):
        """Tag the files unless they are all tagged already and nothing is forced."""
        # check if all files have ReplayGain tags; mp3gain runs very long
        forced = self.force or self.force_album or self.force_track
        if not forced and self.all_files_have_replaygain_tags():
            self.log.error("All files already have ReplayGain tags, no action taken.")
            return

        if self.run_mp3gain():
            self.update_tags()

    def all_files_have_replaygain_tags(self):
        """Return True only if *every* input file has exactly the four replaygain_* tags.

        An empty file list yields False, so there is never a claim that
        "all files are tagged" when nothing was inspected.
        """
        if not self.files:
            return False

        for fname in self.files:
            # open id3 tag
            f = taglib.File(fname)
            present = {
                tag.lower()
                for tag in f.tags
                if tag.lower().startswith("replaygain_")
            }
            # one incomplete file is enough to require a full mp3gain run
            if present != self._REQUIRED_TAGS:
                return False
        return True

    def run_mp3gain(self):
        """Run mp3gain over all files and store its output lines in ``self.raw_lines``.

        Returns True on success. Returns False (after logging the reason) when
        mp3gain exits with an error status or cannot be run at all.
        Non-``Exception`` interruptions such as ``KeyboardInterrupt`` propagate
        to the caller instead of being silently discarded.
        """
        self.log.debug("running mp3gain on specified files")
        # NOTE(review): flags presumably mean quiet, tab-delimited output and
        # skipping stored analysis data - confirm against `mp3gain -h`.
        cmd = ["mp3gain", "-q", "-o", "-s", "s"] + self.files

        try:
            raw_data = subprocess.check_output(cmd, universal_newlines=True)
        except subprocess.CalledProcessError as exc:
            msg = "mp3gain returned error status: " + str(exc.returncode) + "\n"
            msg += "-----------mp3gain output dump-----------\n"
            msg += exc.output or ""
            msg += "\n-----------------------------------------\n"
            self.log.error(msg)
            return False
        except Exception as exc:
            # e.g. the mp3gain executable is missing or its output is undecodable
            self.log.error("unable to run mp3gain: %s" % exc)
            return False

        self.raw_lines = raw_data.splitlines()
        return True

    def update_tags(self):
        """Parse the stored mp3gain output and write replaygain_* tags into all files.

        The output is expected to be tab-separated: a header line, one line per
        track and a final line whose first column is ``"Album"``. If that shape
        is not found, an error is logged and no file is touched.
        """
        self.log.debug("parsing mp3gain output")

        if not self.raw_lines:
            self.log.error("unable to parse mp3gain output")
            return

        album_parts = self.raw_lines[-1].strip().split("\t")
        # just in case
        if album_parts[0] != '"Album"':
            self.log.error("unable to parse mp3gain output")
            return

        album_gain = float(album_parts[2])            # album gain
        # NOTE(review): 32768 presumably rescales a 16-bit sample amplitude to
        # the 0..1 range used by the peak tags - confirm.
        album_peak = float(album_parts[3]) / 32768.0  # album peak

        # everything between the header and the album summary is one track per line
        for line in self.raw_lines[1:-1]:
            parts = line.strip().split("\t")
            fname = parts[0]  # filename
            self.log.filename = fname
            self.log.debug("begin processing file")

            track_gain = float(parts[2])            # track gain
            track_peak = float(parts[3]) / 32768.0  # track peak

            # pick the values to write depending on options; the parsed album
            # values themselves are never overwritten
            file_album_gain, file_album_peak = album_gain, album_peak
            if self.force_album:
                track_gain, track_peak = album_gain, album_peak
            elif self.force_track:
                file_album_gain, file_album_peak = track_gain, track_peak

            # open id3 tag
            f = taglib.File(fname)

            # update tag
            f.tags["REPLAYGAIN_TRACK_GAIN"] = "%.2f dB" % track_gain
            f.tags["REPLAYGAIN_ALBUM_GAIN"] = "%.2f dB" % file_album_gain
            f.tags["REPLAYGAIN_TRACK_PEAK"] = "%.6f" % track_peak
            f.tags["REPLAYGAIN_ALBUM_PEAK"] = "%.6f" % file_album_peak

            # save tag
            self.log.debug("saving modified ID3 tag")
            f.save()

            self.log.debug("done processing file")
            self.log.filename = None
class Main:
    """Expand the input paths into per-directory groups and process each group.

    Each directory that contains mp3 files, and each individually named file,
    becomes one group that is handed to its own ReplayGain object.
    """

    def __init__(self, logger, options):
        """Keep the logger and options; take ``recursive`` and ``files`` out of ``options``.

        NOTE: ``options`` is modified in place - its ``recursive`` and
        ``files`` attributes are deleted after being copied here.
        """
        self.logger = logger
        self.options = options
        self.recursive = options.recursive
        self.paths = options.files
        del options.recursive  # don't want to pass it to ReplayGain object
        del options.files      # don't want to pass it to ReplayGain object

    async def run(self):
        """Run ``worker`` for every group from ``queue_generator`` in a thread pool.

        Any exception raised by a worker is re-raised here by ``asyncio.gather``.
        """
        # We could use the default single-threaded executor with basically the same performance
        # (because of Python's GIL), but the ThreadPoolExecutor allows to limit the maximum number
        # of workers and thus the maximum number of concurrent subprocesses.
        with ThreadPoolExecutor(max_workers=cores_count()) as executor:
            loop = asyncio.get_running_loop()
            tasks = [
                loop.run_in_executor(executor, self.worker, group)
                for group in self.queue_generator()
            ]
            await asyncio.gather(*tasks)

    def worker(self, paths):
        """Process one group of files: print the sorted paths, then run ReplayGain on them.

        Returns None. An empty group is skipped silently. Exceptions from
        ReplayGain are printed to stderr and re-raised.
        """
        paths = sorted(paths)

        # skip dirs not containing any mp3 file
        if not paths:
            return

        # write info
        print("Procesing:")
        for path in paths:
            print(" " + path)

        try:
            # create ReplayGain object, pass files and run
            rg = ReplayGain(self.logger, self.options, paths)
            rg.run()
        except Exception as e:
            print(e, file=sys.stderr)
            raise

    def queue_generator(self):
        """Yield one list of file paths per group to be processed.

        For each directory in ``self.paths`` a list with the full paths of the
        ``.mp3`` files directly inside it is yielded (directories without mp3
        files yield nothing); with ``self.recursive`` set, subdirectories are
        visited the same way afterwards. A path that is not a directory is
        yielded as ``[path]``.
        """

        def walk(root):
            dirs = []
            files = []
            # the context manager closes the directory handle deterministically
            with os.scandir(root) as entries:
                for entry in entries:
                    if entry.is_dir():
                        dirs.append(entry.name)
                    elif entry.is_file() and entry.name.endswith(".mp3"):
                        files.append(entry.name)

            # first yield found files, then recurse into subdirs
            if files:
                yield [os.path.join(root, name) for name in files]
            if self.recursive:
                for d in dirs:
                    yield from walk(os.path.join(root, d))

        for path in self.paths:
            if os.path.isdir(path):
                yield from walk(path)
            else:
                yield [path]
def main(prog_name, options):
    """Create the logger, report the selected paths at debug level and process them.

    ``options`` must provide ``log_level`` and ``files`` plus whatever the
    Main object reads from it; ``prog_name`` is passed on to the Logger.
    """
    logger = Logger(options.log_level, prog_name)

    selected = sorted(options.files)
    logger.debug("Selected mp3 files:")
    logger.debug("\n".join(selected))

    # distinct local name so the enclosing function is not shadowed
    app = Main(logger, options)
    asyncio.run(app.run())
def argparse_path_handler(path):
    """Validate a command-line path and return it in absolute form.

    Used as an argparse ``type=`` callable. Raises ``argparse.ArgumentTypeError``
    when the path does not exist, or when it is a regular file whose name does
    not end with ``.mp3``. Directories are accepted as they are.
    """
    if os.path.exists(path):
        is_foreign_file = os.path.isfile(path) and not path.endswith(".mp3")
        if is_foreign_file:
            raise argparse.ArgumentTypeError("not a mp3 file: '%s'" % path)
        return os.path.abspath(path)
    raise argparse.ArgumentTypeError("invalid path: '%s'" % path)
if __name__ == "__main__":
    # Script entry point: build the command-line interface, parse it and run.
    parser = argparse.ArgumentParser(
        description="Write correct ReplayGain tags into mp3 files; uses mp3gain internally",
    )

    # log level options: mutually exclusive switches, all storing into log_level
    # (1 is the level used when none of them is given)
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument(
        "-q", "--quiet",
        dest="log_level", action="store_const", const=0, default=1,
        help="do not output error messages",
    )
    verbosity.add_argument(
        "-v", "--verbose",
        dest="log_level", action="store_const", const=3,
        help="output warnings and informational messages",
    )
    verbosity.add_argument(
        "-d", "--debug",
        dest="log_level", action="store_const", const=4,
        help="output debug messages",
    )

    parser.add_argument(
        "-r", "--recursive",
        action="store_true",
        help="when path to directory is specified, browse it recursively (albums still respected)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="force overwriting of existing ID3v2 ReplayGain tags",
    )

    # --force-album and --force-track cannot be combined
    force_mode = parser.add_mutually_exclusive_group()
    force_mode.add_argument(
        "--force-album",
        action="store_true",
        help="write replaygain_album_{gain,peak} values into replaygain_track_{gain,peak} tags",
    )
    force_mode.add_argument(
        "--force-track",
        action="store_true",
        help="write replaygain_track_{gain,peak} values into replaygain_album_{gain,peak} tags",
    )

    # every positional path is validated and made absolute by argparse_path_handler
    parser.add_argument(
        "files",
        nargs="+",
        metavar="FILE | FOLDER",
        type=argparse_path_handler,
        help="path to mp3 file(s) or directory(ies)",
    )

    args = parser.parse_args()
    main(sys.argv[0], args)