Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added --checks arg #34

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/findcdn/cdnEngine/cdnEngine.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def chef_executor(
user_agent: str,
verbosity: bool,
interactive: bool,
checks: str,
):
"""Attempt to make the method "threadsafe" by giving each worker its own detector."""
# Define detector
Expand All @@ -53,6 +54,7 @@ def chef_executor(
timeout=math.ceil(timeout * 0.4),
agent=user_agent,
interactive=interactive,
checks = checks,
)
except Exception as e:
# Incase some uncaught error somewhere
Expand All @@ -75,6 +77,7 @@ def __init__(
user_agent: str,
interactive: bool = False,
verbose: bool = False,
checks: str = "chnw",
):
"""Give the chef the pot to use."""
self.pot: DomainPot = pot
Expand All @@ -83,6 +86,7 @@ def __init__(
self.timeout: int = timeout
self.agent = user_agent
self.interactive = interactive
self.checks = checks

# Determine thread count
if threads and threads != 0:
Expand Down Expand Up @@ -129,6 +133,7 @@ def grab_cdn(self, double: bool = False): # type: ignore
self.agent,
self.verbose,
self.interactive,
self.checks,
)
for domain in newpot
}
Expand Down Expand Up @@ -178,13 +183,14 @@ def run_checks(
interactive: bool = False,
verbose: bool = False,
double: bool = False,
checks: str = "chnw",
) -> Tuple[List[detectCDN.Domain], int]:
"""Orchestrate the use of DomainPot and Chef."""
# Our domain pot
dp = DomainPot(domains)

# Our chef to manage pot
chef = Chef(dp, threads, timeout, user_agent, interactive, verbose)
chef = Chef(dp, threads, timeout, user_agent, interactive, verbose, checks)

# Run analysis for all domains
cnt = chef.run_checks(double)
Expand Down
66 changes: 55 additions & 11 deletions src/findcdn/cdnEngine/detectCDN/cdn_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ssl import CertificateError, SSLError
from typing import List
from urllib import request as request
from urllib import parse
from urllib.error import URLError

# Third-Party Libraries
Expand All @@ -24,6 +25,41 @@
LIFETIME = 10


class RedirectFilter(request.HTTPRedirectHandler):
    """Redirect handler that only follows redirects staying on the same host.

    When a redirect points at a different host, no further request is sent
    and the response to the initial request is handed back to the caller
    instead, so its headers can still be inspected.
    """

    def redirect_request(self, req, fp, code, msg, hdrs, newurl):
        """Build the follow-up request, or return None for a cross-host redirect.

        Args:
            req: The request that received the redirect response.
            fp: The response object of the redirect.
            code: HTTP status code of the redirect.
            msg: HTTP reason phrase of the redirect.
            hdrs: Headers of the redirect response.
            newurl: Redirect target URL.

        Returns:
            A new request when ``newurl`` targets the same host as ``req``,
            otherwise None.
        """
        original_host = req.host
        target_host = parse.urlparse(newurl).netloc
        # Host names are case-insensitive, so compare them case-folded.
        if original_host is None or original_host.lower() != target_host.lower():
            # The redirect leaves the original host: don't send more requests.
            return None
        return super().redirect_request(req, fp, code, msg, hdrs, newurl)

    def http_error_302(self, req, fp, code, msg, hdrs):
        """Follow a same-host redirect, or return the redirect response itself.

        The inherited http_error_302 calls self.redirect_request(). If that
        returns None (cross-host redirect), the inherited method returns None
        as well; in that case the response ``fp`` to the initial request is
        returned unchanged.

        Returns:
            ``fp`` when the redirect is not followed, otherwise the final
            response with the redirect response's headers added to it.
        """
        result = super().http_error_302(req, fp, code, msg, hdrs)
        if result is None:
            return fp

        # Store the previous response's headers into the last result.
        for key, value in hdrs.items():
            result.headers.set_raw(key, value)

        return result

    # Route every redirect status through the filtering handler. 308 is
    # included so permanent redirects to another host are filtered as well.
    http_error_301 = http_error_303 = http_error_307 = http_error_308 = http_error_302

class Domain:
"""Domain class allows for storage of metadata on domain."""

Expand Down Expand Up @@ -59,7 +95,7 @@ def __init__(self):

def ip(self, dom: Domain) -> List[int]:
"""Determine IP addresses the domain resolves to."""
dom_list: List[str] = [dom.url, "www." + dom.url]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main motivation for adding "www." was to handle domains that don't answer directly on their bare domain name but instead listen on www.domain.com. Is there a reason why you wanted to remove "www."?

I have also thought about pairing this with a lightweight subdomain enumerator (using Google, for example), but I think that is too big a scope.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding "www." might introduce another host. I expect the user to provide all the hosts they want to test as input, and I would rather avoid adding hostnames implicitly (without the user actually knowing about it).
I agree, subdomain enumeration is out of scope for this tool.

