forked from anthony-foulfoin/trakt-tv-duplicates-removal
-
Notifications
You must be signed in to change notification settings - Fork 0
/
trakt-duplicates-removal.py
144 lines (111 loc) · 4.04 KB
/
trakt-duplicates-removal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import json
import requests
# Edit the informations bellow
client_id = 'YOUR CLIENT ID'
client_secret = 'YOUR CLIENT SECRET'
username = 'YOUR USERNAME'
# Optional
types = ['movies', 'episodes'] # 'movies' or 'episodes' or both
# Don't edit the informations bellow
trakt_api = 'https://api.trakt.tv'
auth_get_token_url = '%s/oauth/token' % trakt_api
get_history_url = '%s/users/%s/history/{type}?page={page}&limit={limit}' % (trakt_api, username)
sync_history_url = '%s/sync/history/remove' % trakt_api
session = requests.Session()
def login_to_trakt():
print('Authentication')
print('Open the link in a browser and paste the pin')
print('https://trakt.tv/oauth/authorize?response_type=code&client_id=%s&redirect_uri=urn:ietf:wg:oauth:2.0:oob' % client_id)
print('')
pin = str(input('Pin: '))
session.headers.update({
'Accept': 'application/json',
'User-Agent': 'Betaseries to Trakt',
'Connection': 'Keep-Alive'
})
post_data = {
'code': pin,
'client_id': client_id,
'client_secret': client_secret,
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
'grant_type': 'authorization_code'
}
request = session.post(auth_get_token_url, data=post_data)
response = request.json()
print(response)
print()
session.headers.update({
'Content-Type': 'application/json',
'trakt-api-version': '2',
'trakt-api-key': client_id,
'Authorization': 'Bearer ' + response["access_token"]
})
def get_history(type):
results = []
url_params = {
'page': 1,
'limit': 1000,
'type': type
}
print('Retrieving history for %s' % type)
while True:
print(get_history_url.format(**url_params))
resp = session.get(get_history_url.format(**url_params))
if resp.status_code != 200:
print(resp)
continue
results += resp.json()
if int(resp.headers['X-Pagination-Page-Count']) != url_params['page']:
url_params['page'] += 1
else:
break
print('Done retrieving %s history' % type)
return results
def remove_duplicate(history, type):
print('Removing %s duplicates' % type)
entry_type = 'movie' if type == 'movies' else 'episode'
entry_type = 'episode'
entries = {}
duplicates = []
duplicates_to_remove = []
temp_to_remove = []
for ind, i in enumerate(history):
traktID = i[entry_type]['ids']['trakt'];
if traktID in entries:
entries[traktID].append(ind)
if traktID not in duplicates:
duplicates.append(traktID)
else:
entries[traktID] = [ind]
for i in duplicates:
for j in entries[i]:
for m in entries[i]:
if j == m:
print('equal', j, m)
continue
else:
print('j, m', j, m)
if history[j]['watched_at'] == history[m]['watched_at']:
print(m, 'to be removed')
duplicates_to_remove.append(history[entries[i][j]]['id'])
temp_to_remove.append(m)
print('end iteration of inside loop', 'm=', m)
print('total end of inside loop', j, m)
for r in temp_to_remove:
entries[i].remove(r)
temp_to_remove = []
if len(duplicates_to_remove) > 0:
print('%s %s duplicates plays to be removed' % (len(duplicates_to_remove), type))
session.post(sync_history_url, json={'ids': duplicates_to_remove})
print('%s %s duplicates successfully removed!' % (len(duplicates_to_remove), type))
else:
print('No %s duplicates found' % type)
if __name__ == '__main__':
login_to_trakt()
for type in types:
history = get_history(type)
with open('%s.json' % type, 'w') as output:
json.dump(history, output, indent=4)
print('History saved in file %s.json' % type)
remove_duplicate(history, type)
print()