-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
136 lines (106 loc) · 3.84 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import asyncio
import json
import multiprocessing
import platform
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime
from email.utils import formatdate
from time import mktime
from urllib.parse import urlparse

import tornado
import tornado.options
import tornado.web
from seleniumbase import SB
# Command-line options. They are registered at import time and filled in when
# main() calls tornado.options.parse_command_line().
tornado.options.define("port", default="8080", help="The port to run on")
tornado.options.define("debug", default=False, help="Whether the server should be run in debug mode")
tornado.options.define("user-agent", default=None, help="Override the user agent")
tornado.options.define("xvfb", default=False, help="Use the xvfb for the browser. Only works on Linux.")
# Thread pool (two workers) used by BypassHandler to run the blocking
# scraper() call off the asyncio event loop.
executor = ThreadPoolExecutor(2)
def scraper(url):
    """Open ``url`` in a SeleniumBase UC-mode browser and collect clearance data.

    The browser runs headless unless the ``xvfb`` option is set, in which
    case it is started non-headless with ``xvfb`` enabled. After the
    challenge interaction the function checks that the
    ``cf-turnstile-response`` element is absent and waits three seconds
    before reading the session state.

    Args:
        url: Address to open in the browser.

    Returns:
        A dict with two keys: ``'user_agent'`` (the browser's user agent)
        and ``'cf_clearance'`` (the ``cf_clearance`` cookie serialized by
        ``stringify_cookie``).

    Raises:
        RuntimeError: If the browser holds no ``cf_clearance`` cookie
            (raised by ``get_cookie_by_name``).
    """
    opts = tornado.options.options
    use_xvfb = opts.xvfb
    # Headless mode and xvfb are mutually exclusive here.
    headless = not use_xvfb

    with SB(uc=True, headless=headless, agent=opts.user_agent, xvfb=use_xvfb) as sb:
        sb.driver.uc_open_with_reconnect(url, reconnect_time=3)

        if not use_xvfb:
            # No GUI available: click inside the challenge iframe directly.
            sb.switch_to_frame("iframe")
            sb.driver.uc_click("span")
        else:
            sb.uc_gui_click_cf()

        sb.assert_element_absent('[name="cf-turnstile-response"]', timeout=3)
        sb.sleep(3)

        agent_string = sb.get_user_agent()
        clearance = get_cookie_by_name(sb.driver.get_cookies(), 'cf_clearance')

        return {
            'user_agent': agent_string,
            'cf_clearance': stringify_cookie(clearance),
        }
def get_cookie_by_name(cookies, name):
    """Return the first cookie in ``cookies`` whose ``'name'`` equals ``name``.

    Args:
        cookies: Iterable of cookie mappings (for example the list returned
            by a WebDriver's ``get_cookies()``).
        name: Cookie name to search for.

    Returns:
        The matching cookie mapping itself (not a copy).

    Raises:
        RuntimeError: If no cookie with that name is present.
    """
    for cookie in cookies:
        # Guard the lookup so an entry without a 'name' key is skipped rather
        # than aborting the whole search with a KeyError.
        if 'name' in cookie and cookie['name'] == name:
            return cookie
    raise RuntimeError(f'Missing Cookie "{name}"')
def stringify_cookie(cookie):
    """Serialize a cookie mapping into a ``Set-Cookie``-style string.

    Args:
        cookie: Mapping with required ``'name'`` and ``'value'`` keys and
            optional ``'domain'``, ``'path'``, ``'expiry'`` (POSIX timestamp
            in seconds), ``'secure'`` and ``'httpOnly'`` keys.

    Returns:
        ``'name=value; '`` followed by ``Domain``, ``Path``, ``Expires``,
        ``Secure`` and ``HttpOnly`` attributes for the keys that are present
        and set. Every attribute, including the last, ends with ``'; '``.

    Raises:
        KeyError: If ``'name'`` or ``'value'`` is missing.
    """
    name = cookie['name']
    value = cookie['value']
    parts = [f'{name}={value}']

    domain = cookie.get('domain')
    if domain is not None:
        parts.append(f'Domain={domain}')

    path = cookie.get('path')
    if path is not None:
        parts.append(f'Path={path}')

    expiry = cookie.get('expiry')
    if expiry is not None:
        # Format the timestamp directly in GMT. Converting it to a naive
        # local datetime and back can shift the result by an hour around
        # daylight-saving transitions.
        parts.append(f'Expires={formatdate(expiry, usegmt=True)}')

    if cookie.get('secure'):
        parts.append('Secure')
    if cookie.get('httpOnly'):
        parts.append('HttpOnly')

    # Keep the established output format: each attribute is '; '-terminated.
    return ''.join(f'{part}; ' for part in parts)
