464 lines
16 KiB
Python
464 lines
16 KiB
Python
import sys
|
|
import subprocess
|
|
|
|
from typing import Union, Optional
|
|
from pathlib import Path
|
|
from zlib import crc32
|
|
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Hash import CMAC
|
|
|
|
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap
|
|
|
|
|
|
class Pak:
    """Recover BGroupCert/ZGPriv pairs from a boot image and a SeData dump.

    The AES key is taken from the boot file (the bytes immediately preceding
    the ``Mstar.Key.Bank`` marker). The SeData file is split into chunks on
    the ``MSTAR_SECURE_STORE_FILE_MAGIC_ID`` marker and every chunk is tried
    as an encrypted BGroupCert and as an encrypted ZGPriv blob.
    """

    def __init__(self, bgroupcert_sizes: dict, zgpriv_sizes: dict):
        """Store the marker constants and the per-security-level blob sizes.

        Args:
            bgroupcert_sizes: maps a security-level label to the number of
                leading chunk bytes decrypted when looking for a BGroupCert.
            zgpriv_sizes: the same mapping for ZGPriv blobs; it is indexed
                with the keys of ``bgroupcert_sizes``.
        """
        self.mstar_key_bank_magic_id = b"Mstar.Key.Bank"
        self.secure_store_file_magic_id = b"MSTAR_SECURE_STORE_FILE_MAGIC_ID"
        self.bgroupcert_sizes = bgroupcert_sizes
        self.zgpriv_sizes = zgpriv_sizes

    def process_files(self, boot_file: Path, sedata_file: Path, playready_keys: dict) -> None:
        """Run the whole extraction for one boot/SeData file pair.

        For every configured security level each SeData chunk is tried as a
        BGroupCert; for every chunk that decrypts to one, all chunks are then
        tried as the matching ZGPriv until ``process_decrypted_data`` reports
        success (return code 0).

        Raises:
            ValueError: if an input file is missing, the boot key cannot be
                located, or the export folder already holds files.
        """
        self.validate_files(boot_file, sedata_file)
        boot_key = self.get_boot_key(input_file=boot_file).hex()

        # The key comes straight from the boot image, so brand and model are
        # not known here; placeholders keep the nested layout that
        # DecryptionBgcZgp iterates over.
        boot_keys = {
            "brand_unknown": {
                "MODEL_UNKNOWN": boot_key
            }
        }

        decryption_bgc_zgp = DecryptionBgcZgp(
            boot_keys=boot_keys,
            playready_keys=playready_keys
        )

        chunks = self.get_chunks_from_sedata(input_file=sedata_file)
        # prepare_data() only concatenates bytes, so build each candidate blob
        # once instead of once per loop pass.
        candidates = [self.prepare_data(chunk) for chunk in chunks]

        for bgp_security_level, bgp_size in self.bgroupcert_sizes.items():
            print(f"\nTrying get BGCert - Security Level: {bgp_security_level}")
            for cert_blob in candidates:
                bgroupcert_data = self.decrypt_bgroupcert(decryption_bgc_zgp, cert_blob, bgp_size)
                if not bgroupcert_data:
                    continue

                print("Firmware:")
                print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}")
                print(f" - Brand: {decryption_bgc_zgp.brand}")

                # Distinct loop names: the search for the private key must not
                # clobber the variables of the certificate loop above.
                for key_blob in candidates:
                    zgpriv_data = self.decrypt_zgpriv(
                        decryption_bgc_zgp, key_blob, self.zgpriv_sizes[bgp_security_level]
                    )
                    if not zgpriv_data:
                        continue

                    code = self.process_decrypted_data(decryption_bgc_zgp, bgroupcert_data, zgpriv_data)
                    if code == 0:
                        break

    def get_boot_key(self, input_file: Path, offset: int = 16) -> bytes:
        """Return the ``offset`` bytes that precede the key-bank marker.

        Raises:
            ValueError: if the marker is absent or fewer than ``offset`` bytes
                precede it.
        """
        data = Path(input_file).read_bytes()

        index = data.find(self.mstar_key_bank_magic_id)
        if index == -1 or index < offset:
            raise ValueError("Error: Boot key not found or invalid position")

        return data[index - offset:index]

    def get_chunks_from_sedata(self, input_file: Path, min_size: int = 0, max_size: int = 2000) -> list[dict]:
        """Split the SeData file on the secure-store marker.

        Each chunk is described by a dict with the keys ``position``,
        ``header`` (the 32 bytes before the marker, or ``b""`` when fewer are
        available), ``has_full_header``, ``data`` (bytes between this marker
        and the next one or end of file) and ``total_size`` (header + marker +
        data). Only chunks with ``min_size <= total_size <= max_size`` are
        returned.
        """
        data = Path(input_file).read_bytes()
        magic = self.secure_store_file_magic_id

        chunks = []
        start_pos = 0

        while True:
            pos = data.find(magic, start_pos)
            if pos == -1:
                break

            # Up to 32 bytes in front of the marker; shorter only at the very
            # start of the file.
            header_start = max(0, pos - 32)
            header = data[header_start:pos]
            has_full_header = len(header) == 32

            data_start = pos + len(magic)
            next_pos = data.find(magic, data_start)
            data_end = len(data) if next_pos == -1 else next_pos

            extracted_data = data[data_start:data_end]
            total_size = len(header) + len(magic) + len(extracted_data)

            if min_size <= total_size <= max_size:
                chunks.append({
                    "position": header_start if has_full_header else pos,
                    "header": header if has_full_header else b"",
                    "has_full_header": has_full_header,
                    "data": extracted_data,
                    "total_size": total_size
                })

            start_pos = data_end

        return chunks

    def prepare_data(self, chunk: dict) -> bytes:
        """Rebuild the raw blob of a chunk: optional header + marker + data."""
        header = chunk["header"] if chunk["has_full_header"] else b""
        return header + self.secure_store_file_magic_id + chunk["data"]

    def process_decrypted_data(
        self, decryption_bgc_zgp: "DecryptionBgcZgp", bgroupcert_data: bytes, zgpriv_data: bytes
    ) -> int:
        """Validate a cert/key pair with ``pyplayready`` and export it.

        Both blobs are written to temporary files in the current directory and
        handed to ``pyplayready create-device``. On a zero exit status the
        path of the generated device file is parsed from the text following
        the last ``"Saved to: "`` in the tool's stderr, and the certificate,
        the key and the device file are placed in
        ``playready_decryption_bgc_zgp_pak/<device file stem>``.

        Returns:
            0 when the tool succeeded and the files were exported, 1 otherwise.

        Raises:
            ValueError: if the target folder already contains files.
        """
        bgroupcert_tmp_path = Path("bgroupcert.decrypt_bgc_zgp_pak.tmp")
        zgpriv_tmp_path = Path("zgpriv_temp.decrypt_bgc_zgp_pak.tmp")

        code = 1

        try:
            bgroupcert_tmp_path.write_bytes(bgroupcert_data)
            zgpriv_tmp_path.write_bytes(zgpriv_data)

            command = [
                "pyplayready",
                "create-device",
                "-c", str(bgroupcert_tmp_path),
                "-k", str(zgpriv_tmp_path)
            ]

            result = subprocess.run(
                command,
                capture_output=True,
                text=True
            )

            if result.returncode == 0:
                prd_path = Path(result.stderr.split("Saved to: ")[-1].strip())
                folder_path = Path("playready_decryption_bgc_zgp_pak") / prd_path.stem

                if folder_path.exists() and any(item.is_file() for item in folder_path.iterdir()):
                    raise ValueError(f"Error: This certificate already exists in {folder_path}")

                folder_path.mkdir(parents=True, exist_ok=True)

                bgroupcert_out_path = folder_path / "bgroupcert.dat"
                bgroupcert_out_path.write_bytes(bgroupcert_data)

                zgpriv_out_path = folder_path / "zgpriv.dat"
                zgpriv_out_path.write_bytes(zgpriv_data)

                prd_path.rename(folder_path / prd_path.name)

                security_level = decryption_bgc_zgp.get_security_level(bgroupcert_data)

                print("Decryption:")
                print(f" - BGroupCert Security level: {security_level}")
                print(f" - BGroupCert Path : {bgroupcert_out_path}")
                print(f" - ZGPriv Path : {zgpriv_out_path}")
                print(f" - PRD Path : {folder_path / prd_path.name}")
                print(f" - Files exported to : {folder_path.parent}")
                code = result.returncode
        finally:
            # The temporary inputs are removed on success, failure and error.
            bgroupcert_tmp_path.unlink(missing_ok=True)
            zgpriv_tmp_path.unlink(missing_ok=True)
        return code

    @staticmethod
    def validate_files(boot_file: Path, sedata_file: Path) -> None:
        """Raise ``ValueError`` if either input file does not exist."""
        if not boot_file.exists():
            raise ValueError(f"Error: Boot File not found -> {boot_file}")
        if not sedata_file.exists():
            raise ValueError(f"Error: SeData File not found -> {sedata_file}")

    @staticmethod
    def decrypt_bgroupcert(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int) -> Optional[bytes]:
        """Try to decrypt ``data[:size]`` as a BGroupCert.

        Returns the extracted certificate, or ``None`` when decryption or
        parsing raises ``ValueError`` (the chunk is not a certificate).
        """
        try:
            decrypted_bgroupcert_data = decryption_bgc_zgp.decrypt_aes_ecb(
                inp=data,
                size=size,
                is_bgroupcert=True
            )
            return decryption_bgc_zgp.process_bgroupcert(data=decrypted_bgroupcert_data)
        except ValueError:
            return None

    @staticmethod
    def decrypt_zgpriv(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int) -> Optional[bytes]:
        """Try to decrypt ``data[:size]`` as a ZGPriv blob.

        The blob is always post-processed with ``is_sl3000=False``. Returns
        ``None`` when decryption or parsing raises ``ValueError``.
        """
        try:
            decrypted_zgpriv_data = decryption_bgc_zgp.decrypt_aes_ecb(
                inp=data,
                size=size
            )
            return decryption_bgc_zgp.process_zgpriv(data=decrypted_zgpriv_data, is_sl3000=False)
        except ValueError:
            return None
