Skip to content

Commit

Permalink
Fix sarif parser location files processing (#11265)
Browse files Browse the repository at this point in the history
* Fix sarif parser locations files processing

* Fix tests

* linter fixes

* fix snippet for each file hit

* fix snippet
  • Loading branch information
dmarushkin authored Nov 28, 2024
1 parent 4751052 commit 06ff84f
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 127 deletions.
268 changes: 143 additions & 125 deletions dojo/tools/sarif/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ def __get_items_from_run(self, run):
# get the timestamp of the run if possible
run_date = self.__get_last_invocation_date(run)
for result in run.get("results", []):
item = get_item(result, rules, artifacts, run_date)
if item is not None:
items.append(item)
result_items = get_items_from_result(result, rules, artifacts, run_date)
if result_items:
items.extend(result_items)
return items

def __get_last_invocation_date(self, data):
Expand Down Expand Up @@ -189,34 +189,35 @@ def get_title(result, rule):
return textwrap.shorten(title, 150)


def get_snippet(location):
    """Return the code snippet text carried by a single SARIF location.

    Looks up ``physicalLocation.region.snippet.text`` first (the most
    precise source) and falls back to
    ``physicalLocation.contextRegion.snippet.text`` when the region carries
    no snippet text.

    :param location: one entry of a SARIF result's ``locations`` array
        (a dict), or a falsy value when the result has no location.
    :return: the snippet text as a string, or ``None`` when *location* is
        falsy or contains no snippet text.
    """
    snippet = None

    if location and "physicalLocation" in location:
        physical_location = location["physicalLocation"]
        # Region snippet: the exact span the result points at.
        snippet = (
            physical_location.get("region", {})
            .get("snippet", {})
            .get("text")
        )
        if snippet is None:
            # Fall back to the broader surrounding context, per SARIF
            # 3.29.5 (contextRegion).
            snippet = (
                physical_location.get("contextRegion", {})
                .get("snippet", {})
                .get("text")
            )

    return snippet


Expand Down Expand Up @@ -264,16 +265,16 @@ def get_codeFlowsDescription(codeFlows):
return description


def get_description(result, rule):
def get_description(result, rule, location):
description = ""
message = ""
if "message" in result:
message = get_message_from_multiformatMessageString(
result["message"], rule,
)
description += f"**Result message:** {message}\n"
if get_snippet(result) is not None:
description += f"**Snippet:**\n```\n{get_snippet(result)}\n```\n"
if get_snippet(location) is not None:
description += f"**Snippet:**\n```\n{get_snippet(location)}\n```\n"
if rule is not None:
if "name" in rule:
description += f"**{_('Rule name')}:** {rule.get('name')}\n"
Expand Down Expand Up @@ -351,7 +352,7 @@ def get_severity(result, rule):
return "Medium"


def get_item(result, rules, artifacts, run_date):
def get_items_from_result(result, rules, artifacts, run_date):
# see
# https://docs.oasis-open.org/sarif/sarif/v2.1.0/csprd01/sarif-v2.1.0-csprd01.html
# / 3.27.9
Expand All @@ -366,100 +367,117 @@ def get_item(result, rules, artifacts, run_date):
if result.get("suppressions"):
suppressed = True

# if there is a location get it
file_path = None
line = None
# if there is a location get all files into files list

files = []

if "locations" in result:
location = result["locations"][0]
if "physicalLocation" in location:
file_path = location["physicalLocation"]["artifactLocation"]["uri"]

            # 'region' attribute is optional
if "region" in location["physicalLocation"]:
# https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html / 3.30.1
# need to check whether it is byteOffset
if "byteOffset" in location["physicalLocation"]["region"]:
pass
else:
line = location["physicalLocation"]["region"]["startLine"]

# test rule link
rule = rules.get(result.get("ruleId"))

finding = Finding(
title=get_title(result, rule),
severity=get_severity(result, rule),
description=get_description(result, rule),
static_finding=True, # by definition
dynamic_finding=False, # by definition
false_p=suppressed,
active=not suppressed,
file_path=file_path,
line=line,
references=get_references(rule),
)

if "ruleId" in result:
finding.vuln_id_from_tool = result["ruleId"]
# for now we only support when the id of the rule is a CVE
if cve_try(result["ruleId"]):
finding.unsaved_vulnerability_ids = [cve_try(result["ruleId"])]
        # sometimes the rule id is here but the tool doesn't define it
if rule is not None:
cwes_extracted = get_rule_cwes(rule)
if len(cwes_extracted) > 0:
finding.cwe = cwes_extracted[-1]

# Some tools such as GitHub or Grype return the severity in properties
# instead
if "properties" in rule and "security-severity" in rule["properties"]:
try:
cvss = float(rule["properties"]["security-severity"])
severity = cvss_to_severity(cvss)
finding.cvssv3_score = cvss
finding.severity = severity
except ValueError:
if rule["properties"]["security-severity"].lower().capitalize() in ["Info", "Low", "Medium", "High", "Critical"]:
finding.severity = rule["properties"]["security-severity"].lower().capitalize()
else:
finding.severity = "Info"

# manage the case that some tools produce CWE as properties of the result
cwes_properties_extracted = get_result_cwes_properties(result)
if len(cwes_properties_extracted) > 0:
finding.cwe = cwes_properties_extracted[-1]

# manage fixes provided in the report
if "fixes" in result:
finding.mitigation = "\n".join(
[fix.get("description", {}).get("text") for fix in result["fixes"]],
)
for location in result["locations"]:

file_path = None
line = None

if "physicalLocation" in location:
file_path = location["physicalLocation"]["artifactLocation"]["uri"]

            # 'region' attribute is optional
if "region" in location["physicalLocation"]:
# https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html / 3.30.1
# need to check whether it is byteOffset
if "byteOffset" in location["physicalLocation"]["region"]:
pass
else:
line = location["physicalLocation"]["region"]["startLine"]

if run_date:
finding.date = run_date

# manage tags provided in the report and rule and remove duplicated
tags = list(set(get_properties_tags(rule) + get_properties_tags(result)))
tags = [s.removeprefix("external/cwe/") for s in tags]
finding.tags = tags

# manage fingerprints
# fingerprinting in SARIF is more complete than in current implementation
# SARIF standard make it possible to have multiple version in the same report
# for now we just take the first one and keep the format to be able to
# compare it
if result.get("fingerprints"):
hashes = get_fingerprints_hashes(result["fingerprints"])
first_item = next(iter(hashes.items()))
finding.unique_id_from_tool = first_item[1]["value"]
elif result.get("partialFingerprints"):
# for this one we keep an order to have id that could be compared
hashes = get_fingerprints_hashes(result["partialFingerprints"])
sorted_hashes = sorted(hashes.keys())
finding.unique_id_from_tool = "|".join(
[f'{key}:{hashes[key]["value"]}' for key in sorted_hashes],
files.append((file_path, line, location))

if not files:
files.append((None, None, None))

result_items = []

for file_path, line, location in files:

# test rule link
rule = rules.get(result.get("ruleId"))

finding = Finding(
title=get_title(result, rule),
severity=get_severity(result, rule),
description=get_description(result, rule, location),
static_finding=True, # by definition
dynamic_finding=False, # by definition
false_p=suppressed,
active=not suppressed,
file_path=file_path,
line=line,
references=get_references(rule),
)
return finding

if "ruleId" in result:
finding.vuln_id_from_tool = result["ruleId"]
# for now we only support when the id of the rule is a CVE
if cve_try(result["ruleId"]):
finding.unsaved_vulnerability_ids = [cve_try(result["ruleId"])]
        # sometimes the rule id is here but the tool doesn't define it
if rule is not None:
cwes_extracted = get_rule_cwes(rule)
if len(cwes_extracted) > 0:
finding.cwe = cwes_extracted[-1]

# Some tools such as GitHub or Grype return the severity in properties
# instead
if "properties" in rule and "security-severity" in rule["properties"]:
try:
cvss = float(rule["properties"]["security-severity"])
severity = cvss_to_severity(cvss)
finding.cvssv3_score = cvss
finding.severity = severity
except ValueError:
if rule["properties"]["security-severity"].lower().capitalize() in ["Info", "Low", "Medium", "High", "Critical"]:
finding.severity = rule["properties"]["security-severity"].lower().capitalize()
else:
finding.severity = "Info"

# manage the case that some tools produce CWE as properties of the result
cwes_properties_extracted = get_result_cwes_properties(result)
if len(cwes_properties_extracted) > 0:
finding.cwe = cwes_properties_extracted[-1]

# manage fixes provided in the report
if "fixes" in result:
finding.mitigation = "\n".join(
[fix.get("description", {}).get("text") for fix in result["fixes"]],
)

if run_date:
finding.date = run_date

# manage tags provided in the report and rule and remove duplicated
tags = list(set(get_properties_tags(rule) + get_properties_tags(result)))
tags = [s.removeprefix("external/cwe/") for s in tags]
finding.tags = tags

# manage fingerprints
# fingerprinting in SARIF is more complete than in current implementation
# SARIF standard make it possible to have multiple version in the same report
# for now we just take the first one and keep the format to be able to
# compare it
if result.get("fingerprints"):
hashes = get_fingerprints_hashes(result["fingerprints"])
first_item = next(iter(hashes.items()))
finding.unique_id_from_tool = first_item[1]["value"]
elif result.get("partialFingerprints"):
# for this one we keep an order to have id that could be compared
hashes = get_fingerprints_hashes(result["partialFingerprints"])
sorted_hashes = sorted(hashes.keys())
finding.unique_id_from_tool = "|".join(
[f'{key}:{hashes[key]["value"]}' for key in sorted_hashes],
)

result_items.append(finding)

return result_items


def get_fingerprints_hashes(values):
Expand Down
4 changes: 2 additions & 2 deletions unittests/tools/test_sarif_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def test_njsscan(self):
with open(path.join(path.dirname(__file__), "../scans/sarif/njsscan.sarif"), encoding="utf-8") as testfile:
parser = SarifParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(2, len(findings))
self.assertEqual(3, len(findings))
# finding 0
finding = findings[0]
self.assertEqual(
Expand Down Expand Up @@ -313,7 +313,7 @@ def test_mobsfscan(self):
with open(path.join(path.dirname(__file__), "../scans/sarif/mobsfscan.json"), encoding="utf-8") as testfile:
parser = SarifParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(9, len(findings))
self.assertEqual(18, len(findings))
for finding in findings:
self.common_checks(finding)

Expand Down

0 comments on commit 06ff84f

Please sign in to comment.