-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
392 lines (295 loc) · 12.9 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
import os
import warnings
import asyncio
import subprocess
import aiohttp
import config
import spotify
from music_genre_classification.src.get_genre import main as get_genre
# Module-level side effect: importing this module installs an "ignore" filter
# with no category/module restriction, i.e. it silences every Python warning
# for the whole process.
# NOTE(review): presumably meant to hide noisy warnings from the ML
# dependencies -- confirm, and consider narrowing the filter.
warnings.filterwarnings("ignore")
def check_imports() -> tuple[bool, ImportError | None]:
try:
import numpy
import torch
import sklearn
import librosa
import pandas
import aiohttp
import fastapi
import uvicorn
except ImportError as e:
return False, e
else:
return True, None
def remove_tmp():
    """Delete the ``tmp`` directory next to this file and the files inside it.

    The directory is only created on demand, so it may legitimately be
    missing; in that case this function does nothing instead of raising
    ``FileNotFoundError``.

    Raises:
        OSError: If an entry inside ``tmp`` cannot be removed with
            ``os.remove`` (for example because it is itself a directory),
            or if the directory cannot be removed afterwards.
    """
    tmp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tmp")

    # Nothing to clean up if the directory was never created.
    if not os.path.isdir(tmp_path):
        return

    for file_name in os.listdir(tmp_path):
        os.remove(os.path.join(tmp_path, file_name))

    # os.rmdir only succeeds on an empty directory, hence the loop above.
    os.rmdir(tmp_path)
async def get_available(client: spotify.Client) -> tuple[dict[str, dict], dict[str, dict]]:
    """Find the current user's genre playlists and list the tracks in each.

    A playlist counts as a genre playlist when it is owned by the user
    returned by ``client.get_user_info()`` and its name equals the configured
    name for one of the genres listed below
    (``config.GENRE_PLAYLIST_NAME[genre]``, falling back to
    ``config.GENRE_DEFAULT_PLAYLIST_NAME`` formatted with the title-cased
    genre).

    Args:
        client: Client used for all requests (user info, the user's
            playlists, and the items of every matching playlist).

    Returns:
        A ``(available_playlists, tracks_available)`` tuple:

        * ``available_playlists`` maps a genre name to the matching playlist
          entry as returned by ``client.user_playlists``. If several
          playlists share a genre's name, the last one listed wins.
        * ``tracks_available`` is ``{'playlist-track': {playlist_id: tracks}}``
          where ``tracks`` holds the ``'track'`` value of every item in that
          playlist.
    """
    genres = ('blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock')

    # Resolve each genre's playlist name once instead of on every comparison.
    genre_playlist_names = {
        genre: config.GENRE_PLAYLIST_NAME.get(genre, config.GENRE_DEFAULT_PLAYLIST_NAME.format(genre.title()))
        for genre in genres
    }

    user = await client.get_user_info()

    # Page through all of the user's playlists, 50 at a time.
    playlists = []
    offset = 0
    while True:
        response = await client.user_playlists(limit=50, offset=offset)

        if not response.items:
            break

        playlists += response.items
        offset += response.limit

    available_playlists = {}
    for playlist in playlists:
        if playlist['owner']['id'] != user.id:
            continue

        for genre, playlist_name in genre_playlist_names.items():
            if playlist['name'] == playlist_name:
                available_playlists[genre] = playlist

    tracks_available = {
        'playlist-track': {},
    }

    for playlist in available_playlists.values():
        playlist_id = playlist['id']
        tracks = []
        offset = 0

        while True:
            response_tracks = await client.get_playlist_items(playlist_id, offset=offset, limit=100)

            if not response_tracks.items:
                break

            tracks += [item['track'] for item in response_tracks.items]

            # Advance by the size of *this* page. Using the limit of the
            # playlist-listing response (50) against a page size of 100 made
            # consecutive pages overlap and duplicated tracks.
            offset += response_tracks.limit

        tracks_available['playlist-track'][playlist_id] = tracks

    return available_playlists, tracks_available
async def run_genre_classification(track: spotify.Track) -> dict[str, float]:
    """Download a track's preview clip and run the genre classifier on it.

    The body fetched from ``track.preview_url`` is written to
    ``tmp/<track.id>.mp3`` next to this file, and the path is handed to
    ``get_genre`` as a one-element list. The downloaded file is left in
    place; ``remove_tmp`` deletes the whole directory.

    Args:
        track: Track to classify; its ``preview_url`` and ``id`` attributes
            are read.

    Returns:
        Whatever ``get_genre`` returns for the downloaded file (annotated as
        a mapping of genre name to confidence).

    Raises:
        aiohttp.ClientResponseError: If the preview download answers with an
            HTTP error status (>= 400).
    """
    tmp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "tmp")
    # exist_ok avoids the check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(tmp_path, exist_ok=True)

    track_path = os.path.join(tmp_path, track.id + ".mp3")

    async with aiohttp.ClientSession() as session:
        async with session.get(track.preview_url) as resp:
            # Fail here rather than saving an error page as "audio" and
            # feeding it to the classifier.
            resp.raise_for_status()
            audio = await resp.read()

    # Write only after the body was read completely, so a failed download
    # does not leave an empty file behind.
    with open(track_path, "wb") as fp:
        fp.write(audio)

    # NOTE(review): get_genre is a synchronous call and therefore blocks the
    # event loop while it runs; consider asyncio.to_thread if it is
    # thread-safe.
    genres = get_genre([track_path])

    return genres
