From 1fc70259f21e83a200f408785b6dcc84087f65ad Mon Sep 17 00:00:00 2001 From: Anthony Delosa Date: Fri, 5 Apr 2024 18:10:01 +1000 Subject: [PATCH] Print IPM diagnostics to mci_ipm_to_csv if exception occurs --- cardutil/cli/mci_ipm_to_csv.py | 22 ++++- cardutil/iso8583.py | 5 +- cardutil/mciipm.py | 96 +++++++++++++++++++++- tests/cli/test_mci_ipm_encode.py | 8 +- tests/cli/test_mci_ipm_to_csv.py | 71 ++++++++++++++-- tests/test_mciipm.py | 134 ++++++++++++++++++++++++++++++- 6 files changed, 317 insertions(+), 19 deletions(-) diff --git a/cardutil/cli/mci_ipm_to_csv.py b/cardutil/cli/mci_ipm_to_csv.py index 8005278..0ff6c36 100644 --- a/cardutil/cli/mci_ipm_to_csv.py +++ b/cardutil/cli/mci_ipm_to_csv.py @@ -4,13 +4,28 @@ import logging from cardutil.cli import add_version, get_config, print_banner, print_exception_details -from cardutil.mciipm import IpmReader, MciIpmDataError +from cardutil.mciipm import IpmReader, MciIpmDataError, ipm_info def cli_entry(): return cli_run(**vars(cli_parser().parse_args())) +def print_check_details(in_ipm_info): + """ + Print diagnostic information based on ipm_info + """ + print("IPM file diagnostics:") + if not in_ipm_info["isValidIPM"]: + print("The file does not appear to be in the correct format") + print(f"Reason: {in_ipm_info['reason']}") + return + print("The file seems to be valid based on analysis of the file") + print("The following parameters were detected") + print(f"File encoding: {in_ipm_info['encoding']}") + print(f"1014 blocking: {in_ipm_info['isBlocked']}") + + def cli_run(**kwargs): print_banner('mci_ipm_to_csv', kwargs) @@ -23,12 +38,17 @@ def cli_run(**kwargs): if not kwargs.get('out_filename'): kwargs['out_filename'] = kwargs['in_filename'] + '.csv' + # check ipm details + with open(kwargs['in_filename'], 'rb') as in_ipm: + in_ipm_info = ipm_info(in_ipm) + try: with open(kwargs['in_filename'], 'rb') as in_ipm: with open(kwargs['out_filename'], 'w', encoding=kwargs.get('out_encoding')) as out_csv: mci_ipm_to_csv(in_ipm=in_ipm, out_csv=out_csv, config=config, **kwargs) except MciIpmDataError as err: print_exception_details(err) + print_check_details(in_ipm_info) return -1 diff --git a/cardutil/iso8583.py b/cardutil/iso8583.py index cb1388d..e9d43fd 100644 --- a/cardutil/iso8583.py +++ b/cardutil/iso8583.py @@ -179,9 +179,9 @@ def _iso8583_to_dict(message, bit_config, encoding=DEFAULT_ENCODING, hex_bitmap= # add the message type try: return_values["MTI"] = message_type_indicator.decode(encoding) - except UnicodeError as ex: + int(return_values["MTI"]) # check that mti is number + except (ValueError, UnicodeError) as ex: raise Iso8583DataError('Failed decoding MTI field', binary_context_data=message, original_exception=ex) - message_pointer = 0 bitmap_list = _get_bitmap_list(binary_bitmap) @@ -578,6 +578,7 @@ def _icc_to_dict(field_data): break field_length_raw = field_data[field_pointer:field_pointer+1] + LOGGER.debug(f"{field_length_raw=}") field_length = struct.unpack(">B", field_length_raw)[0] LOGGER.debug("%s", format(field_tag_display)) diff --git a/cardutil/mciipm.py b/cardutil/mciipm.py index b24f16c..801be16 100644 --- a/cardutil/mciipm.py +++ b/cardutil/mciipm.py @@ -93,7 +93,8 @@ import struct import typing -from cardutil import iso8583, config, CardutilError +from cardutil import iso8583, config, CardutilError, BitArray +from cardutil.vendor import hexdump LOGGER = logging.getLogger(__name__) @@ -270,7 +271,7 @@ def __next__(self) -> bytes: f' got {len(record_length_raw)} -- assuming end of data') raise StopIteration - record_length = struct.unpack(">i", record_length_raw)[0] + record_length = struct.unpack(">I", record_length_raw)[0] LOGGER.debug("record_length=%s", record_length) # throw mcipm data error if length is negative or excessively large (indicates bad input) @@ -483,7 +484,7 @@ def write(self, record: bytes) -> None: # get the length of the record record_length = len(record) # convert length to binary - record_length_raw = struct.pack(">i", record_length) + record_length_raw = struct.pack(">I", record_length) # add length to output data self.out_file.write(record_length_raw) # add data to output @@ -506,7 +507,7 @@ def close(self) -> None: :return: None """ # add zero length to end of record - self.out_file.write(struct.pack(">i", 0)) + self.out_file.write(struct.pack(">I", 0)) self.out_file.seek(0) def __enter__(self, *args, **kwargs): @@ -658,6 +659,93 @@ def vbs_bytes_to_list(vbs_bytes: bytes, **kwargs) -> list: return [record for record in VbsReader(file_in, **kwargs)] +def ipm_info(input_data: typing.BinaryIO) -> dict: + """ + Use this function to inspect an IPM file and provide details + :param input_data: The file like object of IPM data + :return: a dictionary containing file information + { + "isValidIPM": True, + "reason": "If not valid, describes the reason" + "isBlocked": True, + "encoding": "latin1", + } + """ + output = {"isValidIPM": False} + + # get first 2500 bytes to perform analysis + sample_data = input_data.read(2500) + + # if data less than 20 bytes then can't be valid + if len(sample_data) < 24: + output["reason"] = "File does not have sufficient data to be valid" + return output + + # check that the first 4 bytes contain a valid length + # large lengths indicate file issues + length_bytes = sample_data[:4] + record_length = struct.unpack(">I", length_bytes)[0] + if record_length > 1000: + output["reason"] = f"First IPM record has large record size ({record_length}) which usually indicates a file issue" + return output + + # check the bitmap to make sure it has a valid bit config + bitmap_ok, reason = bitmap_check(sample_data[8:24]) + if not bitmap_ok: + output["reason"] = reason + return output + + output["isBlocked"] = block_1014_check(sample_data) + output["encoding"] = encoding_check(sample_data[4:8]) + output["isValidIPM"] = True + + return output + + +def block_1014_check(sample_data): + # Blocked files should be blocked out to 1014 at a minimum. + # Going to work with first 1014 bytes of the file + if len(sample_data) < 1014: + return False + + # if the last two bytes of stream is x40x40, the probably blocked. + # go and get the next 2 just to be sure + first_1014 = sample_data[0:1014] + if first_1014[-2:] == Block1014.PAD_CHAR * 2: + if len(sample_data) == 1014: + return True + if len(sample_data) == 2028 and sample_data[-2:] == Block1014.PAD_CHAR * 2: + return True + return False + + +def bitmap_check(bitmap: bytes) -> (bool, str): + LOGGER.debug(hexdump.hexdump(bitmap,result='return')) + bitarray = BitArray.BitArray() + bitarray.frombytes(bitmap) + bits = bitarray.tolist() + for bit, bit_value in enumerate(bits): + if bit == 0: # bit 1 does not have config + continue + if bit_value: + if str(bit+1) not in config.config['bit_config']: + return False, f"Bitmap uses DE{bit+1} which is not used in IPM" + return True, None + + +def encoding_check(mti: bytes) -> str: + """ + This function will check if an MTI in record looks + like ASCII based encoding or EBCDIC encoding. + This is a very basic encoding check. + """ + if mti.decode('latin1').isnumeric(): + return 'latin1' + if mti.decode('cp037').isnumeric(): + return 'cp037' + return 'unknown' + + if __name__ == '__main__': import doctest doctest.testmod() diff --git a/tests/cli/test_mci_ipm_encode.py b/tests/cli/test_mci_ipm_encode.py index ad3d6f5..31fbc2d 100644 --- a/tests/cli/test_mci_ipm_encode.py +++ b/tests/cli/test_mci_ipm_encode.py @@ -42,12 +42,16 @@ def test_mci_ipm_encode_cli_parser(self): def test_mci_ipm_encode_input_params(self): """ Run mci_ipm_encode using real files + Default input encoding is cp500 """ # create an ipm file with tempfile.NamedTemporaryFile(mode='wb', delete=False) as out_ipm: out_ipm.write( - b'\x00\x00\x00\x1a0100\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - b'n\x9cm\x9cl\x9c\x00\x00\x00\x00') + b'\x00\x00\x00\x1a' + b'\xf0\xf1\xf0\xf0' # mti(4) + b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' # bitmap(16) + b'\xf0\xf1\xf0\xf0\xf0\xf0' # data(6) + ) in_ipm_name = out_ipm.name out_ipm.close() diff --git a/tests/cli/test_mci_ipm_to_csv.py b/tests/cli/test_mci_ipm_to_csv.py index 0d525a0..e0bda92 100644 --- a/tests/cli/test_mci_ipm_to_csv.py +++ b/tests/cli/test_mci_ipm_to_csv.py @@ -60,15 +60,20 @@ def test_ipm_to_csv_input_params(self): in_ipm_name = in_ipm.name print(in_ipm_name) in_ipm.close() - mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') - mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_filename=in_ipm_name + '.csv', out_encoding='latin_1') + + result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') + self.assertFalse(result) + + result = mci_ipm_to_csv.cli_run( + in_filename=in_ipm_name, out_filename=in_ipm_name + '.csv', out_encoding='latin_1') + self.assertFalse(result) # run with config file with tempfile.NamedTemporaryFile(mode='w+', delete=False) as in_config: config_filename = in_config.name in_config.write(CONFIG_DATA) in_config.close() - mci_ipm_to_csv.cli_run( + result = mci_ipm_to_csv.cli_run( in_filename=in_ipm_name, out_filename=in_ipm_name + '.csv', config_file=in_config.name, @@ -76,14 +81,18 @@ def test_ipm_to_csv_input_params(self): debug=True ) os.remove(config_filename) + + self.assertFalse(result) + with open(in_ipm_name + '.csv', 'r') as csv_data: csv_output = csv_data.read() + self.assertEqual(csv_output, "MTI,DE38\n0100,nXmXlX\n") os.remove(in_ipm_name) os.remove(in_ipm_name + '.csv') - def test_ipm_to_csv_generate_exception(self): + def test_ipm_to_csv_exception_max_reclen(self): """ Actually run using real files, and exception generated Triggered through negative RDW on second record -- invalid record length @@ -92,7 +101,54 @@ def test_ipm_to_csv_generate_exception(self): :return: """ in_ipm_data = (b'\x00\x00\x00\x1a0100\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - b'nXmXlX\xFF\xFF\x00\x00') + b'nXmXlX\xFF\xFF\xFF\xFF') + + with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm: + in_ipm.write(in_ipm_data) + in_ipm_name = in_ipm.name + print(in_ipm_name) + in_ipm.close() + + f = io.StringIO() + with contextlib.redirect_stdout(f): + result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') + output = f.getvalue() #.splitlines() + os.remove(in_ipm_name) + os.remove(in_ipm_name + '.csv') + print(output) + self.assertEqual(-1, result) + assert output.splitlines()[4] == '*** ERROR - processing has stopped ***' + + def test_ipm_to_csv_exception_bad_encoding(self): + in_ipm_data = (b'\x00\x00\x00\x1a' # reclen + b'\xf0\xf1\xf0\xf0' # mti (cp037) + b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'nXmXlX\xFF\xFF\x00\x00' + ) + + with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm: + in_ipm.write(in_ipm_data) + in_ipm_name = in_ipm.name + print(in_ipm_name) + in_ipm.close() + + f = io.StringIO() + with contextlib.redirect_stdout(f): + result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') + output = f.getvalue().splitlines() + os.remove(in_ipm_name) + os.remove(in_ipm_name + '.csv') + print(output) + self.assertEqual(-1, result) + + def test_ipm_to_csv_exception_reclen_over_3000_bytes(self): + """ + Check that diagnostics shows that file is invalid and reason + """ + in_ipm_data = (b'\x00\x00\x0b\xb9' # reclen + b'0100' # mti + b'\x80\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'nXmXlX') # data with tempfile.NamedTemporaryFile(mode='w+b', delete=False) as in_ipm: in_ipm.write(in_ipm_data) @@ -102,13 +158,12 @@ def test_ipm_to_csv_generate_exception(self): f = io.StringIO() with contextlib.redirect_stdout(f): - mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') + result = mci_ipm_to_csv.cli_run(in_filename=in_ipm_name, out_encoding='ascii') output = f.getvalue().splitlines() os.remove(in_ipm_name) os.remove(in_ipm_name + '.csv') + self.assertEqual(-1, result) print(output) - assert len(output) == 8 - assert output[4] == '*** ERROR - processing has stopped ***' if __name__ == '__main__': diff --git a/tests/test_mciipm.py b/tests/test_mciipm.py index cef7af9..fa3e6db 100644 --- a/tests/test_mciipm.py +++ b/tests/test_mciipm.py @@ -4,8 +4,7 @@ from cardutil import CardutilError from cardutil.mciipm import ( VbsWriter, VbsReader, IpmReader, IpmWriter, Block1014, Unblock1014, block_1014, unblock_1014, vbs_list_to_bytes, - vbs_bytes_to_list, IpmParamReader, MciIpmDataError) - + vbs_bytes_to_list, IpmParamReader, MciIpmDataError, ipm_info) from tests import message_ascii_raw, message_ebcdic_raw, print_stream @@ -342,5 +341,136 @@ def test_ipm_param_reader(self): print(record) +class MciIpmInfoTestCase(unittest.TestCase): + def test_ipm_info_filelength_less_than_24(self): + # shortest valid file + data = io.BytesIO( + b'\x00\x00\x00\x18' # record length(4) + b'1234' + # mti(4) + b'\x00' * 15 # bitmap(16) -1 so invalid + ) + info = ipm_info(data) + print(info) + assert info["isValidIPM"] is False + self.assertEqual('File does not have sufficient data to be valid', info['reason']) + + def test_ipm_info_first_record_length_too_long(self): + data = io.BytesIO(b'\x00x\x00\x03\x39' + (b' ' * 20)) + info = ipm_info(data) + print(info) + assert info["isValidIPM"] is False + self.assertEqual( + 'First IPM record has large record size (7864323) which usually indicates a file issue', + info['reason']) + + def test_ipm_info_bad_bitmap(self): + # bit 7 is not defined for Mastercard files. + # if present, would indicate a bad bitmap and file issue + data = io.BytesIO( + b'\x00\x00\x00\xFF' + # record length + b'1234' + # mti(4) + b'\x02' + (b'\x00' * 15)) # bitmap - invalid + info = ipm_info(data) + print(info) + assert info["isValidIPM"] is False + + def test_ipm_info_vbs_1013(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # Length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) - valid + (b' ' * 989) # data(989) + ) + info = ipm_info(data) + print(info) + assert info["isBlocked"] is False + + def test_ipm_info_blocked_1014(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) - valid + (b' ' * 988) + # data(988) + b'\x40\x40' # block(2) + ) + info = ipm_info(data) + print(info) + assert info["isBlocked"] is True + + def test_ipm_info_vbs_1014(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # Length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) - valid + (b' ' * 992) # data(990) - no marker + ) + + info = ipm_info(data) + print(info) + assert info["isBlocked"] is False + + def test_ipm_info_blocked_2028(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) + (b' ' * 988) + # data(988) + b'\x40\x40' + # block(2) + (b' ' * 1012) + b'\x40\x40' # record 2 with marker + ) + # data = io.BytesIO(b'\x00\x00\x00\xff' + (b' ' * 1008) + b'\x40\x40' + (b' ' * 1012) + b'\x40\x40') + info = ipm_info(data) + print(info) + assert info["isBlocked"] is True + + def test_ipm_info_vbs_2028(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) + (b' ' * 990) + # data(988) + b'\x40\x40' + # block(2) + (b' ' * 1014) # record 2, no marker + ) + info = ipm_info(data) + print(info) + assert info["isBlocked"] is False + + def test_ipm_info_vbs_2027(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # length(4) + b'1234' + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) + (b' ' * 988) + # data(988) + b'\x40\x40' + # block(2) + (b' ' * 1013) # record 2 len 1013 + ) + info = ipm_info(data) + print(info) + assert info["isBlocked"] is False + + def test_ipm_info_ebcdic_encode(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # Length(4) + '1234'.encode('cp037') + # mti(4) + b'\x70' + (b'\x00' * 15) + # Bitmap(16) - valid + (b' ' * 992) # data(992) - no marker + ) + info = ipm_info(data) + print(info) + assert info["encoding"] == 'cp037' + + def test_ipm_info_unknown_encode(self): + data = io.BytesIO( + b'\x00\x00\x00\xff' + # Length(4) + 'XXXX'.encode('cp037') + # mti(4) -- force encode issue + b'\x70' + (b'\x00' * 15) + # Bitmap(16) - valid + (b' ' * 992) # data(990) - no marker + ) + info = ipm_info(data) + print(info) + assert info["encoding"] == 'unknown' + + if __name__ == '__main__': unittest.main()