-
Notifications
You must be signed in to change notification settings - Fork 91
/
cleaner.py
160 lines (132 loc) · 5.48 KB
/
cleaner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import json
from time import sleep
from pyrogram import Client
from pyrogram.raw.functions.messages import Search
from pyrogram.raw.types import InputPeerSelf, InputMessagesFilterEmpty
from pyrogram.raw.types.messages import ChannelMessages
from pyrogram.errors import FloodWait, UnknownError
cachePath = os.path.abspath(__file__)
cachePath = os.path.dirname(cachePath)
cachePath = os.path.join(cachePath, "cache")
if os.path.exists(cachePath):
with open(cachePath, "r") as cacheFile:
cache = json.loads(cacheFile.read())
API_ID = cache["API_ID"]
API_HASH = cache["API_HASH"]
else:
API_ID = os.getenv('API_ID', None) or int(input('Enter your Telegram API id: '))
API_HASH = os.getenv('API_HASH', None) or input('Enter your Telegram API hash: ')
app = Client("client", api_id=API_ID, api_hash=API_HASH)
app.start()
if not os.path.exists(cachePath):
with open(cachePath, "w") as cacheFile:
cache = {"API_ID": API_ID, "API_HASH": API_HASH}
cacheFile.write(json.dumps(cache))
class Cleaner:
def __init__(self, chats=None, search_chunk_size=100, delete_chunk_size=100):
self.chats = chats or []
if search_chunk_size > 100:
# https://github.com/gurland/telegram-delete-all-messages/issues/31
#
# The issue is that pyrogram.raw.functions.messages.Search uses
# pagination with chunks of 100 messages. Might consider switching
# to search_messages, which handles pagination transparently.
raise ValueError('search_chunk_size > 100 not supported')
self.search_chunk_size = search_chunk_size
self.delete_chunk_size = delete_chunk_size
@staticmethod
def chunks(l, n):
"""Yield successive n-sized chunks from l.
https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks#answer-312464"""
for i in range(0, len(l), n):
yield l[i:i + n]
@staticmethod
def get_all_chats():
dialogs = app.get_dialogs(pinned_only=True)
dialog_chunk = app.get_dialogs()
while len(dialog_chunk) > 0:
dialogs.extend(dialog_chunk)
dialog_chunk = app.get_dialogs(offset_date=dialogs[-1].top_message.date-1)
return [d.chat for d in dialogs]
def select_groups(self, recursive=0):
chats = self.get_all_chats()
groups = [c for c in chats if c.type in ('group', 'supergroup')]
print('Delete all your messages in')
for i, group in enumerate(groups):
print(f' {i+1}. {group.title}')
print(
f' {len(groups) + 1}. '
'(!) DELETE ALL YOUR MESSAGES IN ALL OF THOSE GROUPS (!)\n'
)
nums_str = input('Insert option numbers (comma separated): ')
nums = map(lambda s: int(s.strip()), nums_str.split(','))
for n in nums:
if not 1 <= n <= len(groups) + 1:
print('Invalid option selected. Exiting...')
exit(-1)
if n == len(groups) + 1:
print('\nTHIS WILL DELETE ALL YOUR MESSSAGES IN ALL GROUPS!')
answer = input('Please type "I understand" to proceed: ')
if answer.upper() != 'I UNDERSTAND':
print('Better safe than sorry. Aborting...')
exit(-1)
self.chats = groups
break
else:
self.chats.append(groups[n - 1])
groups_str = ', '.join(c.title for c in self.chats)
print(f'\nSelected {groups_str}.\n')
if recursive == 1:
self.run()
def run(self):
for chat in self.chats:
peer = app.resolve_peer(chat.id)
message_ids = []
add_offset = 0
while True:
q = self.search_messages(peer, add_offset)
message_ids.extend(msg.id for msg in q['messages'])
messages_count = len(q['messages'])
print(f'Found {messages_count} of your messages in "{chat.title}"')
if messages_count < self.search_chunk_size:
break
add_offset += self.search_chunk_size
self.delete_messages(chat.id, message_ids)
def delete_messages(self, chat_id, message_ids):
print(f'Deleting {len(message_ids)} messages with message IDs:')
print(message_ids)
for chunk in self.chunks(message_ids, self.delete_chunk_size):
try:
app.delete_messages(chat_id=chat_id, message_ids=chunk)
except FloodWait as flood_exception:
sleep(flood_exception.x)
def search_messages(self, peer, add_offset):
print(f'Searching messages. OFFSET: {add_offset}')
return app.send(
Search(
peer=peer,
q='',
filter=InputMessagesFilterEmpty(),
min_date=0,
max_date=0,
offset_id=0,
add_offset=add_offset,
limit=self.search_chunk_size,
max_id=0,
min_id=0,
hash=0,
from_id=InputPeerSelf()
),
sleep_threshold=60
)
if __name__ == '__main__':
try:
deleter = Cleaner()
deleter.select_groups()
deleter.run()
except UnknownError as e:
print(f'UnknownError occured: {e}')
print('Probably API has changed, ask developers to update this utility')
finally:
app.stop()