PlayReady/decrypt_bgc_zgp_pak.py
2026-02-09 07:29:17 +02:00

464 lines
16 KiB
Python

import sys
import subprocess
from typing import Union, Optional
from pathlib import Path
from zlib import crc32
from Crypto.Cipher import AES
from Crypto.Hash import CMAC
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
class Pak:
def __init__(self, bgroupcert_sizes: dict, zgpriv_sizes: dict):
self.mstar_key_bank_magic_id = b"Mstar.Key.Bank"
self.secure_store_file_magic_id = b"MSTAR_SECURE_STORE_FILE_MAGIC_ID"
self.bgroupcert_sizes = bgroupcert_sizes
self.zgpriv_sizes = zgpriv_sizes
def process_files(self, boot_file: Path, sedata_file: Path, playready_keys: dict) -> None:
self.validate_files(boot_file, sedata_file)
boot_key = self.get_boot_key(input_file=boot_file).hex()
boot_keys = {
"brand_unknown": {
"MODEL_UNKNOWN": boot_key
}
}
decryption_bgc_zgp = DecryptionBgcZgp(
boot_keys=boot_keys,
playready_keys=playready_keys
)
chunks = self.get_chunks_from_sedata(input_file=sedata_file)
for bgp_security_level, bgp_size in self.bgroupcert_sizes.items():
print(f"\nTrying get BGCert - Security Level: {bgp_security_level}")
for chunk in chunks:
data = self.prepare_data(chunk)
bgroupcert_data = self.decrypt_bgroupcert(decryption_bgc_zgp, data, bgp_size)
if not bgroupcert_data:
continue
print("Firmware:")
print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}")
print(f" - Brand: {decryption_bgc_zgp.brand}")
for chunk in chunks:
data = self.prepare_data(chunk)
zgpriv_data = self.decrypt_zgpriv(decryption_bgc_zgp, data, self.zgpriv_sizes[bgp_security_level])
if not zgpriv_data:
continue
code = self.process_decrypted_data(decryption_bgc_zgp, bgroupcert_data, zgpriv_data)
if code == 0:
break
def get_boot_key(self, input_file: Path, offset: int = 16) -> bytes:
data = DecryptionBgcZgp.get_file_data(input_file)
index = data.find(self.mstar_key_bank_magic_id)
if index == -1 or index < offset:
raise ValueError("Error: Boot key not found or invalid position")
return data[index - offset:index]
def get_chunks_from_sedata(self, input_file: Path, min_size: int = 0, max_size: int = 2000) -> list[dict]:
data = DecryptionBgcZgp.get_file_data(input_file)
chunks = []
start_pos = 0
while True:
pos = data.find(self.secure_store_file_magic_id, start_pos)
if pos == -1:
break
header_start = max(0, pos - 32)
header = data[header_start:pos]
data_start = pos + len(self.secure_store_file_magic_id)
next_pos = data.find(self.secure_store_file_magic_id, data_start)
if next_pos == -1:
data_end = len(data)
else:
data_end = next_pos
extracted_data = data[data_start:data_end]
total_size = len(header) + len(self.secure_store_file_magic_id) + len(extracted_data)
if min_size <= total_size <= max_size:
chunks.append({
"position": header_start if len(header) == 32 else pos,
"header": header if len(header) == 32 else b"",
"has_full_header": len(header) == 32,
"data": extracted_data,
"total_size": total_size
})
start_pos = data_end
return chunks
def prepare_data(self, chunk: dict) -> bytes:
magic_id = self.secure_store_file_magic_id
header = chunk["header"] if chunk["has_full_header"] else b""
return header + magic_id + chunk["data"]
def process_decrypted_data(
self, decryption_bgc_zgp: "DecryptionBgcZgp", bgroupcert_data: bytes, zgpriv_data: bytes
) -> int:
bgroupcert_tmp_path = Path("bgroupcert.decrypt_bgc_zgp_pak.tmp")
zgpriv_tmp_path = Path("zgpriv_temp.decrypt_bgc_zgp_pak.tmp")
code = 1
try:
bgroupcert_tmp_path.write_bytes(bgroupcert_data)
zgpriv_tmp_path.write_bytes(zgpriv_data)
command = [
"pyplayready",
"create-device",
"-c", str(bgroupcert_tmp_path),
"-k", str(zgpriv_tmp_path)
]
result = subprocess.run(
command,
capture_output=True,
text=True
)
if result.returncode == 0:
prd_path = Path(result.stderr.split("Saved to: ")[-1].strip())
folder_path = Path("playready_decryption_bgc_zgp_pak") / prd_path.stem
if folder_path.exists() and any(item.is_file() for item in folder_path.iterdir()):
raise ValueError(f"Error: This certificate already exists in {folder_path}")
folder_path.mkdir(parents=True, exist_ok=True)
bgroupcert_out_path = folder_path / "bgroupcert.dat"
bgroupcert_out_path.write_bytes(bgroupcert_data)
zgpriv_out_path = folder_path / "zgpriv.dat"
zgpriv_out_path.write_bytes(zgpriv_data)
prd_path.rename(folder_path / prd_path.name)
security_level = decryption_bgc_zgp.get_security_level(bgroupcert_data)
print("Decryption:")
print(f" - BGroupCert Security level: {security_level}")
print(f" - BGroupCert Path : {bgroupcert_out_path}")
print(f" - ZGPriv Path : {zgpriv_out_path}")
print(f" - PRD Path : {folder_path / prd_path.name}")
print(f" - Files exported to : {folder_path.parent}")
code = result.returncode
finally:
bgroupcert_tmp_path.unlink(missing_ok=True)
zgpriv_tmp_path.unlink(missing_ok=True)
return code
@staticmethod
def validate_files(boot_file: Path, sedata_file: Path) -> None:
if not boot_file.exists():
raise ValueError(f"Error: Boot File not found -> {boot_file}")
if not sedata_file.exists():
raise ValueError(f"Error: SeData File not found -> {sedata_file}")
@staticmethod
def decrypt_bgroupcert(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int):
try:
decrypted_bgroupcert_data = decryption_bgc_zgp.decrypt_aes_ecb(
inp=data,
size=size,
is_bgroupcert=True
)
return decryption_bgc_zgp.process_bgroupcert(data=decrypted_bgroupcert_data)
except ValueError:
return None
@staticmethod
def decrypt_zgpriv(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int):
try:
decrypted_zgpriv_data = decryption_bgc_zgp.decrypt_aes_ecb(
inp=data,
size=size
)
return decryption_bgc_zgp.process_zgpriv(data=decrypted_zgpriv_data, is_sl3000=False)
except ValueError:
return None
class DecryptionBgcZgp:
def __init__(self, boot_keys: dict, playready_keys: dict):
self.boot_keys = boot_keys
self.playready_keys = playready_keys
self.boot_key: Optional[bytes] = None
self.brand: Optional[str] = None
self.bgroupcert_start_magic = b"CHAI"
self.bgroupcert_end_magic = b"\x93\xfa\xc5\xab"
def decrypt_aes_ecb(self,inp: Union[Path, bytes],size: Optional[int] = None, is_bgroupcert: bool = False) -> bytes:
if isinstance(inp, bytes):
encrypted_data = inp
else:
encrypted_data = self.get_file_data(inp)
for brand, keys in self.boot_keys.items():
for model, boot_key in keys.items():
if self.boot_key:
key = self.boot_key
else:
key = self.hex_to_bytes(boot_key)
cipher = AES.new(key, AES.MODE_ECB)
if size:
data = cipher.decrypt(encrypted_data[:size])
else:
data = cipher.decrypt(encrypted_data)
if is_bgroupcert and not self.bgroupcert_start_magic in data and not self.bgroupcert_end_magic in data:
continue
if is_bgroupcert:
self.boot_key = key
self.brand = brand
return data
raise ValueError(
"Error: Failed to decrypt the bgroupcert. No valid AES key matched or the decrypted data is invalid."
)
def decrypt_zgpriv(self, encrypted_data: bytes) -> bytes:
transient_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["transient"])
cmac = CMAC.new(transient_key, ciphermod=AES)
intermediate_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["intermediate"])
intermediate_data = (
b"\x01" +
intermediate_key + # Intermediate key
b"\x00" +
b"\x00" * 16 + # Context
b"\x00\x80"
)
cmac.update(intermediate_data)
wrapping_key = cmac.digest()
wrapped_key = encrypted_data[:48]
unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key)
return unwrapped_key
def process_zgpriv(self, data: bytes, is_sl3000: bool) -> bytes:
content = self.remove_header(data)
if is_sl3000:
content = self.decrypt_zgpriv(encrypted_data=content)
if self.brand == "lg" and len(data) == 128:
zgpriv_data = content[64:-32]
else:
zgpriv_data = content[:32]
return zgpriv_data
def process_bgroupcert(self, data: bytes) -> bytes:
content = self.remove_header(data)
start_index = content.find(self.bgroupcert_start_magic)
end_index = content.find(self.bgroupcert_end_magic)
if start_index == -1:
raise ValueError("Error: BGCert start magic sequence not found.")
if end_index == -1:
raise ValueError("Error: BGCert end magic sequence not found.")
if end_index < start_index:
raise ValueError("Error: BGCert end magic sequence found before the start magic sequence.")
bgroupcert_data = content[start_index:end_index + len(self.bgroupcert_end_magic)]
return bgroupcert_data
@staticmethod
def remove_header(data: bytes, header: Union[str, bytes] = "INNER_MSTAR_FILE") -> bytes:
if isinstance(header, str):
header = header.encode()
header_index = data.find(header)
if header_index == -1:
return data
content = data[header_index + len(header):]
return content
@staticmethod
def hex_to_bytes(data: Union[str, bytes]) -> bytes:
if isinstance(data, str):
data = bytes.fromhex(data)
return data
@staticmethod
def get_file_data(input_file: Path) -> bytes:
with open(input_file, "rb") as f:
data = f.read()
return data
@staticmethod
def get_security_level(data: bytes) -> str:
if b"SL3000" in data:
security_level = "SL3000"
elif b"SL2000" in data:
security_level = "SL2000"
elif b"SL150" in data:
security_level = "SL150"
else:
security_level = "UNKNOWN"
return security_level
def process_files(self, bgroupcert_file: Path, zgpriv_file: Path):
if not bgroupcert_file.exists():
raise ValueError(f"Error: BGCert File not found -> {bgroupcert_file}")
if not zgpriv_file.exists():
raise ValueError(f"Error: ZGPriv File not found -> {zgpriv_file}")
decrypted_bgroupcert_data = self.decrypt_aes_ecb(
inp=bgroupcert_file,
is_bgroupcert=True
)
bgroupcert_data = self.process_bgroupcert(
data=decrypted_bgroupcert_data
)
print("Firmware:")
print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}")
print(f" - Brand: {decryption_bgc_zgp.brand}")
security_level = self.get_security_level(bgroupcert_data)
decrypted_zgpriv_data = self.decrypt_aes_ecb(
inp=zgpriv_file
)
zgpriv_data = self.process_zgpriv(
data=decrypted_zgpriv_data,
is_sl3000=security_level == "SL3000"
)
bgc_zgp_id = hex(crc32(bgroupcert_data + zgpriv_data))[2:]
folder_name = f"playready_decryption_bgc_zgp_pak/{self.brand}_{security_level}_{bgc_zgp_id}"
folder_path = Path(folder_name)
if folder_path.exists():
raise ValueError(f"Error: This certificate already exists in {folder_path}")
folder_path.mkdir(parents=True, exist_ok=True)
bgroupcert_out_path = Path(folder_path / "bgroupcert.dat")
bgroupcert_out_path.write_bytes(bgroupcert_data)
zgpriv_out_path = Path(folder_path / "zgpriv.dat")
zgpriv_out_path.write_bytes(zgpriv_data)
command = [
"pyplayready", "create-device",
"-c", str(bgroupcert_out_path),
"-k", str(zgpriv_out_path),
"-o", folder_path
]
result = subprocess.run(
command,
capture_output=True,
text=True
)
prd_path = None
if result.returncode == 0:
prd_path = folder_path / Path(result.stderr.split("Saved to: ")[-1].strip()).name
print("Decryption:")
print(f" - BGroupCert Security level: {security_level}")
print(f" - BGroupCert Path : {bgroupcert_out_path}")
print(f" - ZGPriv Path : {zgpriv_out_path}")
if prd_path:
print(f" - PRD Path : {prd_path}")
print(f" - Files exported to : {folder_path}")
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage:")
print(" BGroupCert and ZGPriv")
print(f" decrypt_bgc_zgp_pak.py <bgroupcert_file> <zgpriv_file>")
print(" Pak")
print(f" decrypt_bgc_zgp_pak.py <boot_file> <sedata_file>")
sys.exit(1)
boot_keys = {
"vestel": {
"MB130": "8981D083B3D53B3DF1AC529A70F244C0",
"MB_VARIANT_1": "24490B4CC95F739CE34138478E47139E"
},
"lg": {
"HE_LCD_NC5U_AAADAIAA": "E33AB4C45C2570B8AD15A921F752DEB6",
"HE_DTV_W21A_AFADATAA": "0007FF4154534D92FC55AA0FFF0110E0"
}
}
playready_keys = {
"porting_kit": {
"transient": "8B222FFD1E76195659CF2703898C427F",
"intermediate": "9CE93432C7D74016BA684763F801E136",
}
}
# Pak / SeData
bgroupcert_sizes = {
"SL2000": 1760,
"SL3000": 1440
}
zgpriv_sizes = {
"SL2000": 176,
"SL3000": 176
}
print("Process started...")
if "boot" in sys.argv[1] and "sedata" in sys.argv[2]:
pak = Pak(bgroupcert_sizes, zgpriv_sizes)
boot_file = Path(sys.argv[1])
sedata_file = Path(sys.argv[2])
pak.process_files(boot_file, sedata_file, playready_keys)
else:
decryption_bgc_zgp = DecryptionBgcZgp(
boot_keys=boot_keys,
playready_keys=playready_keys
)
bgroupcert_file = Path(sys.argv[1])
zgpriv_file = Path(sys.argv[2])
decryption_bgc_zgp.process_files(bgroupcert_file, zgpriv_file)
print("\nProcess completed successfully.")
sys.exit(0)