import sys import subprocess from typing import Union, Optional from pathlib import Path from zlib import crc32 from Crypto.Cipher import AES from Crypto.Hash import CMAC from cryptography.hazmat.primitives.keywrap import aes_key_unwrap class Pak: def __init__(self, bgroupcert_sizes: dict, zgpriv_sizes: dict): self.mstar_key_bank_magic_id = b"Mstar.Key.Bank" self.secure_store_file_magic_id = b"MSTAR_SECURE_STORE_FILE_MAGIC_ID" self.bgroupcert_sizes = bgroupcert_sizes self.zgpriv_sizes = zgpriv_sizes def process_files(self, boot_file: Path, sedata_file: Path, playready_keys: dict) -> None: self.validate_files(boot_file, sedata_file) boot_key = self.get_boot_key(input_file=boot_file).hex() boot_keys = { "brand_unknown": { "MODEL_UNKNOWN": boot_key } } decryption_bgc_zgp = DecryptionBgcZgp( boot_keys=boot_keys, playready_keys=playready_keys ) chunks = self.get_chunks_from_sedata(input_file=sedata_file) for bgp_security_level, bgp_size in self.bgroupcert_sizes.items(): print(f"\nTrying get BGCert - Security Level: {bgp_security_level}") for chunk in chunks: data = self.prepare_data(chunk) bgroupcert_data = self.decrypt_bgroupcert(decryption_bgc_zgp, data, bgp_size) if not bgroupcert_data: continue print("Firmware:") print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}") print(f" - Brand: {decryption_bgc_zgp.brand}") for chunk in chunks: data = self.prepare_data(chunk) zgpriv_data = self.decrypt_zgpriv(decryption_bgc_zgp, data, self.zgpriv_sizes[bgp_security_level]) if not zgpriv_data: continue code = self.process_decrypted_data(decryption_bgc_zgp, bgroupcert_data, zgpriv_data) if code == 0: break def get_boot_key(self, input_file: Path, offset: int = 16) -> bytes: data = DecryptionBgcZgp.get_file_data(input_file) index = data.find(self.mstar_key_bank_magic_id) if index == -1 or index < offset: raise ValueError("Error: Boot key not found or invalid position") return data[index - offset:index] def get_chunks_from_sedata(self, input_file: Path, min_size: int = 0, max_size: int = 2000) -> list[dict]: data = DecryptionBgcZgp.get_file_data(input_file) chunks = [] start_pos = 0 while True: pos = data.find(self.secure_store_file_magic_id, start_pos) if pos == -1: break header_start = max(0, pos - 32) header = data[header_start:pos] data_start = pos + len(self.secure_store_file_magic_id) next_pos = data.find(self.secure_store_file_magic_id, data_start) if next_pos == -1: data_end = len(data) else: data_end = next_pos extracted_data = data[data_start:data_end] total_size = len(header) + len(self.secure_store_file_magic_id) + len(extracted_data) if min_size <= total_size <= max_size: chunks.append({ "position": header_start if len(header) == 32 else pos, "header": header if len(header) == 32 else b"", "has_full_header": len(header) == 32, "data": extracted_data, "total_size": total_size }) start_pos = data_end return chunks def prepare_data(self, chunk: dict) -> bytes: magic_id = self.secure_store_file_magic_id header = chunk["header"] if chunk["has_full_header"] else b"" return header + magic_id + chunk["data"] def process_decrypted_data( self, decryption_bgc_zgp: "DecryptionBgcZgp", bgroupcert_data: bytes, zgpriv_data: bytes ) -> int: bgroupcert_tmp_path = Path("bgroupcert.decrypt_bgc_zgp_pak.tmp") zgpriv_tmp_path = Path("zgpriv_temp.decrypt_bgc_zgp_pak.tmp") code = 1 try: bgroupcert_tmp_path.write_bytes(bgroupcert_data) zgpriv_tmp_path.write_bytes(zgpriv_data) command = [ "pyplayready", "create-device", "-c", str(bgroupcert_tmp_path), "-k", str(zgpriv_tmp_path) ] result = subprocess.run( command, capture_output=True, text=True ) if result.returncode == 0: prd_path = Path(result.stderr.split("Saved to: ")[-1].strip()) folder_path = Path("playready_decryption_bgc_zgp_pak") / prd_path.stem if folder_path.exists() and any(item.is_file() for item in folder_path.iterdir()): raise ValueError(f"Error: This certificate already exists in {folder_path}") folder_path.mkdir(parents=True, exist_ok=True) bgroupcert_out_path = folder_path / "bgroupcert.dat" bgroupcert_out_path.write_bytes(bgroupcert_data) zgpriv_out_path = folder_path / "zgpriv.dat" zgpriv_out_path.write_bytes(zgpriv_data) prd_path.rename(folder_path / prd_path.name) security_level = decryption_bgc_zgp.get_security_level(bgroupcert_data) print("Decryption:") print(f" - BGroupCert Security level: {security_level}") print(f" - BGroupCert Path : {bgroupcert_out_path}") print(f" - ZGPriv Path : {zgpriv_out_path}") print(f" - PRD Path : {folder_path / prd_path.name}") print(f" - Files exported to : {folder_path.parent}") code = result.returncode finally: bgroupcert_tmp_path.unlink(missing_ok=True) zgpriv_tmp_path.unlink(missing_ok=True) return code @staticmethod def validate_files(boot_file: Path, sedata_file: Path) -> None: if not boot_file.exists(): raise ValueError(f"Error: Boot File not found -> {boot_file}") if not sedata_file.exists(): raise ValueError(f"Error: SeData File not found -> {sedata_file}") @staticmethod def decrypt_bgroupcert(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int): try: decrypted_bgroupcert_data = decryption_bgc_zgp.decrypt_aes_ecb( inp=data, size=size, is_bgroupcert=True ) return decryption_bgc_zgp.process_bgroupcert(data=decrypted_bgroupcert_data) except ValueError: return None @staticmethod def decrypt_zgpriv(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int): try: decrypted_zgpriv_data = decryption_bgc_zgp.decrypt_aes_ecb( inp=data, size=size ) return decryption_bgc_zgp.process_zgpriv(data=decrypted_zgpriv_data, is_sl3000=False) except ValueError: return None class DecryptionBgcZgp: def __init__(self, boot_keys: dict, playready_keys: dict): self.boot_keys = boot_keys self.playready_keys = playready_keys self.boot_key: Optional[bytes] = None self.brand: Optional[str] = None self.bgroupcert_start_magic = b"CHAI" self.bgroupcert_end_magic = b"\x93\xfa\xc5\xab" def decrypt_aes_ecb(self,inp: Union[Path, bytes],size: Optional[int] = None, is_bgroupcert: bool = False) -> bytes: if isinstance(inp, bytes): encrypted_data = inp else: encrypted_data = self.get_file_data(inp) for brand, keys in self.boot_keys.items(): for model, boot_key in keys.items(): if self.boot_key: key = self.boot_key else: key = self.hex_to_bytes(boot_key) cipher = AES.new(key, AES.MODE_ECB) if size: data = cipher.decrypt(encrypted_data[:size]) else: data = cipher.decrypt(encrypted_data) if is_bgroupcert and not self.bgroupcert_start_magic in data and not self.bgroupcert_end_magic in data: continue if is_bgroupcert: self.boot_key = key self.brand = brand return data raise ValueError( "Error: Failed to decrypt the bgroupcert. No valid AES key matched or the decrypted data is invalid." ) def decrypt_zgpriv(self, encrypted_data: bytes) -> bytes: transient_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["transient"]) cmac = CMAC.new(transient_key, ciphermod=AES) intermediate_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["intermediate"]) intermediate_data = ( b"\x01" + intermediate_key + # Intermediate key b"\x00" + b"\x00" * 16 + # Context b"\x00\x80" ) cmac.update(intermediate_data) wrapping_key = cmac.digest() wrapped_key = encrypted_data[:48] unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key) return unwrapped_key def process_zgpriv(self, data: bytes, is_sl3000: bool) -> bytes: content = self.remove_header(data) if is_sl3000: content = self.decrypt_zgpriv(encrypted_data=content) if self.brand == "lg" and len(data) == 128: zgpriv_data = content[64:-32] else: zgpriv_data = content[:32] return zgpriv_data def process_bgroupcert(self, data: bytes) -> bytes: content = self.remove_header(data) start_index = content.find(self.bgroupcert_start_magic) end_index = content.find(self.bgroupcert_end_magic) if start_index == -1: raise ValueError("Error: BGCert start magic sequence not found.") if end_index == -1: raise ValueError("Error: BGCert end magic sequence not found.") if end_index < start_index: raise ValueError("Error: BGCert end magic sequence found before the start magic sequence.") bgroupcert_data = content[start_index:end_index + len(self.bgroupcert_end_magic)] return bgroupcert_data @staticmethod def remove_header(data: bytes, header: Union[str, bytes] = "INNER_MSTAR_FILE") -> bytes: if isinstance(header, str): header = header.encode() header_index = data.find(header) if header_index == -1: return data content = data[header_index + len(header):] return content @staticmethod def hex_to_bytes(data: Union[str, bytes]) -> bytes: if isinstance(data, str): data = bytes.fromhex(data) return data @staticmethod def get_file_data(input_file: Path) -> bytes: with open(input_file, "rb") as f: data = f.read() return data @staticmethod def get_security_level(data: bytes) -> str: if b"SL3000" in data: security_level = "SL3000" elif b"SL2000" in data: security_level = "SL2000" elif b"SL150" in data: security_level = "SL150" else: security_level = "UNKNOWN" return security_level def process_files(self, bgroupcert_file: Path, zgpriv_file: Path): if not bgroupcert_file.exists(): raise ValueError(f"Error: BGCert File not found -> {bgroupcert_file}") if not zgpriv_file.exists(): raise ValueError(f"Error: ZGPriv File not found -> {zgpriv_file}") decrypted_bgroupcert_data = self.decrypt_aes_ecb( inp=bgroupcert_file, is_bgroupcert=True ) bgroupcert_data = self.process_bgroupcert( data=decrypted_bgroupcert_data ) print("Firmware:") print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}") print(f" - Brand: {decryption_bgc_zgp.brand}") security_level = self.get_security_level(bgroupcert_data) decrypted_zgpriv_data = self.decrypt_aes_ecb( inp=zgpriv_file ) zgpriv_data = self.process_zgpriv( data=decrypted_zgpriv_data, is_sl3000=security_level == "SL3000" ) bgc_zgp_id = hex(crc32(bgroupcert_data + zgpriv_data))[2:] folder_name = f"playready_decryption_bgc_zgp_pak/{self.brand}_{security_level}_{bgc_zgp_id}" folder_path = Path(folder_name) if folder_path.exists(): raise ValueError(f"Error: This certificate already exists in {folder_path}") folder_path.mkdir(parents=True, exist_ok=True) bgroupcert_out_path = Path(folder_path / "bgroupcert.dat") bgroupcert_out_path.write_bytes(bgroupcert_data) zgpriv_out_path = Path(folder_path / "zgpriv.dat") zgpriv_out_path.write_bytes(zgpriv_data) command = [ "pyplayready", "create-device", "-c", str(bgroupcert_out_path), "-k", str(zgpriv_out_path), "-o", folder_path ] result = subprocess.run( command, capture_output=True, text=True ) prd_path = None if result.returncode == 0: prd_path = folder_path / Path(result.stderr.split("Saved to: ")[-1].strip()).name print("Decryption:") print(f" - BGroupCert Security level: {security_level}") print(f" - BGroupCert Path : {bgroupcert_out_path}") print(f" - ZGPriv Path : {zgpriv_out_path}") if prd_path: print(f" - PRD Path : {prd_path}") print(f" - Files exported to : {folder_path}") if __name__ == "__main__": if len(sys.argv) < 3: print("Usage:") print(" BGroupCert and ZGPriv") print(f" decrypt_bgc_zgp_pak.py ") print(" Pak") print(f" decrypt_bgc_zgp_pak.py ") sys.exit(1) boot_keys = { "vestel": { "MB130": "8981D083B3D53B3DF1AC529A70F244C0", "MB_VARIANT_1": "24490B4CC95F739CE34138478E47139E" }, "lg": { "HE_LCD_NC5U_AAADAIAA": "E33AB4C45C2570B8AD15A921F752DEB6", "HE_DTV_W21A_AFADATAA": "0007FF4154534D92FC55AA0FFF0110E0" } } playready_keys = { "porting_kit": { "transient": "8B222FFD1E76195659CF2703898C427F", "intermediate": "9CE93432C7D74016BA684763F801E136", } } # Pak / SeData bgroupcert_sizes = { "SL2000": 1760, "SL3000": 1440 } zgpriv_sizes = { "SL2000": 176, "SL3000": 176 } print("Process started...") if "boot" in sys.argv[1] and "sedata" in sys.argv[2]: pak = Pak(bgroupcert_sizes, zgpriv_sizes) boot_file = Path(sys.argv[1]) sedata_file = Path(sys.argv[2]) pak.process_files(boot_file, sedata_file, playready_keys) else: decryption_bgc_zgp = DecryptionBgcZgp( boot_keys=boot_keys, playready_keys=playready_keys ) bgroupcert_file = Path(sys.argv[1]) zgpriv_file = Path(sys.argv[2]) decryption_bgc_zgp.process_files(bgroupcert_file, zgpriv_file) print("\nProcess completed successfully.") sys.exit(0)