Skip to content

Commit

Permalink
Use VersionRange.normalize to compare advisory
Browse files Browse the repository at this point in the history
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
  • Loading branch information
keshav-space committed Jul 26, 2024
1 parent 964275b commit 96299a4
Showing 1 changed file with 107 additions and 127 deletions.
234 changes: 107 additions & 127 deletions vulntotal/vulntotal_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

# TODO: use saneyaml
import yaml
from fetchcode import package_versions
from packageurl import PackageURL
from texttable import Texttable
from univers.normalized_range import NormalizedVersionRanges
from univers.version_range import RANGE_CLASS_BY_SCHEMES
from univers.version_range import VersionRange
from univers.version_range import build_range_from_github_advisory_constraint
from univers.version_range import build_range_from_snyk_advisory_string
from univers.version_range import from_gitlab_native

from vulnerabilities.package_managers import VERSION_API_CLASSES_BY_PACKAGE_TYPE
from vulntotal.datasources import DATASOURCE_REGISTRY
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import get_item
Expand Down Expand Up @@ -189,7 +193,9 @@ def handler(
prettyprint(purl, active_datasource, pagination, no_threading)

elif purl:
prettyprint_group_by_cve(purl, active_datasource, pagination, no_threading, vers, no_compare)
prettyprint_group_by_cve(
purl, active_datasource, pagination, no_threading, vers, no_compare
)


def get_valid_datasources(datasources):
Expand Down Expand Up @@ -281,10 +287,12 @@ def run_datasources(purl, datasources, no_threading=False):
return vulnerabilities


class VulntotalEncoder(json.JSONEncoder):
class VendorDataEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, VendorData) or isinstance(obj, NormalizedVersionRanges):
if isinstance(obj, VendorData):
return obj.to_dict()
if isinstance(obj, VersionRange):
return str(obj)
return json.JSONEncoder.default(self, obj)


Expand All @@ -298,7 +306,7 @@ def write_json_output(purl, datasources, json_output, no_threading, no_group, no
grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare)
results.update(grouped_by_cve)

return json.dump(results, json_output, cls=VulntotalEncoder, indent=2)
return json.dump(results, json_output, cls=VendorDataEncoder, indent=2)


def noop(self, *args, **kw):
Expand All @@ -316,30 +324,26 @@ def write_yaml_output(purl, datasources, yaml_output, no_threading, no_group, no
results.update(vulnerabilities)
else:
grouped_by_cve = group_by_cve(vulnerabilities, PackageURL.from_string(purl), no_compare)
serialize_normalized_range(grouped_by_cve, no_compare)
serialize_version_range(grouped_by_cve, no_compare)
results.update(grouped_by_cve)

return yaml.dump(results, yaml_output, default_flow_style=False, indent=2, sort_keys=False)


def serialize_normalized_range(grouped_by_cve, no_compare):
def serialize_version_range(grouped_by_cve, no_compare):
if no_compare:
return
for cve, value in grouped_by_cve.items():
if cve in ("NOCVE", "NOADVISORY"):
continue
for datasource, resources in value.items():
for _, resources in value.items():
for resource in resources:
affected_versions = resource.get("normalized_affected_versions")
fixed_versions = resource.get("normalized_fixed_versions")
if isinstance(affected_versions, NormalizedVersionRanges):
resource["normalized_affected_versions"] = [
str(vers) for vers in affected_versions.version_ranges
]
if isinstance(fixed_versions, NormalizedVersionRanges):
resource["normalized_fixed_versions"] = [
str(vers) for vers in fixed_versions.version_ranges
]
if isinstance(affected_versions, VersionRange):
resource["normalized_affected_versions"] = str(affected_versions)
if isinstance(fixed_versions, VersionRange):
resource["normalized_fixed_versions"] = str(fixed_versions)


def prettyprint(purl, datasources, pagination, no_threading):
Expand All @@ -363,17 +367,6 @@ def prettyprint(purl, datasources, pagination, no_threading):
pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw())


