Skip to content

Commit

Permalink
Merge branch 'parser_feature' of github.com:MISP/misp-stix into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Aug 24, 2023
2 parents 003126f + 262a537 commit 56179e0
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 118 deletions.
4 changes: 2 additions & 2 deletions misp_stix_converter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ def main():
print(f'{field.capitalize()} encountered during the '
f'{feature} conversion process:\n {messages}')
if 'fails' in traceback:
fails = '\n - '.join(traceback['fails'])
fails = _handle_return_message(traceback['fails'])
print('Failed parsing the following - and the related error '
f'message:\n - {fails}')
f'message:\n {fails}')
if 'results' in traceback:
results = traceback['results']
if isinstance(results, list):
Expand Down
6 changes: 6 additions & 0 deletions misp_stix_converter/misp_stix_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,12 @@ def _stix_to_misp(stix_args):
results[key].update(value)
continue
for field in ('errors', 'warnings'):
if field not in traceback:
continue
content = traceback[field]
if isinstance(content, list):
results['fails'][filename.name] = content
continue
for identifier, values in traceback[field].items():
results['fails'][identifier] = tuple(values)
if success:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,18 @@ class STIX2SampleObservableConverter(metaclass=ABCMeta):
def __init__(self, main: _MAIN_CONVERTER_TYPING):
self._main_converter = main

@property
def event_uuid(self):
return self.main_parser.misp_event.uuid

@property
def main_parser(self):
return self._main_converter.main_parser

