Skip to content

Commit

Permalink
Print IPM diagnostics to mci_ipm_to_csv if exception occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
adelosa committed Apr 5, 2024
1 parent d3871d0 commit 1fc7025
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 19 deletions.
22 changes: 21 additions & 1 deletion cardutil/cli/mci_ipm_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@
import logging

from cardutil.cli import add_version, get_config, print_banner, print_exception_details
from cardutil.mciipm import IpmReader, MciIpmDataError
from cardutil.mciipm import IpmReader, MciIpmDataError, ipm_info


def cli_entry():
return cli_run(**vars(cli_parser().parse_args()))


def print_check_details(in_ipm_info):
    """
    Display file diagnostics derived from an ipm_info result dict.

    :param in_ipm_info: dict produced by mciipm.ipm_info -- expects key
        ``isValidIPM``; ``reason`` when invalid; ``encoding`` and
        ``isBlocked`` when valid
    :return: None
    """
    print("IPM file diagnostics:")
    if in_ipm_info["isValidIPM"]:
        print("The file seems to be valid based on analysis of the file")
        print("The following parameters were detected")
        print(f"File encoding: {in_ipm_info['encoding']}")
        print(f"1014 blocking: {in_ipm_info['isBlocked']}")
    else:
        print("The file does not appear to be in the correct format")
        print(f"Reason: {in_ipm_info['reason']}")


def cli_run(**kwargs):

print_banner('mci_ipm_to_csv', kwargs)
Expand All @@ -23,12 +38,17 @@ def cli_run(**kwargs):
if not kwargs.get('out_filename'):
kwargs['out_filename'] = kwargs['in_filename'] + '.csv'

# check ipm details
with open(kwargs['in_filename'], 'rb') as in_ipm:
in_ipm_info = ipm_info(in_ipm)

try:
with open(kwargs['in_filename'], 'rb') as in_ipm:
with open(kwargs['out_filename'], 'w', encoding=kwargs.get('out_encoding')) as out_csv:
mci_ipm_to_csv(in_ipm=in_ipm, out_csv=out_csv, config=config, **kwargs)
except MciIpmDataError as err:
print_exception_details(err)
print_check_details(in_ipm_info)
return -1


Expand Down
5 changes: 3 additions & 2 deletions cardutil/iso8583.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ def _iso8583_to_dict(message, bit_config, encoding=DEFAULT_ENCODING, hex_bitmap=
# add the message type
try:
return_values["MTI"] = message_type_indicator.decode(encoding)
except UnicodeError as ex:
int(return_values["MTI"]) # check that mti is number
except (ValueError, UnicodeError) as ex:
raise Iso8583DataError('Failed decoding MTI field', binary_context_data=message, original_exception=ex)

message_pointer = 0
bitmap_list = _get_bitmap_list(binary_bitmap)

Expand Down Expand Up @@ -578,6 +578,7 @@ def _icc_to_dict(field_data):
break

field_length_raw = field_data[field_pointer:field_pointer+1]
LOGGER.debug(f"{field_length_raw=}")
field_length = struct.unpack(">B", field_length_raw)[0]

LOGGER.debug("%s", format(field_tag_display))
Expand Down
96 changes: 92 additions & 4 deletions cardutil/mciipm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
import struct
import typing

from cardutil import iso8583, config, CardutilError
from cardutil import iso8583, config, CardutilError, BitArray
from cardutil.vendor import hexdump

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -270,7 +271,7 @@ def __next__(self) -> bytes:
f' got {len(record_length_raw)} -- assuming end of data')
raise StopIteration

record_length = struct.unpack(">i", record_length_raw)[0]
record_length = struct.unpack(">I", record_length_raw)[0]
LOGGER.debug("record_length=%s", record_length)

