-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
94 lines (82 loc) · 3.73 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os, time, requests, asyncio
from fastapi import FastAPI, Response, Header
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium import webdriver
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
# FastAPI application instance; the @app.get route handlers in this file register on it.
app = FastAPI()
class BrowserPool:
    """Fixed-size pool of headless Chrome WebDriver instances.

    All browsers are started eagerly by the constructor. ``get_browser``
    hands one out and ``release_browser`` puts it back; both hold
    ``self.lock`` while touching the underlying list.
    """

    def __init__(self, pool_size=int(os.environ.get("browser_pool_size", 1))):
        """Start ``pool_size`` browsers.

        The default comes from the ``browser_pool_size`` environment variable
        (falling back to 1) and is evaluated once, when the class is defined.
        """
        self.pool_size = pool_size
        self.pool = []
        self.lock = Lock()
        self.create_pool()

    def create_pool(self):
        """Launch ``self.pool_size`` Chrome instances and add them to the pool.

        If any launch fails, the browsers already started by this call are
        quit before the exception is re-raised, so no Chrome process is left
        running without an owner.
        """
        chrome_options = Options()
        for flag in (
            '--no-sandbox',
            '--disable-dev-shm-usage',
            '--headless',
            '--disable-gpu',
            '--enable-features=WebContentsForceDark',
        ):
            chrome_options.add_argument(flag)

        # Preferences aimed at keeping Chrome from saving downloads or handing
        # PDFs to an external viewer while rendering arbitrary pages.
        prefs = {
            "download_restrictions": 3,
            "download.open_pdf_in_system_reader": False,
            "download.prompt_for_download": True,
            "download.default_directory": "/dev/null",
            "plugins.always_open_pdf_externally": False,
        }
        chrome_options.add_experimental_option("prefs", prefs)

        started = []
        try:
            for _ in range(self.pool_size):
                started.append(webdriver.Chrome(options=chrome_options))
        except Exception:
            for browser in started:
                try:
                    browser.quit()
                except Exception:
                    # Best-effort cleanup; the launch failure re-raised below
                    # is the error worth reporting.
                    pass
            raise
        self.pool.extend(started)

    def get_browser(self):
        """Remove and return a browser from the pool.

        Raises:
            IndexError: if every pooled browser is currently checked out.
        """
        with self.lock:
            if not self.pool:
                raise IndexError(
                    "no browser available: all pooled browsers are in use"
                )
            return self.pool.pop()

    def release_browser(self, browser):
        """Put ``browser`` back into the pool so it can be handed out again."""
        with self.lock:
            self.pool.append(browser)
# Module-level pool shared by the request handlers. Constructing it launches the
# browsers immediately, i.e. when this module is imported.
browser_pool = BrowserPool()
def get_screenshot(browser, url, resolution: int, delay: int = 7) -> "tuple[bytes, int]":
    """Load ``url`` in ``browser`` and capture a PNG screenshot.

    Any element whose text contains this host's public IPv4 address has its
    text replaced with a placeholder before the capture.

    Args:
        browser: Selenium WebDriver instance to drive.
        url: Page to open.
        resolution: Window height in pixels; the width is derived as 16:9.
        delay: Extra seconds to wait, on top of a fixed 3 seconds, after the
            page body is present.

    Returns:
        ``(png_bytes, elapsed_ms)`` where ``elapsed_ms`` is the rounded time in
        milliseconds from resizing the window to finishing the capture (the IP
        lookup is not included).

    Raises:
        requests.RequestException: if the public-IP lookup fails or times out.
        selenium.common.exceptions.TimeoutException: if the page body is not
            ready within 10 seconds.
    """
    # Bounded wait so a stalled lookup cannot hang the worker thread forever.
    # A failure still propagates, so no screenshot is produced without the
    # address needed for redaction.
    ip = requests.get('https://ipv4.icanhazip.com', timeout=10).text.strip()

    # set_window_size takes (width, height).
    width = int(resolution * 16 / 9)
    height = resolution

    started = time.time()
    browser.set_window_size(width, height)
    browser.get(url)
    WebDriverWait(browser, 10).until(
        EC.presence_of_element_located((By.XPATH, "//body[not(@class='loading')]"))
    )
    # time.sleep rejects negative values, so clamp for delay < -3.
    time.sleep(max(0, 3 + delay))

    if ip:
        leaking = browser.find_elements(By.XPATH, f"//*[contains(text(), '{ip}')]")
        for element in leaking:
            browser.execute_script(
                "arguments[0].innerText = arguments[1];", element, '<the host ip address>'
            )

    screenshot_bytes = browser.get_screenshot_as_png()
    elapsed_ms = round(1000 * (time.time() - started))
    return screenshot_bytes, elapsed_ms
@app.get('/')
def root():
    """Answer ``GET /`` with a fixed greeting string."""
    greeting = 'Hello there'
    return greeting
@app.get('/image')
async def image(resolution: int = 720, delay: int = 7, authorization: str = Header(None), url: str = Header(None)):
    """Return a PNG screenshot of the page named in the ``url`` header.

    The ``authorization`` header must equal the ``allowed_key`` environment
    variable (default ``testkey_``). ``resolution`` and ``delay`` are query
    parameters forwarded to ``get_screenshot``.

    Responses:
        200: PNG body, with the capture time in ``X-Elapsed-Time`` (ms).
        400: ``authorization`` or ``url`` header missing.
        401: ``authorization`` does not match the configured key.
        500: any exception raised while serving the request.
    """
    try:
        if not authorization:
            return Response('Missing authorization', status_code=400)
        if authorization != os.environ.get('allowed_key', 'testkey_'):
            return Response(f'Invalid token, given {authorization}', status_code=401)
        if not url:
            return Response('Missing URL in header', status_code=400)

        # NOTE(review): url is passed to the browser without validation.
        browser = browser_pool.get_browser()
        try:
            # Selenium calls block, so run them on the default executor to
            # keep the event loop free.
            loop = asyncio.get_running_loop()
            image_binary, elapsed = await loop.run_in_executor(None, get_screenshot, browser, url, resolution, delay)
            # NOTE(review): if this really closes the browser's only window the
            # pooled driver may be unusable for the next request - confirm.
            browser.execute_script("window.close()")
        finally:
            # Always hand the browser back; otherwise a single failed capture
            # would shrink the pool for good.
            browser_pool.release_browser(browser)

        with open('log.txt', 'a+') as f:
            f.write(str(int(time.time())) + ' ' + f'{url} | {resolution}p | {elapsed}ms\n')
        return Response(image_binary, media_type='image/png', headers={"X-Elapsed-Time": str(elapsed)})
    except Exception as e:
        print(e)
        return Response(f'Error: {e}', status_code=500)