def _parse_artifact_observable(
self, artifact_ref: str, malware: Malware) -> MISPObject:
observable = self.main_parser._observable[artifact_ref]
if observable['used']:
if observable['used'][self.event_uuid]:
return observable['misp_object']
artifact = observable['observable']
artifact_object = self._main_converter._create_misp_object_from_observable(
Expand All @@ -187,16 +191,17 @@ def _parse_artifact_observable(
self._maing_converter._populate_object_attributes(
artifact_object, mapping, getattr(artifact, field)
)
observable['used'][self.event_uuid] = True
misp_object = self._main_parser._add_misp_object(
artifact_object, artifact
)
observable.update({'used': True, 'misp_object': misp_object})
observable['misp_object'] = misp_object
return misp_object

def _parse_file_observable(
self, file_ref: str, malware: Malware) -> MISPObject:
observable = self.main_parser._observable[file_ref]
if observable['used']:
if observable['used'][self.event_uuid]:
return observable['misp_object']
_file = observable['observable']
file_object = self._main_converter._create_misp_object_from_observable(
Expand All @@ -214,14 +219,15 @@ def _parse_file_observable(
self._main_converter._populate_object_attributes(
file_object, mapping, getattr(_file, field)
)
observable['used'][self.event_uuid] = True
misp_object = self.main_parser._add_misp_object(file_object, _file)
observable.update({'used': True, 'misp_object': misp_object})
observable['misp_object'] = misp_object
return misp_object

def _parse_software_observable(
self, software_ref: str, malware: Malware) -> MISPObject:
observable = self.main_parser._observable[software_ref]
if observable['used']:
if observable['used'][self.event_uuid]:
return observable['misp_object']
software = observable['observable']
software_object = self._main_converter._create_misp_object_from_observable(
Expand All @@ -232,10 +238,11 @@ def _parse_software_observable(
self._main_converter._populate_object_attributes(
software_object, mapping, getattr(software, field)
)
observable['used'][self.event_uuid] = True
misp_object = self.main_parser._add_misp_object(
software_object, software
)
observable.update({'used': True, 'misp_object': misp_object})
observable['misp_object'] = misp_object
return misp_object


Expand Down
8 changes: 0 additions & 8 deletions misp_stix_converter/stix2misp/external_stix2_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ class ExternalSTIX2toMISPMapping(STIX2toMISPMapping):
)

# MISP OBJECTS MAPPING
__attack_pattern_object_mapping = Mapping(
name=STIX2toMISPMapping.name_attribute(),
description=STIX2toMISPMapping.summary_attribute()
)
__course_of_action_object_mapping = Mapping(
name=STIX2toMISPMapping.name_attribute(),
description=STIX2toMISPMapping.description_attribute()
Expand Down Expand Up @@ -514,10 +510,6 @@ class ExternalSTIX2toMISPMapping(STIX2toMISPMapping):
def asn_pattern_mapping(cls, field) -> Union[dict, None]:
return cls.__asn_pattern_mapping.get(field)

@classmethod
def attack_pattern_object_mapping(cls, field) -> Union[dict, None]:
return cls.__attack_pattern_object_mapping.get(field)

@classmethod
def course_of_action_object_mapping(cls) -> dict:
return cls.__course_of_action_object_mapping
Expand Down
43 changes: 40 additions & 3 deletions misp_stix_converter/stix2misp/external_stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .exceptions import (
InvalidSTIXPatternError, UnknownParsingFunctionError,
UnknownObservableMappingError, UnknownPatternMappingError,
UnknownPatternTypeError)
UnknownPatternTypeError, UnknownStixObjectTypeError)
from .external_stix2_mapping import ExternalSTIX2toMISPMapping
from .importparser import _INDICATOR_TYPING
from .converters import (
Expand All @@ -15,8 +15,8 @@
from .stix2_pattern_parser import STIX2PatternParser
from .stix2_to_misp import (
STIX2toMISPParser, _COURSE_OF_ACTION_TYPING, _GALAXY_OBJECTS_TYPING,
_IDENTITY_TYPING, _NETWORK_TRAFFIC_TYPING, _OBSERVED_DATA_TYPING,
_SDO_TYPING, _VULNERABILITY_TYPING)
_IDENTITY_TYPING, _NETWORK_TRAFFIC_TYPING, _OBSERVABLE_TYPING,
_OBSERVED_DATA_TYPING, _SDO_TYPING, _VULNERABILITY_TYPING)
from collections import defaultdict
from pymisp import MISPGalaxy, MISPGalaxyCluster, MISPObject
from stix2.v20.observables import (
Expand Down Expand Up @@ -144,6 +144,18 @@ def _set_malware_parser(self) -> ExternalSTIX2MalwareConverter:
self._malware_parser = ExternalSTIX2MalwareConverter(self)
return self._malware_parser

############################################################################
# STIX OBJECTS LOADING METHODS #
############################################################################

def _load_observable_object(self, observable: _OBSERVABLE_TYPING):
self._check_uuid(observable.id)
to_load = {'used': {}, 'observable': observable}
try:
self._observable[observable.id] = to_load
except AttributeError:
self._observable = {observable.id: to_load}

################################################################################
# MAIN STIX OBJECTS PARSING FUNCTIONS. #
################################################################################
Expand Down Expand Up @@ -214,6 +226,23 @@ def _handle_object_forcing(attributes: list, force_object: tuple) -> bool:
return True
return attributes[0]['object_relation'] in force_object

def _handle_object_refs(self, object_refs: list):
for object_ref in object_refs:
object_type = object_ref.split('--')[0]
if object_type in self._mapping.object_type_refs_to_skip():
continue
if object_type in self._mapping.observable_object_types():
if self._observable.get(object_ref) is not None:
observable = self._observable[object_ref]
if self.misp_event.uuid not in observable['used']:
observable['used'][self.misp_event.uuid] = False
try:
self._handle_object(object_type, object_ref)
except UnknownStixObjectTypeError as error:
self._unknown_stix_object_type_error(error)
except UnknownParsingFunctionError as error:
self._unknown_parsing_function_error(error)

def _handle_observables_mapping(self, observable_mapping: set) -> str:
"""
Simple Observable object types handling function.
Expand Down Expand Up @@ -458,6 +487,12 @@ def _parse_intrusion_set(self, intrusion_set_ref: str):
intrusion_set_ref
)

def _parse_loaded_features(self):
if hasattr(self, '_observable'):
for observable in self._observable.values():
observable['used'][self.misp_event.uuid] = False
super()._parse_loaded_features()

def _parse_location(self, location_ref: str):
"""
STIX 2.1 Location object parsing function. A geolocation MISP object is
Expand Down Expand Up @@ -768,11 +803,13 @@ def _create_attribute_from_single_observable_object(
def _fetch_observable_object_refs(
self, observed_data: _OBSERVED_DATA_TYPING):
for reference in observed_data.object_refs:
self._observable[reference]['used'][self.misp_event.uuid] = True
yield self._observable[reference]['observable']

def _fetch_observable_object_refs_with_id(
self, observed_data: _OBSERVED_DATA_TYPING):
for reference in observed_data.object_refs:
self._observable[reference]['used'][self.misp_event.uuid] = True
yield reference, self._observable[reference]['observable']

@staticmethod
Expand Down
55 changes: 29 additions & 26 deletions misp_stix_converter/stix2misp/importparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ def _sanitise_sharing_group_id(
self._sharing_group_id_error(error)
return None

################################################################################
# PROPERTIES #
################################################################################
############################################################################
# PROPERTIES #
############################################################################

@property
def distribution(self) -> int:
Expand Down Expand Up @@ -123,15 +123,16 @@ def synonyms_path(self) -> Path:
def warnings(self) -> defaultdict:
return self.__warnings

################################################################################
# ERRORS AND WARNINGS HANDLING FUNCTIONS #
################################################################################
############################################################################
# ERRORS AND WARNINGS HANDLING METHODS #
############################################################################

def _attack_pattern_error(self, attack_pattern_id: str, exception: Exception):
def _attack_pattern_error(
self, attack_pattern_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Attack Pattern object with id {attack_pattern_id}'
f': {tb}'
'Error parsing the Attack Pattern object with id '
f'{attack_pattern_id}: {tb}'
)

def _attribute_from_pattern_parsing_error(self, indicator_id: str):
Expand All @@ -142,7 +143,7 @@ def _attribute_from_pattern_parsing_error(self, indicator_id: str):
def _course_of_action_error(
self, course_of_action_id: str, exception: Exception):
self.__errors[self._identifier].add(
'Error with the Course of Action object with id'
'Error parsing the Course of Action object with id'
f'{course_of_action_id}: {self._parse_traceback(exception)}'
)

Expand Down Expand Up @@ -172,31 +173,31 @@ def _hash_type_error(self, hash_type: str):
def _identity_error(self, identity_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Identity object with id {identity_id}: {tb}'
f'Error parsing the Identity object with id {identity_id}: {tb}'
)

def _indicator_error(self, indicator_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Indicator object with id {indicator_id}: {tb}'
f'Error parsing the Indicator object with id {indicator_id}: {tb}'
)

def _intrusion_set_error(self, intrusion_set_id: str, exception: Exception):
self.__errors[self._identifier].add(
f'Error with the Intrusion Set object with id {intrusion_set_id}'
f'Error parsing the Intrusion Set object with id {intrusion_set_id}'
f': {self._parse_traceback(exception)}'
)

def _location_error(self, location_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Location object with id {location_id}: {tb}'
f'Error parsing the Location object with id {location_id}: {tb}'
)

def _malware_error(self, malware_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Malware object with id {malware_id}: {tb}'
f'Error parsing the Malware object with id {malware_id}: {tb}'
)

def _no_converted_content_from_pattern_warning(
Expand Down Expand Up @@ -226,7 +227,7 @@ def _observable_mapping_error(

def _observed_data_error(self, observed_data_id: str, exception: Exception):
self.__errors[self._identifier].add(
f'Error with the Observed Data object with id {observed_data_id}'
f'Error parsing the Observed Data object with id {observed_data_id}'
f': {self._parse_traceback(exception)}'
)

Expand All @@ -242,14 +243,14 @@ def _sharing_group_id_error(self, exception: Exception):

def _threat_actor_error(self, threat_actor_id: str, exception: Exception):
self.__errors[self._identifier].add(
f'Error with the Threat Actor object with id {threat_actor_id}'
f'Error parsing the Threat Actor object with id {threat_actor_id}'
f': {self._parse_traceback(exception)}'
)

def _tool_error(self, tool_id: str, exception: Exception):
tb = self._parse_traceback(exception)
self.__errors[self._identifier].add(
f'Error with the Tool object with id {tool_id}: {tb}'
f'Error parsing the Tool object with id {tool_id}: {tb}'
)

def _unable_to_load_stix_object_type_error(self, object_type: str):
Expand Down Expand Up @@ -322,13 +323,13 @@ def _unmapped_pattern_warning(self, indicator_id: str, feature: str):

def _vulnerability_error(self, vulnerability_id: str, exception: Exception):
self.__errors[self._identifier].add(
f'Error with the Vulnerability object with id {vulnerability_id}'
f'Error parsing the Vulnerability object with id {vulnerability_id}'
f': {self._parse_traceback(exception)}'
)

################################################################################
# SYNONYMS TO GALAXY TAG NAMES MAPPING HANDLING FUNCTIONS. #
################################################################################
############################################################################
# SYNONYMS TO GALAXY TAG NAMES MAPPING HANDLING METHODS. #
############################################################################

def __galaxies_up_to_date(self) -> bool:
fingerprint_path = _DATA_PATH / 'synonymsToTagNames.fingerprint'
Expand Down Expand Up @@ -389,14 +390,16 @@ def __get_synonyms_mapping(self):
with open(self.synonyms_path, 'rt', encoding='utf-8') as f:
self.__synonyms_mapping = json.loads(f.read())

################################################################################
# UUID SANITATION HANDLING FUNCTIONS #
################################################################################
############################################################################
# UUID SANITATION HANDLING METHODS #
############################################################################

def _check_uuid(self, object_id: str):
object_uuid = self._extract_uuid(object_id)
if UUID(object_uuid).version not in _RFC_VERSIONS and object_uuid not in self.replacement_uuids:
self.replacement_uuids[object_uuid] = self._create_v5_uuid(object_uuid)
self.replacement_uuids[object_uuid] = self._create_v5_uuid(
object_uuid
)

@staticmethod
def _create_v5_uuid(value: str) -> UUID:
Expand Down
Loading

0 comments on commit 56179e0

Please sign in to comment.