# throw mcipm data error if length is negative or excessively large (indicates bad input)
Expand Down Expand Up @@ -483,7 +484,7 @@ def write(self, record: bytes) -> None:
# get the length of the record
record_length = len(record)
# convert length to binary
record_length_raw = struct.pack(">i", record_length)
record_length_raw = struct.pack(">I", record_length)
# add length to output data
self.out_file.write(record_length_raw)
# add data to output
Expand All @@ -506,7 +507,7 @@ def close(self) -> None:
:return: None
"""
# add zero length to end of record
self.out_file.write(struct.pack(">i", 0))
self.out_file.write(struct.pack(">I", 0))
self.out_file.seek(0)

def __enter__(self, *args, **kwargs):
Expand Down Expand Up @@ -658,6 +659,93 @@ def vbs_bytes_to_list(vbs_bytes: bytes, **kwargs) -> list:
return [record for record in VbsReader(file_in, **kwargs)]


def ipm_info(input_data: typing.BinaryIO) -> dict:
    """
    Use this function to inspect an IPM file and provide details.

    Reads (and consumes) up to the first 2500 bytes of the stream to
    perform the analysis -- the caller is responsible for rewinding or
    reopening the file before further processing.

    :param input_data: The file like object of IPM data
    :return: a dictionary containing file information::

        {
            "isValidIPM": True,
            "reason": "If not valid, describes the reason",
            "isBlocked": True,
            "encoding": "latin1",
        }

    ``reason`` is only set when the file is not valid; ``isBlocked`` and
    ``encoding`` are only set when it is.
    """
    output = {"isValidIPM": False}

    # get first 2500 bytes to perform analysis
    sample_data = input_data.read(2500)

    # need at least RDW(4) + MTI(4) + primary bitmap(16) = 24 bytes
    if len(sample_data) < 24:
        output["reason"] = "File does not have sufficient data to be valid"
        return output

    # check that the first 4 bytes contain a valid length
    # large lengths indicate file issues
    length_bytes = sample_data[:4]
    record_length = struct.unpack(">I", length_bytes)[0]
    if record_length > 1000:
        output["reason"] = (
            f"First IPM record has large record size ({record_length}) "
            f"which usually indicates a file issue")
        return output

    # check the bitmap to make sure it has a valid bit config
    bitmap_ok, reason = bitmap_check(sample_data[8:24])
    if not bitmap_ok:
        output["reason"] = reason
        return output

    output["isBlocked"] = block_1014_check(sample_data)
    output["encoding"] = encoding_check(sample_data[4:8])
    output["isValidIPM"] = True

    return output


def block_1014_check(sample_data):
    """
    Heuristically determine if the sample data is 1014 blocked.

    A 1014-blocked file is written in 1014 byte blocks, each ending with
    two pad characters, so pad bytes appear at offsets 1012-1013,
    2026-2027 and so on.

    :param sample_data: the first bytes of the file (up to 2500)
    :return: True if the data appears to be 1014 blocked
    """
    # Blocked files should be blocked out to 1014 at a minimum.
    if len(sample_data) < 1014:
        return False

    # first block must end with the two pad characters
    if sample_data[1012:1014] != Block1014.PAD_CHAR * 2:
        return False

    # exactly one block -- nothing more to check
    if len(sample_data) == 1014:
        return True

    # confirm using the second block boundary when available.
    # (previous logic only recognised files of exactly 2028 bytes; larger
    # blocked files also carry pad bytes at offsets 2026-2027 within the
    # 2500 byte sample)
    if len(sample_data) >= 2028:
        return sample_data[2026:2028] == Block1014.PAD_CHAR * 2

    # between 1014 and 2028 bytes: trailing data past the first block
    # without a second block boundary -- not blocked
    return False


def bitmap_check(bitmap: bytes) -> typing.Tuple[bool, typing.Optional[str]]:
    """
    Check that a primary bitmap only enables data elements used by IPM.

    :param bitmap: the 16 byte primary bitmap from the first record
    :return: tuple of (ok, reason) -- reason is None when the bitmap is ok
    """
    LOGGER.debug(hexdump.hexdump(bitmap, result='return'))
    bitarray = BitArray.BitArray()
    bitarray.frombytes(bitmap)
    bits = bitarray.tolist()
    for bit, bit_value in enumerate(bits):
        if bit == 0:  # bit 1 does not have config
            continue
        if bit_value:
            if str(bit + 1) not in config.config['bit_config']:
                return False, f"Bitmap uses DE{bit+1} which is not used in IPM"
    return True, None


def encoding_check(mti: bytes) -> str:
    """
    Guess the file encoding from the MTI bytes of the first record.

    Tries an ASCII-based codec first, then EBCDIC, accepting whichever
    decodes the MTI to numeric characters.
    This is a very basic encoding check.

    :param mti: the 4 MTI bytes from the first record
    :return: 'latin1', 'cp037' or 'unknown'
    """
    for codec in ('latin1', 'cp037'):
        if mti.decode(codec).isnumeric():
            return codec
    return 'unknown'


if __name__ == '__main__':
    # run the module doctests when executed directly
    import doctest
    doctest.testmod()
8 changes: 6 additions & 2 deletions tests/cli/test_mci_ipm_encode.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ def test_mci_ipm_encode_cli_parser(self):
def test_mci_ipm_encode_input_params(self):
"""
Run mci_ipm_encode using real files
Default input encoding is cp500
"""
# create an ipm file
with tempfile.NamedTemporaryFile(mode='wb', delete=False) as out_ipm:
out_ipm.write(
b'\x00\x00\x00\x1a0100\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'n\x9cm\x9cl\x9c\x00\x00\x00\x00')
b'\x00\x00\x00\x1a'
b'\xf0\xf1\xf0\xf0' # mti(4)
b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # bitmap(16)
b'\xf0\xf1\xf0\xf0\xf0\xf0' # data(6)
)
in_ipm_name = out_ipm.name
out_ipm.close()

Expand Down
71 changes: 63 additions & 8 deletions tests/cli/test_mci_ipm_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,39 @@ def test_ipm_to_csv_input_params(self):
in_ipm_name = in_ipm.name
print(in_ipm_name)
in_ipm.close()
mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_filename=in_ipm_name + '.csv', out_encoding='latin_1')

result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
self.assertFalse(result)

result = mci_ipm_to_csv.cli_run(
in_filename=in_ipm_name, out_filename=in_ipm_name + '.csv', out_encoding='latin_1')
self.assertFalse(result)

# run with config file
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as in_config:
config_filename = in_config.name
in_config.write(CONFIG_DATA)
in_config.close()
mci_ipm_to_csv.cli_run(
result = mci_ipm_to_csv.cli_run(
in_filename=in_ipm_name,
out_filename=in_ipm_name + '.csv',
config_file=in_config.name,
out_encoding='latin_1',
debug=True
)
os.remove(config_filename)

self.assertFalse(result)

with open(in_ipm_name + '.csv', 'r') as csv_data:
csv_output = csv_data.read()

self.assertEqual(csv_output, "MTI,DE38\n0100,nXmXlX\n")

os.remove(in_ipm_name)
os.remove(in_ipm_name + '.csv')

def test_ipm_to_csv_generate_exception(self):
def test_ipm_to_csv_exception_max_reclen(self):
"""
Actually run using real files, and exception generated
Triggered through negative RDW on second record -- invalid record length
Expand All @@ -92,7 +101,54 @@ def test_ipm_to_csv_generate_exception(self):
:return:
"""
in_ipm_data = (b'\x00\x00\x00\x1a0100\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'nXmXlX\xFF\xFF\x00\x00')
b'nXmXlX\xFF\xFF\xFF\xFF')

with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm:
in_ipm.write(in_ipm_data)
in_ipm_name = in_ipm.name
print(in_ipm_name)
in_ipm.close()

f = io.StringIO()
with contextlib.redirect_stdout(f):
result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
output = f.getvalue() #.splitlines()
os.remove(in_ipm_name)
os.remove(in_ipm_name + '.csv')
print(output)
self.assertEqual(-1, result)
assert output.splitlines()[4] == '*** ERROR - processing has stopped ***'

def test_ipm_to_csv_exception_bad_encoding(self):
in_ipm_data = (b'\x00\x00\x00\x1a' # reclen
b'\xf0\xf1\xf0\xf0' # mti (cp037)
b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'nXmXlX\xFF\xFF\x00\x00'
)

with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm:
in_ipm.write(in_ipm_data)
in_ipm_name = in_ipm.name
print(in_ipm_name)
in_ipm.close()

f = io.StringIO()
with contextlib.redirect_stdout(f):
result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
output = f.getvalue().splitlines()
os.remove(in_ipm_name)
os.remove(in_ipm_name + '.csv')
print(output)
self.assertEqual(-1, result)

def test_ipm_to_csv_exception_reclen_over_3000_bytes(self):
"""
Check that diagnostics shows that file is invalid and reason
"""
in_ipm_data = (b'\x00\x00\x0b\xb9' # reclen
b'0100' # mti
b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'nXmXlX') # data

with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm:
in_ipm.write(in_ipm_data)
Expand All @@ -102,13 +158,12 @@ def test_ipm_to_csv_generate_exception(self):

f = io.StringIO()
with contextlib.redirect_stdout(f):
mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii')
output = f.getvalue().splitlines()
os.remove(in_ipm_name)
os.remove(in_ipm_name + '.csv')
self.assertEqual(-1, result)
print(output)
assert len(output) == 8
assert output[4] == '*** ERROR - processing has stopped ***'


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 1fc7025

Please sign in to comment.