Skip to content

Commit

Permalink
chg: centralize validators, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed Jul 22, 2024
1 parent 66fdcc5 commit ffc38da
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 112 deletions.
196 changes: 103 additions & 93 deletions lacuscore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ class CaptureSettings(BaseModel):
depth: int = 0
rendered_hostname_only: bool = True # Note: only used if depth is > 0

@model_validator(mode="before")
@classmethod
def empty_str_to_none(cls, data: Any) -> dict[str, Any] | Any:
if isinstance(data, dict):
# Make sure all the strings are stripped, and None if empty.
to_return: dict[str, Any] = {}
for k, v in data.items():
if isinstance(v, str):
if v_stripped := v.strip():
if v_stripped[0] in ['{', '[']:
to_return[k] = from_json(v_stripped)
else:
to_return[k] = v_stripped
else:
to_return[k] = v
return to_return
return data

@model_validator(mode='after')
def check_capture_element(self) -> CaptureSettings:
if self.document_name and not self.document:
Expand All @@ -126,150 +144,142 @@ def check_capture_element(self) -> CaptureSettings:

@field_validator('url', mode='after')
@classmethod
def load_url(cls, v: str | None) -> str | None:
if isinstance(v, str):
url = v.strip()
url = refang(url) # In case we get a defanged url at this stage.
if (not url.lower().startswith('data:')
and not url.lower().startswith('http:')
and not url.lower().startswith('https:')
and not url.lower().startswith('file:')):
url = f'http://{url}'
return url
return v
def load_url(cls, url: str | None) -> str | None:
if isinstance(url, str):
_url = refang(url) # In case we get a defanged url at this stage.
if (not _url.lower().startswith('data:')
and not _url.lower().startswith('http:')
and not _url.lower().startswith('https:')
and not _url.lower().startswith('file:')):
_url = f'http://{_url}'
return _url
return url

@field_validator('document_name', mode='after')
@classmethod
def load_document_name(cls, v: str | None) -> str | None:
if isinstance(v, str):
name = v.strip()
if '.' not in name:
def load_document_name(cls, document_name: str | None) -> str | None:
if isinstance(document_name, str):
if '.' not in document_name:
# The browser will simply display the file as text if there is no extension.
# Just add HTML as a fallback, as it will be the most comon one.
name = f'{name}.html'
return name
return v
document_name = f'{document_name}.html'
return document_name
return None

@field_validator('browser', mode='before')
@classmethod
def load_browser(cls, v: Any) -> str | None:
if isinstance(v, str) and v.strip() in ['chromium', 'firefox', 'webkit']:
return v.strip()
def load_browser(cls, browser: Any) -> str | None:
if isinstance(browser, str) and browser in ['chromium', 'firefox', 'webkit']:
return browser
# There are old captures where the browser is not a playwright browser name, so we ignore it.
return None

@field_validator('proxy', mode='before')
@classmethod
def load_proxy_json(cls, v: Any) -> str | dict[str, str] | None:
if not v:
def load_proxy_json(cls, proxy: Any) -> str | dict[str, str] | None:
if not proxy:
return None
if isinstance(v, str):
if v.startswith('{'):
return from_json(v)
if isinstance(proxy, str):
# Just the proxy
return v
elif isinstance(v, dict):
return v
return proxy
elif isinstance(proxy, dict):
return proxy
return None

@field_validator('cookies', mode='before')
@classmethod
def load_cookies_json(cls, v: Any) -> list[dict[str, Any]] | None:
if not v:
def load_cookies_json(cls, cookies: Any) -> list[dict[str, Any]] | None:
if not cookies:
return None
if isinstance(v, str):
if v.startswith('['):
return from_json(v)
if isinstance(cookies, str):
# Cookies are invalid, ignoring.
elif isinstance(v, list):
return v
pass
elif isinstance(cookies, list):
return cookies
return None

@field_validator('headers', mode='before')
@classmethod
def load_headers_json(cls, v: Any) -> dict[str, str] | None:
if not v:
def load_headers_json(cls, headers: Any) -> dict[str, str] | None:
if not headers:
return None
if isinstance(v, str):
if v[0] == '{':
return from_json(v)
else:
# make it a dict
new_headers = {}
for header_line in v.splitlines():
if header_line and ':' in header_line:
splitted = header_line.split(':', 1)
if splitted and len(splitted) == 2:
header, h_value = splitted
if header.strip() and h_value.strip():
new_headers[header.strip()] = h_value.strip()
return new_headers
elif isinstance(v, dict):
return v
if isinstance(headers, str):
# make it a dict
new_headers = {}
for header_line in headers.splitlines():
if header_line and ':' in header_line:
splitted = header_line.split(':', 1)
if splitted and len(splitted) == 2:
header, h_value = splitted
if header.strip() and h_value.strip():
new_headers[header.strip()] = h_value.strip()
return new_headers
elif isinstance(headers, dict):
return headers
return None