|
|
|
|
|
|
class DecryptionBgcZgp:
    """AES-ECB decryption and post-processing of BGroupCert / ZGPriv blobs."""

    def __init__(self, boot_keys: dict, playready_keys: dict):
        """Store the candidate keys.

        Args:
            boot_keys: ``{brand: {model: key}}`` where each key is a hex
                string or raw bytes.
            playready_keys: must provide
                ``["porting_kit"]["transient"]`` and
                ``["porting_kit"]["intermediate"]`` (hex string or bytes);
                they are only read by ``decrypt_zgpriv``.
        """
        self.boot_keys = boot_keys
        self.playready_keys = playready_keys
        # Set once a key has produced a plausible BGroupCert.
        self.boot_key: Optional[bytes] = None
        self.brand: Optional[str] = None
        self.bgroupcert_start_magic = b"CHAI"
        self.bgroupcert_end_magic = b"\x93\xfa\xc5\xab"

    def decrypt_aes_ecb(self, inp: Union[Path, bytes], size: Optional[int] = None, is_bgroupcert: bool = False) -> bytes:
        """Decrypt a blob with AES-ECB using the known boot keys.

        Args:
            inp: the encrypted bytes, or a path to read them from.
            size: when truthy, only the first ``size`` bytes are decrypted.
            is_bgroupcert: when true, a candidate key is skipped if neither
                BGroupCert magic appears in its output, and the accepted key
                and brand are remembered on the instance.

        Once ``self.boot_key`` is set it is used instead of the candidates.
        For non-BGroupCert input the output of the first key tried is returned
        without validation.

        Raises:
            ValueError: if no key is accepted (or there are no keys). The AES
                call itself may also raise; presumably ``ValueError`` on a
                length that is not block-aligned - TODO confirm.
        """
        if isinstance(inp, bytes):
            encrypted_data = inp
        else:
            encrypted_data = self.get_file_data(inp)

        # The slice does not depend on the key, so take it once.
        blob = encrypted_data[:size] if size else encrypted_data

        for brand, keys in self.boot_keys.items():
            for boot_key in keys.values():
                if self.boot_key:
                    key = self.boot_key
                else:
                    key = self.hex_to_bytes(boot_key)

                cipher = AES.new(key, AES.MODE_ECB)
                data = cipher.decrypt(blob)

                if (
                    is_bgroupcert
                    and self.bgroupcert_start_magic not in data
                    and self.bgroupcert_end_magic not in data
                ):
                    continue

                if is_bgroupcert:
                    self.boot_key = key
                    self.brand = brand

                return data

        raise ValueError(
            "Error: Failed to decrypt the bgroupcert. No valid AES key matched or the decrypted data is invalid."
        )

    def decrypt_zgpriv(self, encrypted_data: bytes) -> bytes:
        """Unwrap the key stored in the first 48 bytes of ``encrypted_data``.

        The wrapping key is the AES-CMAC, keyed with the porting-kit transient
        key, of ``0x01 || intermediate key || 0x00 || 16 zero bytes ||
        0x00 0x80``. The result of AES key unwrap is returned.
        """
        transient_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["transient"])
        cmac = CMAC.new(transient_key, ciphermod=AES)

        intermediate_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["intermediate"])
        intermediate_data = (
            b"\x01" +
            intermediate_key +  # Intermediate key
            b"\x00" +
            b"\x00" * 16 +  # Context
            b"\x00\x80"
        )
        cmac.update(intermediate_data)

        wrapping_key = cmac.digest()

        wrapped_key = encrypted_data[:48]
        unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key)

        return unwrapped_key

    def process_zgpriv(self, data: bytes, is_sl3000: bool) -> bytes:
        """Extract the ZGPriv bytes from a decrypted blob.

        The ``INNER_MSTAR_FILE`` header is stripped, the remainder is
        optionally unwrapped (``is_sl3000``), and then either bytes
        ``[64:-32]`` (brand ``"lg"`` with a 128-byte input) or the first 32
        bytes are returned.
        """
        content = self.remove_header(data)

        if is_sl3000:
            content = self.decrypt_zgpriv(encrypted_data=content)

        # The length test is on the blob as passed in, before header removal.
        if self.brand == "lg" and len(data) == 128:
            zgpriv_data = content[64:-32]
        else:
            zgpriv_data = content[:32]

        return zgpriv_data

    def process_bgroupcert(self, data: bytes) -> bytes:
        """Cut the certificate out of a decrypted blob.

        Returns the bytes from the first start magic up to and including the
        first end magic (searched after the ``INNER_MSTAR_FILE`` header).

        Raises:
            ValueError: if either magic is missing or the end magic precedes
                the start magic.
        """
        content = self.remove_header(data)

        start_index = content.find(self.bgroupcert_start_magic)
        end_index = content.find(self.bgroupcert_end_magic)

        if start_index == -1:
            raise ValueError("Error: BGCert start magic sequence not found.")

        if end_index == -1:
            raise ValueError("Error: BGCert end magic sequence not found.")

        if end_index < start_index:
            raise ValueError("Error: BGCert end magic sequence found before the start magic sequence.")

        return content[start_index:end_index + len(self.bgroupcert_end_magic)]

    @staticmethod
    def remove_header(data: bytes, header: Union[str, bytes] = "INNER_MSTAR_FILE") -> bytes:
        """Return what follows the first occurrence of ``header``.

        ``data`` is returned unchanged when the header is not present.
        """
        if isinstance(header, str):
            header = header.encode()

        header_index = data.find(header)
        if header_index == -1:
            return data

        return data[header_index + len(header):]

    @staticmethod
    def hex_to_bytes(data: Union[str, bytes]) -> bytes:
        """Decode a hex string; bytes are passed through untouched."""
        if isinstance(data, str):
            data = bytes.fromhex(data)
        return data

    @staticmethod
    def get_file_data(input_file: Path) -> bytes:
        """Read and return the whole file in binary mode."""
        with open(input_file, "rb") as f:
            data = f.read()
        return data

    @staticmethod
    def get_security_level(data: bytes) -> str:
        """Return the first of SL3000 / SL2000 / SL150 found in ``data``.

        ``"UNKNOWN"`` is returned when none of the labels occurs.
        """
        if b"SL3000" in data:
            security_level = "SL3000"
        elif b"SL2000" in data:
            security_level = "SL2000"
        elif b"SL150" in data:
            security_level = "SL150"
        else:
            security_level = "UNKNOWN"
        return security_level

    def process_files(self, bgroupcert_file: Path, zgpriv_file: Path):
        """Decrypt a BGroupCert/ZGPriv file pair and export the results.

        The outputs go to
        ``playready_decryption_bgc_zgp_pak/<brand>_<level>_<crc32 hex>``;
        ``pyplayready create-device`` is then run on them with ``-o`` set to
        that folder. A non-zero exit status of the tool is tolerated: the
        ``.dat`` files stay in place and no PRD path is printed.

        Raises:
            ValueError: if an input file is missing, decryption/parsing fails,
                or the output folder already exists.
        """
        if not bgroupcert_file.exists():
            raise ValueError(f"Error: BGCert File not found -> {bgroupcert_file}")

        if not zgpriv_file.exists():
            raise ValueError(f"Error: ZGPriv File not found -> {zgpriv_file}")

        decrypted_bgroupcert_data = self.decrypt_aes_ecb(
            inp=bgroupcert_file,
            is_bgroupcert=True
        )
        bgroupcert_data = self.process_bgroupcert(
            data=decrypted_bgroupcert_data
        )

        # Report this instance's state; relying on a module-level variable
        # here breaks every caller that imports the class.
        print("Firmware:")
        print(f" - Key : {self.boot_key.hex()}")
        print(f" - Brand: {self.brand}")

        security_level = self.get_security_level(bgroupcert_data)

        decrypted_zgpriv_data = self.decrypt_aes_ecb(
            inp=zgpriv_file
        )
        zgpriv_data = self.process_zgpriv(
            data=decrypted_zgpriv_data,
            is_sl3000=security_level == "SL3000"
        )

        # The CRC32 of cert + key gives each pair its own folder name.
        bgc_zgp_id = hex(crc32(bgroupcert_data + zgpriv_data))[2:]
        folder_name = f"playready_decryption_bgc_zgp_pak/{self.brand}_{security_level}_{bgc_zgp_id}"
        folder_path = Path(folder_name)

        if folder_path.exists():
            raise ValueError(f"Error: This certificate already exists in {folder_path}")

        folder_path.mkdir(parents=True, exist_ok=True)

        bgroupcert_out_path = folder_path / "bgroupcert.dat"
        bgroupcert_out_path.write_bytes(bgroupcert_data)

        zgpriv_out_path = folder_path / "zgpriv.dat"
        zgpriv_out_path.write_bytes(zgpriv_data)

        command = [
            "pyplayready", "create-device",
            "-c", str(bgroupcert_out_path),
            "-k", str(zgpriv_out_path),
            "-o", str(folder_path)
        ]

        result = subprocess.run(
            command,
            capture_output=True,
            text=True
        )

        prd_path = None
        if result.returncode == 0:
            # Only the file name is taken from the tool's message; the folder
            # is the one passed with -o.
            prd_path = folder_path / Path(result.stderr.split("Saved to: ")[-1].strip()).name

        print("Decryption:")
        print(f" - BGroupCert Security level: {security_level}")
        print(f" - BGroupCert Path : {bgroupcert_out_path}")
        print(f" - ZGPriv Path : {zgpriv_out_path}")
        if prd_path:
            print(f" - PRD Path : {prd_path}")
        print(f" - Files exported to : {folder_path}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point. Two modes, selected from the file names:
    #   * "boot" in argv[1] and "sedata" in argv[2] -> Pak extraction
    #   * anything else -> argv[1] is a BGroupCert file, argv[2] a ZGPriv file
    if len(sys.argv) < 3:
        print("Usage:")
        print(" BGroupCert and ZGPriv")
        print(" decrypt_bgc_zgp_pak.py <bgroupcert_file> <zgpriv_file>")
        print(" Pak")
        print(" decrypt_bgc_zgp_pak.py <boot_file> <sedata_file>")
        sys.exit(1)

    # Known AES boot keys, grouped as {brand: {model: hex key}}.
    boot_keys = {
        "vestel": {
            "MB130": "8981D083B3D53B3DF1AC529A70F244C0",
            "MB_VARIANT_1": "24490B4CC95F739CE34138478E47139E"
        },
        "lg": {
            "HE_LCD_NC5U_AAADAIAA": "E33AB4C45C2570B8AD15A921F752DEB6",
            "HE_DTV_W21A_AFADATAA": "0007FF4154534D92FC55AA0FFF0110E0"
        }
    }

    # Keys read by DecryptionBgcZgp.decrypt_zgpriv to derive the wrapping key.
    playready_keys = {
        "porting_kit": {
            "transient": "8B222FFD1E76195659CF2703898C427F",
            "intermediate": "9CE93432C7D74016BA684763F801E136",
        }
    }

    # Pak / SeData
    # Number of leading chunk bytes decrypted per security level.
    bgroupcert_sizes = {
        "SL2000": 1760,
        "SL3000": 1440
    }
    zgpriv_sizes = {
        "SL2000": 176,
        "SL3000": 176
    }

    print("Process started...")

    if "boot" in sys.argv[1] and "sedata" in sys.argv[2]:
        pak = Pak(bgroupcert_sizes, zgpriv_sizes)

        boot_file = Path(sys.argv[1])
        sedata_file = Path(sys.argv[2])

        pak.process_files(boot_file, sedata_file, playready_keys)
    else:
        decryption_bgc_zgp = DecryptionBgcZgp(
            boot_keys=boot_keys,
            playready_keys=playready_keys
        )

        bgroupcert_file = Path(sys.argv[1])
        zgpriv_file = Path(sys.argv[2])

        decryption_bgc_zgp.process_files(bgroupcert_file, zgpriv_file)

    print("\nProcess completed successfully.")

    sys.exit(0)