NORMALIZED_VERSION_RANGE_BY_DATASOURCE = {
"deps": NormalizedVersionRanges.from_discrete,
"github": NormalizedVersionRanges.from_github,
"gitlab": NormalizedVersionRanges.from_gitlab,
"oss_index": None,
"osv": NormalizedVersionRanges.from_discrete,
"snyk": NormalizedVersionRanges.from_snyk,
"vulnerablecode": NormalizedVersionRanges.from_discrete,
}


def group_by_cve(vulnerabilities, purl, no_compare):
grouped_by_cve = {}
nocve = {}
Expand All @@ -382,32 +375,20 @@ def group_by_cve(vulnerabilities, purl, no_compare):
if not advisories:
if datasource not in noadvisory:
noadvisory[datasource] = []
noadvisory[datasource].append(
{
"advisory": None,
}
)
noadvisory[datasource].append({"advisory": None})
for advisory in advisories:
cve = next((x for x in advisory.aliases if x.startswith("CVE")), None)
if not cve:
if datasource not in nocve:
nocve[datasource] = []
nocve[datasource].append(
{
"advisory": advisory,
}
)
nocve[datasource].append({"advisory": advisory})
continue
if cve not in grouped_by_cve:
grouped_by_cve[cve] = {}

if datasource not in grouped_by_cve[cve]:
grouped_by_cve[cve][datasource] = []
grouped_by_cve[cve][datasource].append(
{
"advisory": advisory,
}
)
grouped_by_cve[cve][datasource].append({"advisory": advisory})
grouped_by_cve["NOCVE"] = nocve
grouped_by_cve["NOADVISORY"] = noadvisory
if not no_compare:
Expand All @@ -418,7 +399,6 @@ def group_by_cve(vulnerabilities, purl, no_compare):

def normalize_version_ranges(grouped_by_cve, purl):
package_versions = get_all_versions(purl)

for cve, value in grouped_by_cve.items():
if cve in ("NOCVE", "NOADVISORY"):
continue
Expand All @@ -427,20 +407,24 @@ def normalize_version_ranges(grouped_by_cve, purl):
advisory = resource["advisory"]
normalized_affected_versions = []
normalized_fixed_versions = []
datasource_normalizer = NORMALIZED_VERSION_RANGE_BY_DATASOURCE.get(datasource)
if datasource_normalizer and advisory.affected_versions:
version_range_func = VERSION_RANGE_BY_DATASOURCE.get(datasource)
if version_range_func and advisory.affected_versions:
affected = advisory.affected_versions
if len(affected) == 1:
affected = affected[0]

try:
normalized_affected_versions = datasource_normalizer(
advisory.affected_versions, purl.type, package_versions
)
vra = version_range_func(purl.type, affected)
normalized_affected_versions = vra.normalize(package_versions)
except Exception as err:
normalized_affected_versions = [err]