@field_validator('http_credentials', mode='before')
@classmethod
def load_http_creds_json(cls, v: Any) -> dict[str, str] | None:
if not v:
def load_http_creds_json(cls, http_credentials: Any) -> dict[str, str] | None:
if not http_credentials:
return None
if isinstance(v, str):
if v.startswith('{'):
return from_json(v)
elif isinstance(v, dict):
return v
if isinstance(http_credentials, str):
# ignore
return None
elif isinstance(http_credentials, dict):
return http_credentials
return None

@field_validator('http_credentials', mode='after')
@classmethod
def check_http_creds(cls, v: dict[str, str] | None) -> dict[str, str] | None:
if not v:
return v
if 'username' in v and 'password' in v:
return v
raise CaptureSettingsError(f'HTTP credentials must have a username and a password: {v}')
def check_http_creds(cls, http_credentials: dict[str, str] | None) -> dict[str, str] | None:
if not http_credentials:
return None
if 'username' in http_credentials and 'password' in http_credentials:
return http_credentials
raise CaptureSettingsError(f'HTTP credentials must have a username and a password: {http_credentials}')

@field_validator('geolocation', mode='before')
@classmethod
def load_geolocation_json(cls, v: Any) -> dict[str, float] | None:
if not v:
def load_geolocation_json(cls, geolocation: Any) -> dict[str, float] | None:
if not geolocation:
return None
if isinstance(v, str):
if v.startswith('{'):
return from_json(v)
elif isinstance(v, dict):
return v
if isinstance(geolocation, str):
# ignore
return None
elif isinstance(geolocation, dict):
return geolocation
return None

@field_validator('geolocation', mode='after')
@classmethod
def check_geolocation(cls, v: dict[str, float] | None) -> dict[str, float] | None:
if not v:
return v
if 'latitude' in v and 'longitude' in v:
return v
raise CaptureSettingsError(f'A geolocation must have a latitude and a longitude: {v}')
def check_geolocation(cls, geolocation: dict[str, float] | None) -> dict[str, float] | None:
if not geolocation:
return None
if 'latitude' in geolocation and 'longitude' in geolocation:
return geolocation
raise CaptureSettingsError(f'A geolocation must have a latitude and a longitude: {geolocation}')

@field_validator('viewport', mode='before')
@classmethod
def load_viewport_json(cls, v: Any) -> dict[str, int] | None:
if not v:
def load_viewport_json(cls, viewport: Any) -> dict[str, int] | None:
if not viewport:
return None
if isinstance(v, str):
if v.startswith('{'):
return from_json(v)
elif isinstance(v, dict):
return v
if isinstance(viewport, str):
# ignore
return None
elif isinstance(viewport, dict):
return viewport
return None

@field_validator('viewport', mode='after')
@classmethod
def check_viewport(cls, v: dict[str, int] | None) -> dict[str, int] | None:
if not v:
return v
if 'width' in v and 'height' in v:
return v
raise CaptureSettingsError(f'A viewport must have a width and a height: {v}')
def check_viewport(cls, viewport: dict[str, int] | None) -> dict[str, int] | None:
if not viewport:
return None
if 'width' in viewport and 'height' in viewport:
return viewport
raise CaptureSettingsError(f'A viewport must have a width and a height: {viewport}')

def redis_dump(self) -> Mapping[str | bytes, bytes | float | int | str]:
mapping_capture: dict[str | bytes, bytes | float | int | str] = {}
Expand Down
36 changes: 18 additions & 18 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Sphinx = [
{version = "<7.2", python = "<3.9", optional = true},
{version = "^7.2", python = ">=3.9", optional = true}
]
playwrightcapture = {extras = ["recaptcha"], version = "^1.25.6"}
playwrightcapture = {extras = ["recaptcha"], version = "^1.25.7"}
defang = "^0.5.3"
ua-parser = "^0.18.0"
redis = {version = "^5.0.7", extras = ["hiredis"]}
Expand Down

0 comments on commit ffc38da

Please sign in to comment.