Skip to content

Commit

Permalink
Partial implementation of support for DER (#3374)
Browse files Browse the repository at this point in the history
  • Loading branch information
ESultanik committed Jul 6, 2022
1 parent ab6ea03 commit fe4ad01
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 26 deletions.
190 changes: 180 additions & 10 deletions polyfile/der.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from io import BytesIO
from pathlib import Path
from typing import IO, Optional, Type, TypeVar, Union
from typing import Dict, IO, Optional, Type, TypeVar, Union

from .kaitai.parser import KaitaiParser
from .kaitai.parsers.asn1_der import Asn1Der
Expand All @@ -15,16 +16,185 @@ def __init__(self, der: Asn1Der, parent: Optional["DERQuery"] = None):
self.der: Asn1Der = der # type: ignore
self.parent = parent

@property
def type_value(self) -> int:
if isinstance(self.der.type_tag, int):
return self.der.type_tag
else:
return self.der.type_tag.value

@property
def is_seq(self) -> bool:
return self.der.type_tag.value == 0x10 or self.der.type_tag.value == 0x30
return self.type_value == 0x10 or self.type_value == 0x30

@property
def is_set(self) -> bool:
return self.type_value == 0x31

@classmethod
def parse(cls: Type[C], data: Union[bytes, str, Path, BytesIO]) -> C:
ast = DER_PARSER.parse(data).ast
if isinstance(ast.obj, DER_PARSER.struct_type):
return DERQuery(ast.obj)
else:
raise ValueError("Error parsing DER")


T = TypeVar("T", bound="Tag")


class Tag(ABC):
TAGS: Dict[str, Type["Tag"]] = {}
name: str

def __init_subclass__(cls, **kwargs):
if not hasattr(cls, "name") or cls.name is None:
raise TypeError(f"DERTag subclass {cls.__name__} must define the `name` member variable")
elif cls.name in Tag.TAGS:
raise TypeError(f"DERTag name {cls.name} for {cls.__name__} is already assigned to "
f"{TAGS[cls.name].__name__}")
Tag.TAGS[cls.name] = cls

@classmethod
def parse(cls: Type[C], data: Union[bytes, str, Path, BytesIO]) -> Optional[C]:
try:
ast = DER_PARSER.parse(data).ast
if isinstance(ast.obj, DER_PARSER.struct_type):
return DERQuery(ast.obj)
except EOFError:
pass
return None
@abstractmethod
def with_value(cls: Type[T], value: str = "", number_modifier: Optional[int] = None) -> T:
raise NotImplementedError()

@abstractmethod
def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()

@staticmethod
def parse(tag: str) -> "Tag":
original_tag = tag
equals_index = tag.find("=")
if equals_index > 0:
value = tag[equals_index+1:]
tag = tag[:equals_index]
else:
value = ""
if tag in Tag.TAGS:
return Tag.TAGS[tag].with_value(value)
# Does it have a number modifier?
number_modifier = ""
while tag and tag[-1].isnumeric():
number_modifier = f"{tag[-1]}{number_modifier}"
tag = tag[:-1]
if tag in Tag.TAGS:
return Tag.TAGS[tag].with_value(value, number_modifier=int(number_modifier))
if original_tag != tag:
raise ValueError(f"Unknown DER tag {tag!r} in {original_tag!r}")
else:
raise ValueError(f"Unknown DER tag {tag!r}")

class BareTagMixin:
@classmethod
def with_value(cls: Type[T], value: str = "", number_modifier: Optional[int] = None) -> T:
if value:
raise ValueError(f"{cls.name} is not expected to have a value")
elif number_modifier:
raise ValueError(f"{cls.name} is not expected to have a number modifier")
return cls()


class Seq(BareTagMixin, Tag):
name = "seq"

def test(self, query: DERQuery) -> DERQuery:
if not query.is_seq:
raise ValueError(f"{query.der.type_tag!r} is not a sequence type")
return DERQuery(query.der.body, parent=query)


class Set(BareTagMixin, Tag):
name = "set"

def test(self, query: DERQuery) -> DERQuery:
if not query.is_set:
raise ValueError(f"{query.der.type_tag!r} is not a set type")
return DERQuery(query.der.body, parent=query)


class Int(Tag):
name = "int"

def __init__(self, value: Optional[int], number_modifier: int):
self.value: Optional[int] = value
self.number_modifier: int = number_modifier

@property
def is_wildcard(self) -> bool:
return self.value is None

@classmethod
def with_value(cls: Type[T], value: str = "", number_modifier: Optional[int] = None) -> T:
if value.lower() == "x":
# wildcard
obj_value: Optional[int] = None
else:
obj_value = int(value, 16)
if number_modifier is None:
raise ValueError(f"{cls.name} requires a number modifier")
return cls(value, number_modifier)

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class ObjId(Int):
name = "obj_id"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class WildcardOnlyMixin:
@classmethod
def with_value(cls: Type[T], value: str = "", number_modifier: Optional[int] = None) -> T:
if value and value.lower() != "x":
raise ValueError(f"invalid value for {cls.name}: {value!r}")
if number_modifier is not None:
raise ValueError(f"{cls.name} does not support a number modifier")
return cls()


class PrtStr(WildcardOnlyMixin, Tag):
name = "prt_str"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class UTF8Str(WildcardOnlyMixin, Tag):
name = "utf8_str"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class IA5Str(WildcardOnlyMixin, Tag):
name = "ia5_str"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class Null(BareTagMixin, Tag):
name = "null"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class UTCTime(WildcardOnlyMixin, Tag):
name = "utc_time"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()


class EOC(BareTagMixin, Tag):
name = "eoc"

def test(self, query: DERQuery) -> DERQuery:
raise NotImplementedError()
1 change: 1 addition & 0 deletions polyfile/kaitaimatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"application/x-python-code": "executable/python_pyc_27.ksy",
"application/x-shockwave-flash": "executable/swf.ksy",
"application/x-doom": "game/doom_wad.ksy",
"application/x-509": "serialization/asn1/asn1_der.ksy",
"image/x-dcx": "image/pcx_dcx.ksy",
"model/gltf-binary": "3d/gltf_binary.ksy",
"application/x-rpm": "archive/rpm.ksy",
Expand Down
44 changes: 28 additions & 16 deletions polyfile/magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from chardet.universaldetector import UniversalDetector

from .arithmetic import CStyleInt, make_c_style_int
from .der import DERQuery
from .der import DERQuery, Tag
from .iterators import LazyIterableSet
from .logger import getStatusLogger, TRACE

Expand Down Expand Up @@ -2053,7 +2053,7 @@ class DERTest(MagicTest):
def __init__(
self,
offset: Offset,
test_str: str,
tag: Tag,
mime: Optional[Union[str, TernaryExecutableMessage]] = None,
extensions: Iterable[str] = (),
message: Union[str, Message] = "",
Expand All @@ -2064,26 +2064,35 @@ def __init__(
mime = "application/x-509"
super().__init__(offset=offset, mime=mime, extensions=(".cer", ".crt"), message=message, parent=parent,
comments=comments)
self.test_str: str = test_str
self.tag: Tag = tag

def test(self, data: bytes, absolute_offset: int, parent_match: Optional[TestResult]) -> TestResult:
if parent_match is not None:
if not isinstance(parent_match, MatchedTest):
raise ValueError("A DERTest can only be run if its parent was a match!")
elif not isinstance(parent_match.value, DERQuery):
raise ValueError("A DERTest's parent match must be an instance of DERQuery!")
parent_query: DERQuery = parent_match.value
query: DERQuery = parent_match.value
else:
query = DERQuery.parse(data[absolute_offset:])
if query is not None:
return MatchedTest(self, query, absolute_offset, len(data) - absolute_offset, None)
# TODO:
return FailedTest(
self,
offset=absolute_offset,
parent=parent_match,
message=f"expected TODO!"
)
try:
query = DERQuery.parse(data[absolute_offset:])
except (EOFError, ValueError, Exception) as e:
return FailedTest(
self,
offset=absolute_offset,
parent=parent_match,
message=str(e)
)
try:
child_query = self.tag.test(query)
except ValueError as e:
return FailedTest(
self,
offset=absolute_offset,
parent=parent_match,
message=f"{self.tag!s}: {e!s}"
)
return MatchedTest(self, child_query, absolute_offset, len(data) - absolute_offset, parent_match)


class PlainTextTest(MagicTest):
Expand Down Expand Up @@ -2468,8 +2477,11 @@ def __init__(self):
late_binding=late_binding
)
elif data_type == "der":
# TODO: Update this as necessary once we fully implement the DERTest
test = DERTest(offset=offset, test_str=test_str, message=message, parent=parent)
try:
der_tag = Tag.parse(test_str)
except ValueError as e:
raise ValueError(f"{def_file!s} line {line_number}: {e!s}")
test = DERTest(offset=offset, tag=der_tag, message=message, parent=parent)
else:
try:
data_type = DataType.parse(data_type)
Expand Down

0 comments on commit fe4ad01

Please sign in to comment.