diff --git a/src/offat/api/config.py b/src/offat/api/config.py index ff869af..56bc01b 100644 --- a/src/offat/api/config.py +++ b/src/offat/api/config.py @@ -10,7 +10,10 @@ load_dotenv() app = FastAPI( - title='OFFAT - API' + title='OFFAT - API', + servers=[{ + 'url':'http://localhost:8000', + }], ) auth_secret_key = environ.get( diff --git a/src/offat/parsers/openapi.py b/src/offat/parsers/openapi.py index 261c2e1..ea49b8f 100644 --- a/src/offat/parsers/openapi.py +++ b/src/offat/parsers/openapi.py @@ -13,6 +13,7 @@ class OpenAPIv3Parser(BaseParser): '''OpenAPI v3 Spec File Parser''' # while adding new method to this class, make sure same method is present in SwaggerParser class + def __init__(self, file_or_url: str, spec: dict | None = None) -> None: super().__init__(file_or_url=file_or_url, spec=spec) # noqa if not self.is_v3: @@ -25,6 +26,7 @@ def __init__(self, file_or_url: str, spec: dict | None = None) -> None: self.request_response_params = self._get_request_response_params() + def _populate_hosts(self): servers = self.specification.get('servers', []) hosts = [] @@ -41,6 +43,7 @@ def _populate_hosts(self): self.hosts = hosts self.host = self.hosts[0] + def _get_scheme(self): servers = self.specification.get('servers', []) schemes = [] @@ -50,6 +53,21 @@ def _get_scheme(self): scheme = 'https' if 'https' in schemes else 'http' return scheme + + def _fetch_schema_from_spec(self, param_schema_ref:str) -> dict: + schema_spec_path = param_schema_ref.split('/')[1:] + + if len(schema_spec_path) > 3: + logger.error('Schema spec $ref path should not be greater than 3 (excluding #)') + return {} + + schema_data:dict = self.specification + for child_ele in schema_spec_path: + schema_data:dict = schema_data.get(child_ele, {}) + + return schema_data + + def _get_param_definition_schema(self, param: dict): '''Returns Model defined schema for the passed param''' param_schema = param.get('schema') @@ -58,9 +76,7 @@ def _get_param_definition_schema(self, param: dict): if param_schema: param_schema_ref = param_schema.get('$ref') if param_schema_ref: - model_slug = param_schema_ref.split('/')[-1] - param_schema = self.specification.get( - 'components', {}).get('schemas', {}).get(model_slug, {}) # model schema + param_schema = self._fetch_schema_from_spec(param_schema_ref) return param_schema @@ -75,20 +91,27 @@ def _get_response_definition_schema(self, responses: dict): ''' for status_code in responses.keys(): # below line could return: ["application/json", "application/xml"] - status_code_content_type_response = responses[status_code]['content'].keys() - - for status_code_content_type in status_code_content_type_response: - status_code_content = responses[status_code]['content'][status_code_content_type].keys( - ) - if 'parameters' in status_code_content: - # done - responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters'] - elif 'schema' in status_code_content: - responses[status_code]['schema'] = self._get_param_definition_schema( - responses[status_code]['content'][status_code_content_type]) + content = responses[status_code].get('content', None) + + if content: + status_code_content_type_responses = content.keys() + for status_code_content_type in status_code_content_type_responses: + status_code_content = responses[status_code]['content'][status_code_content_type].keys() + if 'parameters' in status_code_content: + responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters'] + elif 'schema' in status_code_content: + responses[status_code]['schema'] = self._get_param_definition_schema( + responses[status_code]['content'][status_code_content_type]) + + else: + # Fetch $ref schema directly + ref = responses[status_code].get('$ref', None) + if ref: + responses[status_code]['schema'] = self._fetch_schema_from_spec(ref) return responses + def _get_request_response_params(self): '''Returns Schema of requests and response params diff --git a/src/offat/parsers/parser.py b/src/offat/parsers/parser.py index e514c02..035b415 100644 --- a/src/offat/parsers/parser.py +++ b/src/offat/parsers/parser.py @@ -11,7 +11,7 @@ class InvalidSpecVersion(Exception): class BaseParser: def __init__(self, file_or_url: str, spec: dict = None) -> None: if spec: - self.specification = spec + self.specification:dict = spec base_uri = "" else: self.specification, base_uri = read_from_filename(file_or_url) diff --git a/src/offat/report/generator.py b/src/offat/report/generator.py index 9781851..fdec7f3 100644 --- a/src/offat/report/generator.py +++ b/src/offat/report/generator.py @@ -1,7 +1,9 @@ +""" +Report Generator utils +""" from copy import deepcopy from html import escape from json import dumps as json_dumps -from offat.report import templates from os.path import dirname, join as path_join from os import makedirs from rich.table import Table @@ -9,16 +11,21 @@ from .templates.table import TestResultTable from ..logger import logger, console +from ..report import templates class ReportGenerator: + """can generate report in json,yaml,table and html formats""" + @staticmethod def generate_html_report(results: list[dict]): + """generates html report from OFFAT results""" html_report_template_file_name = 'report.html' html_report_file_path = path_join( - dirname(templates.__file__), html_report_template_file_name) + dirname(templates.__file__), html_report_template_file_name + ) - with open(html_report_file_path, 'r') as f: + with open(html_report_file_path, 'r', encoding='utf-8') as f: report_file_content = f.read() # TODO: validate report data to avoid HTML injection attacks. @@ -27,7 +34,7 @@ def generate_html_report(results: list[dict]): # HTML escape data escaped_results = [] - escape_keys = ["response_body"] + escape_keys = ['response_body'] for result_dict in results: escaped_result_dict = {} for key, value in result_dict.items(): @@ -40,12 +47,16 @@ def generate_html_report(results: list[dict]): escaped_results.append(escaped_result_dict) report_file_content = report_file_content.replace( - '{ results }', json_dumps(escaped_results)) + '{ results }', json_dumps(escaped_results) + ) return report_file_content @staticmethod - def handle_report_format(results: list[dict], report_format: str | None) -> str | Table: + def handle_report_format( + results: list[dict], report_format: str | None + ) -> str | Table: + """returns report in specified format""" result = None match report_format: @@ -53,20 +64,27 @@ def handle_report_format(results: list[dict], report_format: str | None) -> str logger.warning('HTML output format displays only basic data.') result = ReportGenerator.generate_html_report(results=results) case 'yaml': - logger.warning('YAML output format needs to be sanitized before using it further.') - result = yaml_dump({ - 'results': results, - }) + logger.warning( + 'YAML output format needs to be sanitized before using it further.' + ) + result = yaml_dump( + { + 'results': results, + } + ) case 'json': report_format = 'json' - result = json_dumps({ - 'results': results, - }) + result = json_dumps( + { + 'results': results, + } + ) case _: # default: CLI table # TODO: filter failed requests first and then create new table for failed requests report_format = 'table' results_table = TestResultTable().generate_result_table( - deepcopy(results)) + deepcopy(results) + ) result = results_table logger.info('Generated %s format report.', report_format.upper()) @@ -74,30 +92,43 @@ def handle_report_format(results: list[dict], report_format: str | None) -> str @staticmethod def save_report(report_path: str | None, report_file_content: str | Table | None): + """saves/prints report to console""" if report_path != '/' and report_path: dir_name = dirname(report_path) if dir_name != '' and report_path: makedirs(dir_name, exist_ok=True) # print to cli if report path and file content as absent else write to file location. - if report_path and report_file_content and not isinstance(report_file_content, Table): - with open(report_path, 'w') as f: + if ( + report_path + and report_file_content + and not isinstance(report_file_content, Table) + ): + with open(report_path, 'w', encoding='utf-8') as f: logger.info('Writing report to file: %s', report_path) f.write(report_file_content) else: if isinstance(report_file_content, Table) and report_file_content.columns: TestResultTable().print_table(report_file_content) - elif isinstance(report_file_content, Table) and not report_file_content.columns: + elif ( + isinstance(report_file_content, Table) + and not report_file_content.columns + ): logger.warning('No Columns found in Table.') else: console.print(report_file_content) @staticmethod - def generate_report(results: list[dict], report_format: str | None, report_path: str | None): + def generate_report( + results: list[dict], report_format: str | None, report_path: str | None + ): + """main function used to generate report""" if report_path: report_format = report_path.split('.')[-1] formatted_results = ReportGenerator.handle_report_format( - results=results, report_format=report_format) + results=results, report_format=report_format + ) ReportGenerator.save_report( - report_path=report_path, report_file_content=formatted_results) + report_path=report_path, report_file_content=formatted_results + ) diff --git a/src/offat/tester/tester_utils.py b/src/offat/tester/tester_utils.py index 896eb35..0c10317 100644 --- a/src/offat/tester/tester_utils.py +++ b/src/offat/tester/tester_utils.py @@ -43,7 +43,7 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool: case _: proto = http_client.HTTPConnection - logger.info("Checking whether host %s:%d is available", host, port) + logger.info("Checking whether host %s:%s is available", host, port) try: conn = proto(host=host, port=port, timeout=5) conn.request("GET", "/") diff --git a/src/offat/utils.py b/src/offat/utils.py index 8c80cd2..823ac69 100644 --- a/src/offat/utils.py +++ b/src/offat/utils.py @@ -189,6 +189,6 @@ def is_valid_url(url: str) -> bool: Any exception occurred during operation ''' url_regex = re_compile( - r'https?://(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)' + r'https?:\/\/[a-z.-]+(:\d+)?.*' ) return bool(match(url_regex, url)) diff --git a/src/poetry.lock b/src/poetry.lock index 93b47e1..c9a44b4 100644 --- a/src/poetry.lock +++ b/src/poetry.lock @@ -769,13 +769,13 @@ files = [ [[package]] name = "pydantic" -version = "2.6.3" +version = "2.6.4" description = "Data validation using Python type hints" optional = true python-versions = ">=3.8" files = [ - {file = "pydantic-2.6.3-py3-none-any.whl", hash = "sha256:72c6034df47f46ccdf81869fddb81aade68056003900a8724a4f160700016a2a"}, - {file = "pydantic-2.6.3.tar.gz", hash = "sha256:e07805c4c7f5c6826e33a1d4c9d47950d7eaf34868e2690f8594d2e30241f11f"}, + {file = "pydantic-2.6.4-py3-none-any.whl", hash = "sha256:cc46fce86607580867bdc3361ad462bab9c222ef042d3da86f2fb333e1d916c5"}, + {file = "pydantic-2.6.4.tar.gz", hash = "sha256:b1704e0847db01817624a6b86766967f552dd9dbf3afba4004409f908dcc84e6"}, ] [package.dependencies] @@ -957,17 +957,17 @@ files = [ [[package]] name = "redis" -version = "5.0.2" +version = "5.0.3" description = "Python client for Redis database and key-value store" optional = true python-versions = ">=3.7" files = [ - {file = "redis-5.0.2-py3-none-any.whl", hash = "sha256:4caa8e1fcb6f3c0ef28dba99535101d80934b7d4cd541bbb47f4a3826ee472d1"}, - {file = "redis-5.0.2.tar.gz", hash = "sha256:3f82cc80d350e93042c8e6e7a5d0596e4dd68715babffba79492733e1f367037"}, + {file = "redis-5.0.3-py3-none-any.whl", hash = "sha256:5da9b8fe9e1254293756c16c008e8620b3d15fcc6dde6babde9541850e72a32d"}, + {file = "redis-5.0.3.tar.gz", hash = "sha256:4973bae7444c0fbed64a06b87446f79361cb7e4ec1538c022d696ed7a5015580"}, ] [package.dependencies] -async-timeout = ">=4.0.3" +async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""} [package.extras] hiredis = ["hiredis (>=1.0.0)"] @@ -1151,13 +1151,13 @@ files = [ [[package]] name = "rq" -version = "1.16.0" +version = "1.16.1" description = "RQ is a simple, lightweight, library for creating background jobs, and processing them." optional = true python-versions = ">=3.7" files = [ - {file = "rq-1.16.0-py3-none-any.whl", hash = "sha256:d71859da6f3af597bd79675b240e138587878ce9b693d1bcf681443e2a8193ce"}, - {file = "rq-1.16.0.tar.gz", hash = "sha256:aab9e0fc90d7ffd100241b0f8dd7458c1ee4039edc2bfea9f7e2cfc0567f5710"}, + {file = "rq-1.16.1-py3-none-any.whl", hash = "sha256:273de33f10bb9f18cd1e8ccc0a4e8dba2b8eb86a6ab2a91ae674f99bd68025f1"}, + {file = "rq-1.16.1.tar.gz", hash = "sha256:d9a6314bc759a743b4a5d89aa467eaa3a31dbbc0a34bcd0ee82e8852d9ec166d"}, ] [package.dependencies] @@ -1166,18 +1166,18 @@ redis = ">=3.5" [[package]] name = "setuptools" -version = "69.1.1" +version = "69.2.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false python-versions = ">=3.8" files = [ - {file = "setuptools-69.1.1-py3-none-any.whl", hash = "sha256:02fa291a0471b3a18b2b2481ed902af520c69e8ae0919c13da936542754b4c56"}, - {file = "setuptools-69.1.1.tar.gz", hash = "sha256:5c0806c7d9af348e6dd3777b4f4dbb42c7ad85b190104837488eab9a7c945cf8"}, + {file = "setuptools-69.2.0-py3-none-any.whl", hash = "sha256:c21c49fb1042386df081cb5d86759792ab89efca84cf114889191cd09aacc80c"}, + {file = "setuptools-69.2.0.tar.gz", hash = "sha256:0ff4183f8f42cd8fa3acea16c45205521a4ef28f73c6391d8a25e92893134f2e"}, ] [package.extras] docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] [[package]] diff --git a/src/pyproject.toml b/src/pyproject.toml index 2df5cc7..a5b01cb 100644 --- a/src/pyproject.toml +++ b/src/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "offat" -version = "0.15.4" +version = "0.15.5" description = "Offensive API tester tool automates checks for common API vulnerabilities" authors = ["Dhrumil Mistry "] license = "MIT"