dom_list: List[str] = [dom.url]
return_codes = []
ip_list = []
for domain in dom_list:
Expand Down Expand Up @@ -88,7 +124,7 @@ def ip(self, dom: Domain) -> List[int]:
def cname(self, dom: Domain, timeout: int) -> List[int]:
"""Collect CNAME records on domain."""
# List of domains to check
dom_list = [dom.url, "www." + dom.url]
dom_list = [dom.url]
# Our codes to return
return_code = []
# Seutp resolver and timeouts
Expand Down Expand Up @@ -116,7 +152,7 @@ def https_lookup(
) -> int:
"""Read 'server' header for CDN hints."""
# List of domains with different protocols to check.
PROTOCOLS = ["https://", "https://www."]
kz0ltan marked this conversation as resolved.
Show resolved Hide resolved
PROTOCOLS = ["http://", "https://"]
# Iterate through all protocols
for PROTOCOL in PROTOCOLS:
try:
Expand All @@ -127,8 +163,11 @@ def https_lookup(
headers={"User-Agent": agent},
)
# Making the timeout 50 as to not hang thread.
response = request.urlopen(req, timeout=timeout) # nosec
except URLError:
# replace RedirectHandler with RedirectFilter
opener = request.build_opener(RedirectFilter)
response = opener.open(req, timeout=timeout)

except URLError as e:
continue
except RemoteDisconnected:
continue
Expand All @@ -143,6 +182,7 @@ def https_lookup(
if interactive or verbose:
print(f"[{e}]: https://{dom.url}")
continue

# Define headers to check for the response
# to grab strings for later parsing.
HEADERS = ["server", "via"]
Expand Down Expand Up @@ -232,7 +272,7 @@ def CDNid(self, dom: Domain, data_blob: List):
dom.cdns.append(CDNs_rev[name])
dom.cdns_by_name.append(name)

def data_digest(self, dom: Domain) -> int:
def data_digest(self, dom: Domain, checks: str) -> int:
"""Digest all data collected and assign to CDN list."""
return_code = 1
# Iterate through all attributes for substrings
Expand All @@ -242,7 +282,7 @@ def data_digest(self, dom: Domain) -> int:
if len(dom.headers) > 0 and not None:
self.CDNid(dom, dom.headers)
return_code = 0
if len(dom.namesrvs) > 0 and not None:
if len(dom.namesrvs) > 0 and not None and 'n' in checks:
self.CDNid(dom, dom.namesrvs)
return_code = 0
if len(dom.whois_data) > 0 and not None:
Expand All @@ -257,16 +297,20 @@ def all_checks(
agent: str,
verbose: bool = False,
interactive: bool = False,
checks: str = "chnw",
) -> int:
"""Option to run everything in this library then digest."""
# Obtain each attributes data
self.ip(dom)
self.cname(dom, timeout)
self.https_lookup(dom, timeout, agent, interactive, verbose)
self.whois(dom, interactive, verbose)
if 'c' in checks:
kz0ltan marked this conversation as resolved.
Show resolved Hide resolved
self.cname(dom, timeout)
if 'h' in checks:
self.https_lookup(dom, timeout, agent, interactive, verbose)
if 'w' in checks:
self.whois(dom, interactive, verbose)

# Digest the data
return_code = self.data_digest(dom)
return_code = self.data_digest(dom, checks)

# Extra case if we want verbosity for each domain check
if verbose:
Expand Down
14 changes: 14 additions & 0 deletions src/findcdn/findcdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
conclude processing, otherwise use default.
--user_agent=<user_agent> Set the user agent to use, otherwise
use default.
--checks=<checks> Select detection types; possible values:
cname (c), HTTP headers (h), nameservers (n),
whois data (w). Default: "chnw"
"""

# Standard Python Libraries
Expand All @@ -49,6 +52,7 @@
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36"
TIMEOUT = 60 # Time in seconds
THREADS = 0 # If 0 then cdnEngine uses CPU count to set thread count
CHECKS = "chnw" # cnames, headers, nameservers, whois_data


def write_json(json_dump: str, output: str, verbose: bool, interactive: bool):
Expand All @@ -72,6 +76,7 @@ def main(
threads: int = THREADS,
timeout: int = TIMEOUT,
user_agent: str = USER_AGENT,
checks: str = CHECKS,
) -> str:
"""Take in a list of domains and determine the CDN for each return (JSON, number of successful jobs)."""
# Make sure the list passed is got something in it
Expand Down Expand Up @@ -100,6 +105,7 @@ def main(
interactive,
verbose,
double_in,
checks,
)

# Parse the domain data
Expand Down Expand Up @@ -154,6 +160,8 @@ def interactive() -> None:
args["--threads"] = THREADS
if args["--timeout"] is None:
args["--timeout"] = TIMEOUT
if args["--checks"] is None:
args["--checks"] = CHECKS

# Validate and convert arguments as needed with schema
schema: Schema = Schema(
Expand Down Expand Up @@ -188,6 +196,11 @@ def interactive() -> None:
str,
error="The user agent must be a string.",
),
"--checks": And(
str,
lambda checks: set(checks) <= {'c', 'h', 'n', 'w'},
error="Checks can be the following characters: chnw"
),
"<domain>": And(list, error="Please format the domains as a list."),
str: object, # Don't care about other keys, if any
}
Expand Down Expand Up @@ -223,6 +236,7 @@ def interactive() -> None:
validated_args["--threads"],
validated_args["--timeout"],
validated_args["--user_agent"],
validated_args["--checks"]
)
# Check for all potential exceptions
except OutputFileExists as ofe:
Expand Down