def handle_removed_tracks(tracks: list[spotify.Track], tracks_before: list[spotify.Track]) -> list[spotify.Track]:
    """Return the tracks that were removed between two snapshots.

    Args:
        tracks: The current tracks.
        tracks_before: The tracks from the previous snapshot.

    Returns:
        Every entry of ``tracks_before`` that is not contained in ``tracks``,
        in the order (and with the multiplicity) it has in ``tracks_before``.
    """
    # Each candidate is taken from tracks_before itself, so only its absence
    # from the current list needs to be tested.
    return [previous for previous in tracks_before if previous not in tracks]
async def _fetch_playlist_tracks(client: spotify.Client, playlist_id) -> list:
    """Page through a playlist and return the ``'track'`` value of every item."""
    collected = []
    offset = 0

    while True:
        page = await client.get_playlist_items(playlist_id, offset=offset, limit=100)

        if not page.items:
            break

        collected.extend(item['track'] for item in page.items)
        # Advance by the page size reported by the response itself.
        offset += page.limit

    return collected


async def check_new_tracks(client: spotify.Client, *, tracks_before: list[spotify.Track] | None = None):
    """Keep the genre playlists in sync with the source playlist, forever.

    Each cycle of the endless loop:

    1. Reads all tracks of the playlist ``config.SPOTIFY_PLAYLIST_ID``.
    2. Removes from every genre playlist (as reported by ``get_available``)
       the tracks that are no longer in the source playlist.
    3. Classifies each source track that is not yet in any genre playlist,
       was not present in the previous cycle, and has a string
       ``preview_url``. The track is added to the playlist of every returned
       genre that has a truthy confidence and is not listed in
       ``config.GENRES_IGNORED`` (compared case-insensitively). A missing
       genre playlist is created first.
    4. Sleeps five seconds.

    Args:
        client: Client used for all playlist reads and writes.
        tracks_before: Tracks to treat as already seen in the first cycle;
            ``None`` (the default) means none.

    Raises:
        Exception: Anything raised by the client or by
            ``run_genre_classification`` propagates and ends the loop.
    """
    # TODO: Either make the genre classification + playlist in a task or check for tracks in all the genre playlists, and filter out the tracks with the tracks in the config playlist to improve speed.
    tracks_before = tracks_before or []

    while True:
        tracks = await _fetch_playlist_tracks(client, config.SPOTIFY_PLAYLIST_ID)
        # Snapshot before filtering: this is what the next cycle compares against.
        original_tracks = tracks.copy()

        _, tracks_available = await get_available(client)

        # Drop tracks from the genre playlists that left the source playlist.
        for playlist_id, playlist_tracks in tracks_available['playlist-track'].items():
            to_be_removed = [entry for entry in playlist_tracks if entry not in tracks]

            if not to_be_removed:
                continue

            print(f"[LOGS] Removing tracks from {playlist_id} playlist: {to_be_removed}")
            await client.remove_playlist_tracks(playlist_id, to_be_removed)

        # Skip tracks that already sit in some genre playlist. A new list is
        # built each time: removing from the list while iterating over it
        # silently skipped the element following every removal.
        for playlist_tracks in tracks_available['playlist-track'].values():
            tracks = [entry for entry in tracks if entry not in playlist_tracks]

        ignored_genres = {name.lower() for name in config.GENRES_IGNORED}

        for track in tracks:
            if track in tracks_before:
                continue

            if not track.preview_url or not isinstance(track.preview_url, str):
                continue

            genres = await run_genre_classification(track)

            for genre, confidence in genres.items():
                if not genre or not confidence or genre.lower() in ignored_genres:
                    continue

                # Re-read the genre playlists so a playlist created earlier
                # in this cycle is found instead of being created twice.
                # NOTE(review): this refetches every genre playlist for each
                # (track, genre) pair -- see the TODO above.
                available_playlists, _ = await get_available(client)

                if genre in available_playlists:
                    playlist_id = available_playlists[genre]['id']
                    existing_tracks = await _fetch_playlist_tracks(client, playlist_id)

                    # Already there: nothing to add. The previous
                    # one-element list was always truthy, so this case was
                    # never skipped and [None] was sent to the API.
                    if track in existing_tracks:
                        continue
                else:
                    description = config.GENRE_DEFAULT_PLAYLIST_DESCRIPTION or ''
                    playlist = await client.create_playlist(
                        config.GENRE_PLAYLIST_NAME.get(genre, config.GENRE_DEFAULT_PLAYLIST_NAME.format(genre.title())),
                        description=config.GENRE_PLAYLIST_DESCRIPTION.get(
                            genre, description.format(genre.title())
                        ) or None,
                        public=config.GENRE_PLAYLIST_PUBLIC.get(genre, config.GENRE_DEFAULT_PLAYLIST_PUBLIC),
                    )
                    playlist_id = playlist.id

                await client.add_playlist_tracks(playlist_id, [track])

                print(f"[LOGS] Added tracks {track.name} to {playlist_id} ({genre}) with confidence of {confidence}")

                # Pause between writes.
                await asyncio.sleep(1.5)

        tracks_before = original_tracks

        await asyncio.sleep(5)