From fa126ae082f82fcf4df28b5efcbf635a3700e3d5 Mon Sep 17 00:00:00 2001 From: "ADHSoft (pc)" <15692732+ADHSoft@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:02:35 -0300 Subject: [PATCH] 0.95 --- bcolors.py | 13 ++ functions.py | 23 ++- keyFiles/disclaimer.txt | 2 +- main.py | 377 ++++++++++++++++++++++++---------------- myBytes.py | 13 +- my_lzo_dll/MakefileA | 3 + requirements.txt | 3 +- xipDecoder_strategy.py | 47 +++-- 8 files changed, 296 insertions(+), 185 deletions(-) create mode 100644 bcolors.py create mode 100644 my_lzo_dll/MakefileA diff --git a/bcolors.py b/bcolors.py new file mode 100644 index 0000000..5d7d7dc --- /dev/null +++ b/bcolors.py @@ -0,0 +1,13 @@ +HEADER = '\033[95m' +OKBLUE = '\033[94m' +OKCYAN = '\033[96m' +OKGREEN = '\033[92m' +WARNING = '\033[93m' +FAIL = '\033[91m' +ENDC = '\033[0m' +BOLD = '\033[1m' +UNDERLINE = '\033[4m' + +ERR = FAIL +WARN = WARNING +END = ENDC \ No newline at end of file diff --git a/functions.py b/functions.py index 5cb15d6..aac7923 100644 --- a/functions.py +++ b/functions.py @@ -1,12 +1,13 @@ from typing import List + +import numba import xipDecoder_strategy -def checksumA(a, b, c) -> bool: +def checksumA(a:int , b:int, c:int) -> bool: if c == 0: #if __debug__: print("Unimplemented border case") TODO? b ^= 1 - var1 = a * b - var1 &= 0xFF + var1 = (a * b) & 0xFF return var1 == c def japDeXor(input_: bytes, offset: int , strategy: xipDecoder_strategy.XipDecoderStrategy ) -> bytes: @@ -16,7 +17,7 @@ def japDeXor(input_: bytes, offset: int , strategy: xipDecoder_strategy.XipDecod output = bytes() for i in range( strategy.fileHeaderLength()): - offset %= len(xorKey) + offset &= 255 #len(xorKey) byteA = (xorKey[offset] ^ input_[i]).to_bytes(1, "big") output += bytearray(byteA) offset += 1 @@ -40,16 +41,14 @@ def deXorTxt(input_: bytes) -> bytes: if i > 0: output += input_[-i:] - return output + return output def deXorVisualClip(input_: bytes) -> bytes: - from xipDecoder_strategy import Crc32Table - key : List[int] = [Crc32Table()[i] for i in range(Crc32Table.size)] + key : bytes = xipDecoder_strategy.VcKey().get() offset = 0xED - output = input_[0:8] # preserve the first 8 bytes. - for i in range(8, len(input_) ): - offset %= len(key) - byteA = ((key[(offset) % 0x100] ^ input_[i]) & 0xFF).to_bytes(1, "big") - output += bytearray(byteA) + output = bytearray(input_[:]) + for i in range(8, len(input_) ):# preserve the first 8 bytes. + offset &= 255 #len(key) + output[i] = key[offset % 256] ^ input_[i] offset += 1 return output diff --git a/keyFiles/disclaimer.txt b/keyFiles/disclaimer.txt index faad96e..ce152aa 100644 --- a/keyFiles/disclaimer.txt +++ b/keyFiles/disclaimer.txt @@ -1 +1 @@ -txtKey is just a crc32 table in the wrong order. \ No newline at end of file +txtKey is like a crc32 table \ No newline at end of file diff --git a/main.py b/main.py index 0f1af6f..8775ebf 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,21 @@ -import hashlib, logging, struct, sys -from pprint import pprint -from typing import List, Optional +import pstats +import logging, struct, sys, bcolors +from typing import Any, Dict, List, Optional -import tests, functions -from myBytes import Bytes +import functions import xipDecoder_strategy from xipDecoder_strategy import XipDecoderStrategy import os +import numba -'''decoded = (encoded **e) %m -decoded = (inputB ** input) % key -https://en.wikipedia.org/wiki/Modular_exponentiation#:~:text=)-,Pseudocode,-%5Bedit%5D -https://en.wikipedia.org/wiki/RSA_(cryptosystem)#:~:text=calculations%20can%20be-,computed%20efficiently,-using%20the%20square -''' +@numba.njit() def xipRsa2(inputB: int, input_: int, key: int) -> int: + '''decoded = (encoded **e) %m + decoded = (inputB ** input) % key + https://en.wikipedia.org/wiki/Modular_exponentiation#:~:text=)-,Pseudocode,-%5Bedit%5D + https://en.wikipedia.org/wiki/RSA_(cryptosystem)#:~:text=calculations%20can%20be-,computed%20efficiently,-using%20the%20square + ''' output = 0 while input_ != 0: if input_ % 2 == 1: @@ -24,7 +25,7 @@ def xipRsa2(inputB: int, input_: int, key: int) -> int: input_ //= 2 return output - +@numba.njit() def xipRsa1(input_: int, key2: int, key1: int) -> int: # input: 8Btyes. output: 4Bytes output = 1 @@ -35,25 +36,38 @@ def xipRsa1(input_: int, key2: int, key1: int) -> int: key2 //= 2 return output -def xipRsa(input_: bytes, keyIndex: int) -> bytes: - output = bytes() - with open("./keyFiles/key1", "rb") as file: - key1data=bytes(file.read()) - with open("./keyFiles/key2", "rb") as file: - key2data=bytes(file.read()) - size=len(input_) - assert size % 8 == 0 - size //= 8 - for i in range(size): # decrypt 8 bytes to 4 in each cycle. - input_a = struct.unpack_from(' bytes: + output = bytes() + assert RsaDecryptor.key1 is not None and RsaDecryptor.key2 is not None + + size=len(input_) + assert size % 8 == 0 + size //= 8 + for i in range(size): # decrypt 8 bytes to 4 in each cycle. + input_a = struct.unpack_from(' Dict[ Any, Any ] : + ''' + This extracts the file content, it's compatible with async and parallel procesing. + ''' + + self._size1 , self._size2 = struct.unpack_from('> 8 & 0xff) << 8 | A1 & 0xff) << 8 | A1 >> 0x18) << 8 | A1 >> 8 & 0xff + rsaDecryptedSize = (((A1 >> 0x10 & 0xff) << 8 | A2 >> 0x18) << 8 | A2 & 0xff) << 8 | A2 >> 0x10 & 0xff + #offsetToStartOverwriting = (cryptChunkSize - rsaDecryptedSize) + 8 + 0x11c + self.baseOffset # writeZoneStartAddr + + try: + off1=8 + 0x11c + self.baseOffset + self.decryptedData = RsaDecryptor().xipRsa( self.xipFile[off1:off1+rsaDecryptedSize*2] , self.cryptKeyIndex) + except Exception as e: + self.noErrors = False + raise e + + '''# TODO : there's this loop that could be executed (timesLoopX) times, for big files? + for (A2 = A2 >> 0x10 & 3; A2 != 0; A2 = A2 - 1) { + *(byte *)writeZoneAddr = *(byte *)RSADecryptedIncrPointer; + RSADecryptedIncrPointer = (dword *)((int)RSADecryptedIncrPointer + 1); + writeZoneAddr = (dword *)((int)writeZoneAddr + 1); + } + ''' + timesLoopX = (A2 >> 16) & 3 #(0,1,2,3) + if timesLoopX != 0: + raise NotImplementedError + + sizeOfEncryptedData: int = cryptChunkSize - if padding != 0: - from myBytes import printableBytes as pb - logging.debug(f"{padding=} ; {pb(self.unknown3)=}") - self.unopenable_or_unimplemented = True - return - else: - self.unopenable_or_unimplemented = False + unCryptDataOff = self.baseOffset + 0x011C + 8 + sizeOfEncryptedData - self.baseOffset += padding # TODO remove this hack + fileDataCompressed = self.decryptedData + self.xipFile [ unCryptDataOff : unCryptDataOff + self.fileBlockSize - 8 - sizeOfEncryptedData ] + #if __debug__: + # self.fileDataCompressed = fileDataCompressed + import lzo + # unfortunately, the line below could suddenly crash python.exe because it's a C dll which will crash if the output buffer allocated (for the expected decompressed size) gets overflowed (because of a badly formed data) , even in a try-except block. + dataout=lzo.decompress(fileDataCompressed , False, self.uncompressedSize, algorithm="LZO1X") + + return {"index": self.fileIndex, "error":False, "finalData": dataout} + + def extractFileAsyncComplete( self ): + enableCrcCheck:bool = False - # shuffling algorithm part: - try: - # TODO : this is only one of the many possible shufflings - cryptChunk: bytes = xipFile[self.baseOffset + 0x011C + 8 : (self.baseOffset + 0x011C + 8 + sizeOfEncryptedData)] - self.decryptedData = xipRsa(cryptChunk[:0x10], self.cryptKeyIndex) + cryptChunk[0x10:0x14] + xipRsa(cryptChunk[0x18:0x20], self.cryptKeyIndex+3) \ - + cryptChunk[0x20:0x24] + xipRsa(cryptChunk[0x28:0x50], self.cryptKeyIndex+5) - except Exception as e: - self.unopenable_or_unimplemented |= True # WIP - return - - offset1 = baseOffset + 0x011C + 8 + sizeOfEncryptedData - sizeRest = self.fileDataCompressedSize - 8 - 48 - if sizeRest > 0: - self.uncryptedData = xipFile[offset1:offset1 + sizeRest] + fileData = self.extractAsync() + if not self.noErrors: + raise Exception(f"{bcolors.ERR}Exception when decompressing '{self.fileName}'{bcolors.ENDC}") + + dataout = fileData["finalData"] + + # additonal decryption for some file types + if len(self.fileName.split("\\")[-1]) > 3: + extension: str = self.fileName.split("\\")[-1][-4:] + if extension in self.format_.maskedFileTypes_ConfigFile(): + dataout = functions.deXorTxt(dataout) + + crcOk: bool + if enableCrcCheck: + import zlib + crcCalculated : int = zlib.crc32(dataout) + crcOk = self.crc32 == crcCalculated else: - self.uncryptedData = bytes() + crcOk = True + + if len(self.fileName.split("\\")[-1]) > 3: + extension: str = self.fileName.split("\\")[-1][-4:] + if extension in self.format_.maskedFileTypes_VisualClip(): + dataout = functions.deXorVisualClip(dataout) + + return_ = {"index": self.fileIndex, "crcOk": crcOk, "finalData": dataout } + + return return_ - self.fileDataCompressed = self.decryptedData + self.uncryptedData @property def unknown1(self): - return self.fileDescriptor[0x8 : 0x0c] # 4 bytes, doesnt affect shuffling. + return self.fileDescriptor[0x8 : 0x0c] # 4 bytes @property def unknown2(self): - return self.fileDescriptor[0x114 : 0x118] # 4 bytes, doesnt affect shuffling. + return self.fileDescriptor[0x114 : 0x118] # 4 bytes @property def unknown3(self): - return self.fileDescriptor[0x119 : 0x11c ] # 3 bytes , affects shuffling algorithm. always xx 00 00 ? + return self.fileDescriptor[0x119 : 0x11c ] # 3 bytes + @property + def unknown3b(self): + return self.fileDescriptor[0x118 : 0x11c ] -def openXip(name:str): - with open(name, "rb") as xipFile: +def openXip(pakFilename:str , enableParallel:bool = False ) : + with open("./inputFiles/"+pakFilename, "rb") as xipFile: xipFile=bytes(xipFile.read()) if xipFile[0:3] != b"XIP": raise Exception("Not a valid XIP file.") @@ -217,6 +267,7 @@ def openXip(name:str): if key not in os.listdir("./keyFiles"): raise Exception(f"Can't execute extractor, missing key file: {key}") xipDecoder.environment_ok = True + if 'key' in locals(): del key offsetSecretA: bytes = xipFile[4:5]+xipFile[6:7]+xipFile[8:9]+xipFile[11:12] @@ -227,8 +278,8 @@ def openXip(name:str): secretA = xipFile[offsetSecretA:int(offsetSecretA)+24] if type(xipDecoder) == xipDecoder_strategy.Xip3Decoder: ... # TODO add 0x98989898 step - raise NotImplementedError("Not implemented yet." ) - secretA = xipRsa(secretA,0x0c) + raise NotImplementedError + secretA = RsaDecryptor().xipRsa(secretA,0x0c) fileOffset : int numberOfFiles : int @@ -240,65 +291,87 @@ def openXip(name:str): for fileNumber in range(numberOfFiles): if fileOffset == offsetSecretA: fileOffset += secretASkipSize - pf = PackagedFile(xipFile, fileOffset, (numberOfFiles - fileNumber), xipDecoder) - if pf.unopenable_or_unimplemented==True: - fileOffset += xipDecoder.fileHeaderLength() + pf.fileBlockSize - continue + pf = PackagedFile(xipFile, fileOffset, (numberOfFiles - fileNumber), xipDecoder, fileNumber) files.append(pf) + fileOffset += xipDecoder.fileHeaderLength() + pf.fileBlockSize + if not pf.noErrors : + logging.debug(f"{bcolors.ERR}Parsing failed for some file{bcolors.ENDC}") + continue + #logging.debug(pf.fileName) + + if listFiles := False: + for pf in files: + print(pf.fileName) + + if extractFiles := True : + pathsToMake = set() + for pf in files: + # make folder structure synchronously + path = pf.fileName.split("\\") + if any( [ p in [".", ".."] for p in path ] ): + print(f"This filename/path is not supported for security reasons: {pf.fileName}") # this is not optimal + continue + if len(path) > 1: + folder = path[:-1] + folder = "\\".join(folder) + pathsToMake.add(f"./outFiles/{folder}") + + for path in pathsToMake: + os.makedirs(path, exist_ok=True) - logging.debug(pf.fileName) + for i, pf in enumerate(files): + try: + if not pf.noErrors : + print(f"{bcolors.ERR}Exception when parsing '{pf.fileName}'{bcolors.ENDC}") + else: + result = pf.extractFileAsyncComplete() + finalPath = f"./outFiles/" + pf.fileName.replace('\\','/') + + os.access( os.path.dirname(finalPath), os.W_OK) - dataout : bytes = bytes() - - from myBytes import printableBytes as pb - #logging.debug(f"{pprint({a: b for a, b in vars(pf).items() if a in ['unknown1', 'unknown2', 'unknown3', 'unknownA1' , 'unknownA2' , 'xorParam', 'cryptKeyIndex']})}") - #logging.debug(f"{pb(pf.unknown1)=}") - #logging.debug(f"{pb(pf.unknown2)=}") - #logging.debug(f"{pb(pf.unknown3)=}") + with open( finalPath, "wb") as file: + file.write(result["finalData"]) + + print(f"{i+1}/{len(files)} files completed.", end="\r", flush=True) + + if not result["crcOk"]: + print(f"{bcolors.ERR}CRC32 check failed for '{pf.fileName}'{bcolors.ENDC}") + + except Exception as e: + print(f"{bcolors.ERR}Exception when decompressing '{pf.fileName}'{bcolors.ENDC}") + print(e) + print("") - import lzo - try: - # unfortunately, the line below could suddenly crash python.exe because it's a C dll which will crash if the output buffer allocated (for the expected decompressed size) gets overflowed (because of a badly formed data) , even in a try-except block. - dataout=lzo.decompress(pf.fileDataCompressed , False, pf.uncompressedSize, algorithm="LZO1X") - except Exception as e: - print("Decompression failed") - - - if extractFiles := True : - if dataout != bytes(): - - # additonal decryption for some file types - if len(pf.fileName.split("\\")[-1]) > 3: - extension: str = pf.fileName.split("\\")[-1][-4:] - if extension in xipDecoder.maskedFileTypes_ConfigFile(): - dataout = functions.deXorTxt(dataout) - elif extension in xipDecoder.maskedFileTypes_VisualClip(): - dataout = functions.deXorVisualClip(dataout) - - # make the folder structure - path = pf.fileName.split("\\") - if len(path) > 1: - folder = path[:-1] - folder = "\\".join(folder) - if "outFiles" not in os.listdir(): - os.makedirs("./outFiles") - os.makedirs(f"./outFiles/{folder}", exist_ok=True) - - with open("./outFiles/" + pf.fileName.replace('\\','/') , "wb") as file: - file.write(dataout) - - fileOffset += xipDecoder.fileHeaderLength() + pf.fileBlockSize + + print(f"Done {pakFilename}.") return files -def main(fileName : Optional[str] = None): - if fileName is None: - fileName = "System.pak" - logging.basicConfig(level=logging.DEBUG) +def main_profile(fileName : str = "./System.pak"): + import cProfile + + logging.basicConfig(level=logging.ERROR) + cProfile.run(f"openXip('{fileName}')", "profile_output.prof", sort="cumulative" ) + stats = pstats.Stats("profile_output.prof") + stats.sort_stats('cumulative') + stats.print_stats(30) + +def main(fileName : str = "./System.pak"): + + logging.basicConfig(level=logging.ERROR) openXip(fileName) +def openMany(): + files = os.listdir("./inputFiles/") + for file_ in files: + if len(file_) > 4 and file_[-4:] == ".pak": + print(f"Extracting {file_.split('/')[-1]}") + openXip(file_) + if __name__ == "__main__": import sys - main(sys.argv[1] if len(sys.argv) > 1 else None) + main(sys.argv[1] if len(sys.argv) > 1 else "./System.pak") + + #openMany() diff --git a/myBytes.py b/myBytes.py index cf9a165..068e9b0 100644 --- a/myBytes.py +++ b/myBytes.py @@ -52,12 +52,9 @@ def __index__(self): # this is to override __hex__ return int(self.value) -def printableBytes(a: bytes , add0x: bool = True) -> str: - data:str = str(hex(int.from_bytes(a, byteorder='big'))) - data = data.replace("0x", "") - if len(data) % 2 != 0: - data = "0" + data - if add0x: - return "0x" + data +def printableBytes(input_: bytes , add0x: bool = False) -> str: + data:str = input_.hex() + if not add0x: + return data else: - return data \ No newline at end of file + return "0x" + data \ No newline at end of file diff --git a/my_lzo_dll/MakefileA b/my_lzo_dll/MakefileA new file mode 100644 index 0000000..8dd1455 --- /dev/null +++ b/my_lzo_dll/MakefileA @@ -0,0 +1,3 @@ +#64/32 bit depending on the system 64/32 +gcc-mydll: + gcc $(CPPFLAGS) $(GCC_CFLAGS) -shared -o myLzoDll.dll minilzo.c myLzoDll.c \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 96bc15e..885af49 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ python-lzo -attrs \ No newline at end of file +attrs +numba \ No newline at end of file diff --git a/xipDecoder_strategy.py b/xipDecoder_strategy.py index 5f51ae1..a55b53f 100644 --- a/xipDecoder_strategy.py +++ b/xipDecoder_strategy.py @@ -1,6 +1,6 @@ from abc import abstractmethod from functools import cache -from typing import List +from typing import List, Optional import japText @@ -71,16 +71,41 @@ def required_keys(self) -> List[str]: class Crc32Table: # a.k.a. "vcKey", 256 32bit numbers # class variables: size : int = 256 + table : Optional[List[int]] = None + + def __init__(self) -> None: + if Crc32Table.table is None: + Crc32Table.table = [0 for _ in range(Crc32Table.size)] + for i in range(Crc32Table.size): + crc = i + for _ in range(8): + crc = (crc >> 1) ^ (0xEDB88320 if crc & 1 else 0) + Crc32Table.table[i] = crc @staticmethod - @cache - def __index__(index: int) -> int: - crc: int = index - for _ in range(8): - if crc & 1: - crc = (crc >> 1) ^ 0xEDB88320 - else: - crc >>= 1 - return crc + def get(index:int) -> int: + if Crc32Table.table is None: + Crc32Table() + index &= 255 + assert Crc32Table.table is not None + return Crc32Table.table[index] + + def __index__(self, index: int) -> int: + return Crc32Table().get(index) + def __getitem__(self, index: int) -> int: - return self.__index__(index % Crc32Table.size) \ No newline at end of file + return Crc32Table().get(index) + +class VcKey: + key : Optional[bytes] = None + + @staticmethod + def get() -> bytes: + if VcKey.key is None: + VcKey() + assert VcKey.key is not None + return VcKey.key + + def __init__(self) -> None: + if VcKey.key is None: + VcKey.key = bytes(Crc32Table().get(i)&0xff for i in range(256)) \ No newline at end of file