if advisory.fixed_versions:
try:
normalized_fixed_versions = NormalizedVersionRanges.from_discrete(
advisory.fixed_versions, purl.type, package_versions
vrf = get_range_from_discrete_version_string(
purl.type, advisory.fixed_versions
)
normalized_fixed_versions = vrf.normalize(package_versions)
except Exception as err:
normalized_fixed_versions = [err]

Expand All @@ -449,35 +433,37 @@ def normalize_version_ranges(grouped_by_cve, purl):


def compare(grouped_by_cve):
for cve, value in grouped_by_cve.items():
for cve, advisories in grouped_by_cve.items():
if cve in ("NOCVE", "NOADVISORY"):
continue
sources = list(value.keys())
sources = list(advisories.keys())
board = {source: {} for source in sources}
"""
A typical board after comparison may look like this.
board = {
"github":{
"snyk": 0,
"gitlab": 1,
"deps": 0,
"vulnerablecode": 1,
"osv": 1,
"oss_index": 1,
},
"snyk":{
"github": 0,
"gitlab": 1,
"deps": 0,
"vulnerablecode": 1,
"osv": 1,
"oss_index": 1,
},
...
}
"""
for datasource, resources in value.items():

# For each unique CVE, create a scoring board used to score
# the advisories from the different datasources.
# A typical board after comparison may look like this:

# board = {
# "github":{
# "snyk": 0,
# "gitlab": 1,
# "deps": 0,
# "vulnerablecode": 1,
# "osv": 1,
# "oss_index": 1,
# },
# "snyk":{
# "github": 0,
# "gitlab": 1,
# "deps": 0,
# "vulnerablecode": 1,
# "osv": 1,
# "oss_index": 1,
# },
# ...
# }

for datasource, resources in advisories.items():
normalized_affected_versions_a = get_item(resources, 0, "normalized_affected_versions")
normalized_fixed_versions_a = get_item(resources, 0, "normalized_fixed_versions")
if normalized_fixed_versions_a and normalized_affected_versions_a:
Expand All @@ -489,28 +475,31 @@ def compare(grouped_by_cve):
):
continue
normalized_affected_versions_b = get_item(
value, source, 0, "normalized_affected_versions"
advisories, source, 0, "normalized_affected_versions"
)
normalized_fixed_versions_b = get_item(
value, source, 0, "normalized_fixed_versions"
advisories, source, 0, "normalized_fixed_versions"
)
board[datasource][source] = 0
board[source][datasource] = 0
if (
normalized_fixed_versions_a == normalized_fixed_versions_b
and normalized_affected_versions_a == normalized_affected_versions_b
):
board[datasource][source] = 1
board[source][datasource] = 1

maximum = max([sum(list(table.values())) for table in board.values()])
if normalized_fixed_versions_a == normalized_fixed_versions_b:
board[datasource][source] += 0.5
board[source][datasource] += 0.5
elif normalized_affected_versions_a == normalized_affected_versions_b:
board[datasource][source] += 0.5
board[source][datasource] += 0.5

# Compute the relative score from the score board for each advisory.
maximum = max([sum(table.values()) for table in board.values()])
datasource_count = len(sources)
for datasource, table in board.items():
if maximum == 0:
# NA if only one advisory else TC aka `Total Collision`.
value[datasource][0]["score"] = "TC" if datasource_count > 1 else "NA"
# NA: there is only one advisory, so there is nothing to compare it with.
# TC (Total Collision): no two advisories agree on a common fixed or affected version.
advisories[datasource][0]["score"] = "TC" if datasource_count > 1 else "NA"
continue
value[datasource][0]["score"] = (sum(list(table.values())) / maximum) * 100
datasource_score = (sum(table.values()) / maximum) * 100
advisories[datasource][0]["score"] = datasource_score


def prettyprint_group_by_cve(purl, datasources, pagination, no_threading, vers, no_compare):
Expand All @@ -535,37 +524,21 @@ def prettyprint_group_by_cve(purl, datasources, pagination, no_threading, vers,
if not no_compare and vers and "score" in resources[0]:
na_affected = get_item(resources, 0, "normalized_affected_versions")
na_fixed = get_item(resources, 0, "normalized_fixed_versions")
na_affected = (
na_affected.version_ranges
if isinstance(na_affected, NormalizedVersionRanges)
else na_affected
)
na_fixed = (
na_fixed.version_ranges
if isinstance(na_fixed, NormalizedVersionRanges)
else na_fixed
)
na_affected = "\n".join([str(i) for i in na_affected])
na_fixed = "\n".join([str(i) for i in na_fixed])
table.add_row(["", "", "", na_affected, na_fixed, ""])

pydoc.pager(metadata + table.draw()) if pagination else click.echo(metadata + table.draw())


def strip_leading_v(version):
if version.startswith("v"):
return version[1:]
return version


def get_texttable(no_group=False, no_compare=False):
quantum = 100 / 125
terminal_width = os.get_terminal_size().columns
line_factor = terminal_width / 100

column_5x = math.floor(5 * quantum * line_factor)
column_15x = math.floor(15 * quantum * line_factor)
column_20x = math.floor(20 * quantum * line_factor)
column_size = lambda f: math.floor(f * quantum * line_factor)
column_7x = column_size(5)
column_17x = column_size(10)
column_15x = column_size(15)
column_20x = column_size(20)

table = Texttable()

Expand All @@ -581,37 +554,44 @@ def get_texttable(no_group=False, no_compare=False):
table.set_cols_dtype(["a", "a", "a", "a", "a"])
table.set_cols_align(["l", "l", "l", "l", "l"])
table.set_cols_valign(["t", "t", "t", "a", "t"])
table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x])
table.set_cols_width([column_15x, column_15x, column_20x, column_20x, column_20x])
table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED"])
return table