def httpdate(date):
    """Format ``date`` as an HTTP date string in GMT.

    Args:
        date: A ``datetime``. Naive values are interpreted as local time;
            timezone-aware values are converted using their own offset.
            A plain ``date`` is also accepted and treated as local midnight.

    Returns:
        A string such as ``'Thu, 01 Jan 1970 00:00:00 GMT'``.
    """
    if isinstance(date, datetime):
        # datetime.timestamp() honours tzinfo, whereas
        # mktime(date.timetuple()) treats every value as local time and so
        # mis-converts aware datetimes. Sub-second precision is dropped
        # because the output has whole-second resolution.
        stamp = date.replace(microsecond=0).timestamp()
    else:
        # Plain date objects have no timestamp(); keep the local-time path.
        stamp = mktime(date.timetuple())
    return formatdate(timeval=stamp, localtime=False, usegmt=True)
class MainHandler(tornado.web.RequestHandler):
    """Request handler mounted at the root path; replies with a fixed banner."""

    def get(self):
        """Write the service banner as the response body."""
        banner = 'Cloudflare Bypass Server'
        self.write(banner)
class BypassHandler(tornado.web.RequestHandler):
    """Request handler that runs the scraper for a caller-supplied URL."""

    async def post(self):
        """Scrape the ``url`` argument and reply with the result as JSON.

        The response body is the JSON encoding of the dict returned by
        ``scraper`` (``user_agent`` and ``cf_clearance``).

        Raises:
            tornado.web.HTTPError: 400 when ``url`` is not an absolute
                http(s) URL. A missing ``url`` argument is rejected by
                ``get_argument`` itself.
        """
        url = self.get_argument('url')

        # Validate url. The value is untrusted and is handed to a real
        # browser, so only absolute http/https URLs are accepted; schemes
        # such as file: are refused.
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ('http', 'https') or parsed_url.netloc == '':
            raise tornado.web.HTTPError(400)

        # scraper() blocks while the browser works, so run it on the thread
        # pool to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, scraper, url)

        # The body is JSON; declare it as such instead of the default
        # HTML content type.
        self.set_header('Content-Type', 'application/json; charset=UTF-8')
        self.write(json.dumps(result))
        await self.flush()
def make_app():
    """Build the Tornado application with the server's routes.

    Returns:
        A ``tornado.web.Application`` serving ``/`` via ``MainHandler`` and
        ``/api/bypass`` via ``BypassHandler``, with its ``debug`` setting
        taken from the ``debug`` command-line option.
    """
    routes = [
        (r'/', MainHandler),
        (r'/api/bypass', BypassHandler),
    ]
    debug_enabled = tornado.options.options.debug
    return tornado.web.Application(routes, debug=debug_enabled)
async def main():
    """Parse command-line options, start the HTTP server and run until stopped.

    Raises:
        RuntimeError: If the ``xvfb`` option is enabled on a non-Linux
            platform.
    """
    options = tornado.options.options
    tornado.options.parse_command_line()

    # Refuse to start with xvfb requested on a platform that lacks it.
    if options.xvfb and platform.system() != 'Linux':
        raise RuntimeError('The xvfb option only works on Linux')

    make_app().listen(options.port)

    # Wait on an event that is never set, keeping the loop alive so the
    # server continues to handle requests.
    never_set = asyncio.Event()
    await never_set.wait()
# Script entry point: start the server only when executed directly, not when
# the module is imported.
if __name__ == '__main__':
    asyncio.run(main())