From 96299a445e67f6d7275f0ec9c83f1a30e03dc3ad Mon Sep 17 00:00:00 2001 From: Keshav Priyadarshi Date: Fri, 26 Jul 2024 18:12:22 +0530 Subject: [PATCH] Use VersionRange.normalize to compare advisory Signed-off-by: Keshav Priyadarshi --- vulntotal/vulntotal_cli.py | 234 +++++++++++++++++-------------------- 1 file changed, 107 insertions(+), 127 deletions(-) diff --git a/vulntotal/vulntotal_cli.py b/vulntotal/vulntotal_cli.py index f820e7b2c..2c2326da7 100755 --- a/vulntotal/vulntotal_cli.py +++ b/vulntotal/vulntotal_cli.py @@ -19,11 +19,15 @@ # TODO: use saneyaml import yaml +from fetchcode import package_versions from packageurl import PackageURL from texttable import Texttable -from univers.normalized_range import NormalizedVersionRanges +from univers.version_range import RANGE_CLASS_BY_SCHEMES +from univers.version_range import VersionRange +from univers.version_range import build_range_from_github_advisory_constraint +from univers.version_range import build_range_from_snyk_advisory_string +from univers.version_range import from_gitlab_native -from vulnerabilities.package_managers import VERSION_API_CLASSES_BY_PACKAGE_TYPE from vulntotal.datasources import DATASOURCE_REGISTRY from vulntotal.validator import VendorData from vulntotal.vulntotal_utils import get_item @@ -189,7 +193,9 @@ def handler( prettyprint(purl, active_datasource, pagination, no_threading) elif purl: - prettyprint_group_by_cve(purl, active_datasource, pagination, no_threading, vers, no_compare) + prettyprint_group_by_cve( + purl, active_datasource, pagination, no_threading, vers, no_compare + ) def get_valid_datasources(datasources): @@ -281,10 +287,12 @@ def run_datasources(purl, datasources, no_threading=False): return vulnerabilities -class VulntotalEncoder(json.JSONEncoder): +class VendorDataEncoder(json.JSONEncoder): def default(self, obj): - if isinstance(obj, VendorData) or isinstance(obj, NormalizedVersionRanges): + if isinstance(obj, VendorData): return obj.to_dict() + if isinstance(obj, VersionRange): + return str(obj) return json.JSONEncoder.default(self, obj) @@ -298,7 +306,7 @@ def write_json_output(purl, datasources, json_output, no_threading, no_group, no grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare) results.update(grouped_by_cve) - return json.dump(results, json_output, cls=VulntotalEncoder, indent=2) + return json.dump(results, json_output, cls=VendorDataEncoder, indent=2) def noop(self, *args, **kw): @@ -316,30 +324,26 @@ def write_yaml_output(purl, datasources, yaml_output, no_threading, no_group, no results.update(vulnerabilities) else: grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare) - serialize_normalized_range(grouped_by_cve, no_compare) + serialize_version_range(grouped_by_cve, no_compare) results.update(grouped_by_cve) return yaml.dump(results, yaml_output, default_flow_style=False, indent=2, sort_keys=False) -def serialize_normalized_range(grouped_by_cve, no_compare): +def serialize_version_range(grouped_by_cve, no_compare): if no_compare: return for cve, value in grouped_by_cve.items(): if cve in ("NOCVE", "NOADVISORY"): continue - for datasource, resources in value.items(): + for _, resources in value.items(): for resource in resources: affected_versions = resource.get("normalized_affected_versions") fixed_versions = resource.get("normalized_fixed_versions") - if isinstance(affected_versions, NormalizedVersionRanges): - resource["normalized_affected_versions"] = [ - str(vers) for vers in affected_versions.version_ranges - ] - if isinstance(fixed_versions, NormalizedVersionRanges): - resource["normalized_fixed_versions"] = [ - str(vers) for vers in fixed_versions.version_ranges - ] + if isinstance(affected_versions, VersionRange): + resource["normalized_affected_versions"] = str(affected_versions) + if isinstance(fixed_versions, VersionRange): + resource["normalized_fixed_versions"] = str(fixed_versions) def prettyprint(purl, datasources, pagination, no_threading): @@ -363,17 +367,6 @@ def prettyprint(purl, datasources, pagination, no_threading): pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) -NORMALIZED_VERSION_RANGE_BY_DATASOURCE = { - "deps": NormalizedVersionRanges.from_discrete, - "github": NormalizedVersionRanges.from_github, - "gitlab": NormalizedVersionRanges.from_gitlab, - "oss_index": None, - "osv": NormalizedVersionRanges.from_discrete, - "snyk": NormalizedVersionRanges.from_snyk, - "vulnerablecode": NormalizedVersionRanges.from_discrete, -} - - def group_by_cve(vulnerabilities, purl, no_compare): grouped_by_cve = {} nocve = {} @@ -382,32 +375,20 @@ def group_by_cve(vulnerabilities, purl, no_compare): if not advisories: if datasource not in noadvisory: noadvisory[datasource] = [] - noadvisory[datasource].append( - { - "advisory": None, - } - ) + noadvisory[datasource].append({"advisory": None}) for advisory in advisories: cve = next((x for x in advisory.aliases if x.startswith("CVE")), None) if not cve: if datasource not in nocve: nocve[datasource] = [] - nocve[datasource].append( - { - "advisory": advisory, - } - ) + nocve[datasource].append({"advisory": advisory}) continue if cve not in grouped_by_cve: grouped_by_cve[cve] = {} if datasource not in grouped_by_cve[cve]: grouped_by_cve[cve][datasource] = [] - grouped_by_cve[cve][datasource].append( - { - "advisory": advisory, - } - ) + grouped_by_cve[cve][datasource].append({"advisory": advisory}) grouped_by_cve["NOCVE"] = nocve grouped_by_cve["NOADVISORY"] = noadvisory if not no_compare: @@ -418,7 +399,6 @@ def group_by_cve(vulnerabilities, purl, no_compare): def normalize_version_ranges(grouped_by_cve, purl): package_versions = get_all_versions(purl) - for cve, value in grouped_by_cve.items(): if cve in ("NOCVE", "NOADVISORY"): continue @@ -427,20 +407,24 @@ def normalize_version_ranges(grouped_by_cve, purl): advisory = resource["advisory"] normalized_affected_versions = [] normalized_fixed_versions = [] - datasource_normalizer = NORMALIZED_VERSION_RANGE_BY_DATASOURCE.get(datasource) - if datasource_normalizer and advisory.affected_versions: + version_range_func = VERSION_RANGE_BY_DATASOURCE.get(datasource) + if version_range_func and advisory.affected_versions: + affected = advisory.affected_versions + if len(affected) == 1: + affected = affected[0] + try: - normalized_affected_versions = datasource_normalizer( - advisory.affected_versions, purl.type, package_versions - ) + vra = version_range_func(purl.type, affected) + normalized_affected_versions = vra.normalize(package_versions) except Exception as err: normalized_affected_versions = [err] if advisory.fixed_versions: try: - normalized_fixed_versions = NormalizedVersionRanges.from_discrete( - advisory.fixed_versions, purl.type, package_versions + vrf = get_range_from_discrete_version_string( + purl.type, advisory.fixed_versions ) + normalized_fixed_versions = vrf.normalize(package_versions) except Exception as err: normalized_fixed_versions = [err] @@ -449,35 +433,37 @@ def normalize_version_ranges(grouped_by_cve, purl): def compare(grouped_by_cve): - for cve, value in grouped_by_cve.items(): + for cve, advisories in grouped_by_cve.items(): if cve in ("NOCVE", "NOADVISORY"): continue - sources = list(value.keys()) + sources = list(advisories.keys()) board = {source: {} for source in sources} - """ - A typical board after comparison may look like this. - - board = { - "github":{ - "snyk": 0, - "gitlab": 1, - "deps": 0, - "vulnerablecode": 1, - "osv": 1, - "oss_index": 1, - }, - "snyk":{ - "github": 0, - "gitlab": 1, - "deps": 0, - "vulnerablecode": 1, - "osv": 1, - "oss_index": 1, - }, - ... - } - """ - for datasource, resources in value.items(): + + # For each unique CVE create the scoring board to score + # advisory from different datasources. + # A typical board after comparison may look like this. + + # board = { + # "github":{ + # "snyk": 0, + # "gitlab": 1, + # "deps": 0, + # "vulnerablecode": 1, + # "osv": 1, + # "oss_index": 1, + # }, + # "snyk":{ + # "github": 0, + # "gitlab": 1, + # "deps": 0, + # "vulnerablecode": 1, + # "osv": 1, + # "oss_index": 1, + # }, + # ... + # } + + for datasource, resources in advisories.items(): normalized_affected_versions_a = get_item(resources, 0, "normalized_affected_versions") normalized_fixed_versions_a = get_item(resources, 0, "normalized_fixed_versions") if normalized_fixed_versions_a and normalized_affected_versions_a: @@ -489,28 +475,31 @@ def compare(grouped_by_cve): ): continue normalized_affected_versions_b = get_item( - value, source, 0, "normalized_affected_versions" + advisories, source, 0, "normalized_affected_versions" ) normalized_fixed_versions_b = get_item( - value, source, 0, "normalized_fixed_versions" + advisories, source, 0, "normalized_fixed_versions" ) board[datasource][source] = 0 board[source][datasource] = 0 - if ( - normalized_fixed_versions_a == normalized_fixed_versions_b - and normalized_affected_versions_a == normalized_affected_versions_b - ): - board[datasource][source] = 1 - board[source][datasource] = 1 - - maximum = max([sum(list(table.values())) for table in board.values()]) + if normalized_fixed_versions_a == normalized_fixed_versions_b: + board[datasource][source] += 0.5 + board[source][datasource] += 0.5 + elif normalized_affected_versions_a == normalized_affected_versions_b: + board[datasource][source] += 0.5 + board[source][datasource] += 0.5 + + # Compute the relative score from the score board for each advisory. + maximum = max([sum(table.values()) for table in board.values()]) datasource_count = len(sources) for datasource, table in board.items(): if maximum == 0: - # NA if only one advisory else TC aka `Total Collision`. - value[datasource][0]["score"] = "TC" if datasource_count > 1 else "NA" + # NA if only one advisory and nothing to compare with. + # TC (Total Collision) i.e no two advisory agree on common fixed or affected version. + advisories[datasource][0]["score"] = "TC" if datasource_count > 1 else "NA" continue - value[datasource][0]["score"] = (sum(list(table.values())) / maximum) * 100 + datasource_score = (sum(table.values()) / maximum) * 100 + advisories[datasource][0]["score"] = datasource_score def prettyprint_group_by_cve(purl, datasources, pagination, no_threading, vers, no_compare): @@ -535,37 +524,21 @@ def prettyprint_group_by_cve(purl, datasources, pagination, no_threading, vers, if not no_compare and vers and "score" in resources[0]: na_affected = get_item(resources, 0, "normalized_affected_versions") na_fixed = get_item(resources, 0, "normalized_fixed_versions") - na_affected = ( - na_affected.version_ranges - if isinstance(na_affected, NormalizedVersionRanges) - else na_affected - ) - na_fixed = ( - na_fixed.version_ranges - if isinstance(na_fixed, NormalizedVersionRanges) - else na_fixed - ) - na_affected = "\n".join([str(i) for i in na_affected]) - na_fixed = "\n".join([str(i) for i in na_fixed]) table.add_row(["", "", "", na_affected, na_fixed, ""]) pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw()) -def strip_leading_v(version): - if version.startswith("v"): - return version[1:] - return version - - def get_texttable(no_group=False, no_compare=False): quantum = 100 / 125 terminal_width = os.get_terminal_size().columns line_factor = terminal_width / 100 - column_5x = math.floor(5 * quantum * line_factor) - column_15x = math.floor(15 * quantum * line_factor) - column_20x = math.floor(20 * quantum * line_factor) + column_size = lambda f: math.floor(f * quantum * line_factor) + column_7x = column_size(5) + column_17x = column_size(10) + column_15x = column_size(15) + column_20x = column_size(20) table = Texttable() @@ -581,37 +554,44 @@ def get_texttable(no_group=False, no_compare=False): table.set_cols_dtype(["a", "a", "a", "a", "a"]) table.set_cols_align(["l", "l", "l", "l", "l"]) table.set_cols_valign(["t", "t", "t", "a", "t"]) - table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x]) + table.set_cols_width([column_15x, column_15x, column_20x, column_20x, column_20x]) table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED"]) return table table.set_cols_dtype(["a", "a", "a", "a", "a", "a"]) table.set_cols_align(["l", "l", "l", "l", "l", "l"]) table.set_cols_valign(["t", "t", "t", "a", "t", "t"]) - table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x, column_5x]) + table.set_cols_width([column_17x, column_15x, column_15x, column_20x, column_20x, column_7x]) table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED", "SCORE"]) return table -def get_all_versions(purl: PackageURL): - if purl.type not in VERSION_API_CLASSES_BY_PACKAGE_TYPE: - return +def get_range_from_discrete_version_string(schema, versions): + range_cls = RANGE_CLASS_BY_SCHEMES.get(schema) + if isinstance(versions, str): + versions = [versions] + return range_cls.from_versions(versions) - versionAPI = None - package_name = None - if purl.type == "maven": - package_name = f"{purl.namespace}:{purl.name}" - if purl.type in ("composer", "golang", "github"): - package_name = f"{purl.namespace}/{purl.name}" - if purl.type in ("nuget", "pypi", "gem", "npm", "hex", "deb", "cargo"): - package_name = purl.name +VERSION_RANGE_BY_DATASOURCE = { + "deps": get_range_from_discrete_version_string, + "github": build_range_from_github_advisory_constraint, + "gitlab": from_gitlab_native, + "oss_index": None, + "osv": get_range_from_discrete_version_string, + "snyk": build_range_from_snyk_advisory_string, + "safetydb": build_range_from_snyk_advisory_string, + "vulnerablecode": get_range_from_discrete_version_string, +} + - versionAPI = VERSION_API_CLASSES_BY_PACKAGE_TYPE.get(purl.type)() - all_versions = versionAPI.fetch(package_name) +def get_all_versions(purl: PackageURL): + if purl.type not in package_versions.SUPPORTED_ECOSYSTEMS: + return - return [strip_leading_v(package_version.value) for package_version in all_versions] + all_versions = package_versions.versions(str(purl)) + return [package_version.value for package_version in all_versions] if __name__ == "__main__":