table.set_cols_dtype(["a", "a", "a", "a", "a", "a"])
table.set_cols_align(["l", "l", "l", "l", "l", "l"])
table.set_cols_valign(["t", "t", "t", "a", "t", "t"])
table.set_cols_width([column_20x, column_15x, column_20x, column_20x, column_20x, column_5x])
table.set_cols_width([column_17x, column_15x, column_15x, column_20x, column_20x, column_7x])
table.header(["CVE", "DATASOURCE", "ALIASES", "AFFECTED", "FIXED", "SCORE"])

return table


def get_all_versions(purl: PackageURL):
if purl.type not in VERSION_API_CLASSES_BY_PACKAGE_TYPE:
return
def get_range_from_discrete_version_string(schema, versions):
    """
    Return a version range built from one or more discrete version strings.

    ``schema`` is the key used to look up the range class in
    ``RANGE_CLASS_BY_SCHEMES``; callers in this module pass the purl type.
    ``versions`` is either a single version string or a collection of version
    strings. A lone string is wrapped in a list so that ``from_versions``
    always receives a list of versions.

    Raise a ValueError if no range class is registered for ``schema``.
    """
    range_cls = RANGE_CLASS_BY_SCHEMES.get(schema)
    if range_cls is None:
        # Fail with an explicit message instead of an opaque AttributeError
        # raised by calling ``from_versions`` on None.
        raise ValueError(f"Unsupported version range scheme: {schema!r}")

    if isinstance(versions, str):
        versions = [versions]
    return range_cls.from_versions(versions)

versionAPI = None
package_name = None

if purl.type == "maven":
package_name = f"{purl.namespace}:{purl.name}"
if purl.type in ("composer", "golang", "github"):
package_name = f"{purl.namespace}/{purl.name}"
if purl.type in ("nuget", "pypi", "gem", "npm", "hex", "deb", "cargo"):
package_name = purl.name
# Version range builder for each datasource, keyed by datasource name.
# normalize_version_ranges() looks up the builder for an advisory's datasource
# and calls it as ``builder(purl.type, affected_versions)``; the returned range
# is then normalized against the known package versions.
# A datasource mapped to None gets no affected-version normalization.
VERSION_RANGE_BY_DATASOURCE = {
    "deps": get_range_from_discrete_version_string,
    "github": build_range_from_github_advisory_constraint,
    "gitlab": from_gitlab_native,
    "oss_index": None,
    "osv": get_range_from_discrete_version_string,
    "snyk": build_range_from_snyk_advisory_string,
    "safetydb": build_range_from_snyk_advisory_string,
    "vulnerablecode": get_range_from_discrete_version_string,
}


versionAPI = VERSION_API_CLASSES_BY_PACKAGE_TYPE.get(purl.type)()
all_versions = versionAPI.fetch(package_name)
def get_all_versions(purl: PackageURL):
if purl.type not in package_versions.SUPPORTED_ECOSYSTEMS:
return

return [strip_leading_v(package_version.value) for package_version in all_versions]
all_versions = package_versions.versions(str(purl))
return [package_version.value for package_version in all_versions]


if __name__ == "__main__":
Expand Down

0 comments on commit 96299a4

Please sign in to comment.