forked from SuperUserek/MyDRMTools
487 lines
14 KiB
Python
487 lines
14 KiB
Python
#!/usr/bin/env python3
"""
SuperUserek AutoKeybox Decoder (Auto AES Decrypt Tool)

What it does:
- Downloads + parses AES keys from multiple sources (default list below)
- Caches the merged, deduped key list to a local "keys.txt"
- Only updates the cache once per day (unless --force-update-keys)
- Optionally includes/only-uses a custom user-provided AES key
- Tries AES modes (ECB/CBC/CFB/OFB/CTR) + IV strategies
- Validates by searching for b"INNER_MSTAR"
- Extracts payload using CHAI/kbox logic and saves output

Requires:
    pip install requests pycryptodome
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import requests
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad
|
|
from Crypto.Util import Counter
|
|
|
|
# Key-list sources that are downloaded when no --keys-url option is given.
DEFAULT_KEYS_URLS = [
    "https://raw.githubusercontent.com/openlgtv/epk2extract/refs/heads/master/keys/AES.key",
    # IMPORTANT: RAW endpoint (not /src/branch/ which is HTML)
    "https://git.drmlab.io/SuperUserek/MyDRMTools/raw/branch/main/AutoKeyboxDecoder/KnownKeys.txt",
]

# A decryption attempt counts as successful when this marker appears in its output.
MAGIC = b"INNER_MSTAR"

# Match exactly AES key sizes in hex: 16/24/32 bytes => 32/48/64 hex chars.
# The look-behind / look-ahead reject runs that are part of a longer hex string.
HEX_KEY_RE = re.compile(r"(?i)(?<![0-9a-f])(?:[0-9a-f]{32}|[0-9a-f]{48}|[0-9a-f]{64})(?![0-9a-f])")
|
|
|
|
|
|
def download_text(url: str, timeout: int = 15) -> str:
    """Fetch *url* with an HTTP GET and return the response body as text.

    Args:
        url: Address to request.
        timeout: Passed straight through to ``requests.get`` (default 15).

    Returns:
        The ``text`` attribute of the response.

    Raises:
        Whatever ``requests.get`` or ``Response.raise_for_status`` raises;
        nothing is caught here.
    """
    response = requests.get(url, timeout=timeout)
    # Turn an error status into an exception instead of returning an error page.
    response.raise_for_status()
    return response.text
|
|
|
|
|
|
def parse_keys(text: str) -> List[str]:
    """Collect every AES-sized hex token found in *text*.

    Handles both AES.key-style files (keys followed by ``#`` comments) and
    plain lists where keys are one per line or separated by spaces.

    Blank lines and lines starting with ``#`` are skipped; on the remaining
    lines everything from the first ``#`` onwards is ignored.

    Returns:
        Upper-cased 32/48/64-character hex strings in order of appearance.
        Duplicates are kept.
    """
    found: List[str] = []
    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if stripped and not stripped.startswith("#"):
            # Drop the inline comment, then harvest every key on the line.
            code_part, _, _ = stripped.partition("#")
            found.extend(token.upper() for token in HEX_KEY_RE.findall(code_part))
    return found
|
|
|
|
|
|
def normalize_custom_key(key_hex: str) -> str:
    """Validate a user-supplied AES key and return it as upper-case hex.

    Surrounding whitespace, one leading ``0x``/``0X`` prefix and any
    whitespace inside the key are removed before validation.

    Args:
        key_hex: Key as typed by the user.

    Returns:
        The key as 32, 48 or 64 upper-case hex characters.

    Raises:
        ValueError: if the cleaned text is empty or contains non-hex
            characters, has an odd number of hex digits, or does not encode
            exactly 16, 24 or 32 bytes.
    """
    k = key_hex.strip()
    if k.startswith(("0x", "0X")):
        k = k[2:]
    k = re.sub(r"\s+", "", k)

    if not re.fullmatch(r"[0-9A-Fa-f]+", k):
        raise ValueError("Custom key is not valid hex")

    # An odd digit count used to surface as bytes.fromhex()'s confusing
    # "non-hexadecimal number found" error; report the real problem instead.
    if len(k) % 2:
        raise ValueError(
            f"Custom key must have an even number of hex digits (got {len(k)} hex chars)"
        )

    key_len = len(k) // 2
    if key_len not in (16, 24, 32):
        raise ValueError(
            f"Custom key must be 16/24/32 bytes (got {key_len} bytes = {len(k)} hex chars)"
        )
    return k.upper()
|
|
|
|
|
|
def group_keys_by_aes_size(keys_hex: List[str]) -> Dict[int, List[str]]:
    """Bucket hex keys by their decoded length in bytes.

    Args:
        keys_hex: Hex-encoded key strings.

    Returns:
        A dict with exactly the keys 16, 24 and 32, each mapping to the
        upper-cased input strings of that byte length, in input order.
        Strings that are not valid hex or have another length are dropped.
    """
    buckets: Dict[int, List[str]] = {size: [] for size in (16, 24, 32)}
    for candidate in keys_hex:
        try:
            byte_len = len(bytes.fromhex(candidate))
        except ValueError:
            # Not decodable as hex: ignore silently.
            continue
        bucket = buckets.get(byte_len)
        if bucket is not None:
            bucket.append(candidate.upper())
    return buckets
|
|
|
|
|
|
def _ensure_block_multiple(data: bytes) -> bytes:
    """Return *data* with a length that is a multiple of 16 bytes.

    Input that is already a multiple of 16 (including empty input) is
    returned unchanged; anything else is extended with ``pad(data, 16)``.
    This keeps the tool's original behavior of padding the ciphertext so
    the block modes accept it.
    """
    leftover = len(data) % 16
    return pad(data, 16) if leftover else data
|
|
|
|
|
|
def extract_payload(dec_data: bytes) -> bytes:
    """Cut the payload out of a decrypted blob.

    The first 64 bytes are skipped. What is returned from the remainder
    depends on which marker it contains:

    - ``b"CHAI"`` present: the whole remainder;
    - otherwise ``b"kbox"`` present: the first 128 bytes of the remainder;
    - neither: the first 32 bytes of the remainder.

    Args:
        dec_data: Decrypted data.

    Returns:
        The selected slice; empty if *dec_data* is 64 bytes or shorter.

    NOTE(review): the previous version also probed offset 96, but
    ``dec_data[96:]`` is a suffix of ``dec_data[64:]``, so any marker found
    at 96 had already been found at 64 and the second probe could never be
    selected. The dead loop is removed and results are unchanged. If blobs
    with a 96-byte header must be supported, the probe has to be anchored
    (e.g. ``startswith``) -- confirm the intended format first.
    """
    payload = dec_data[64:]

    if b"CHAI" in payload:
        return payload
    if b"kbox" in payload:
        return payload[:128]
    return payload[:32]
|
|
|
|
|
|
@dataclass
class AttemptResult:
    """One successful decryption attempt (key + mode combination)."""

    # Hex string of the key that produced the match.
    key_hex: str
    # Human-readable label of the mode/IV strategy, e.g. "ECB" or "CBC(IV_ZERO)".
    mode_name: str
    # IV or initial counter block that was used; None when the attempt had none to report.
    iv_used: Optional[bytes]
    # Full decrypted output of the attempt.
    plaintext: bytes
|
|
|
|
|
|
def try_decrypt_with_key(encrypted: bytes, key_hex: str) -> List[AttemptResult]:
    """Try one key against *encrypted* in every supported AES mode.

    Attempts, in order:

    1. ECB on the (block-padded) data.
    2. CBC, then CFB (segment_size=128), then OFB -- each first with an
       all-zero IV over the whole data and, when at least 16 bytes are
       available, with the first 16 bytes as IV over the rest. Only CBC
       pads its input to a block multiple.
    3. CTR with a 128-bit counter starting at 0 over the whole data.
    4. CTR with the first 16 bytes (big-endian) as initial counter value
       over the rest, when at least 16 bytes are available.

    An attempt succeeds when ``MAGIC`` occurs in its output. Any exception
    raised by a single attempt is swallowed so the remaining attempts still
    run.

    Args:
        encrypted: Raw ciphertext.
        key_hex: Key as a hex string.

    Returns:
        One ``AttemptResult`` per successful attempt, in the order above;
        an empty list if *key_hex* is not valid hex or nothing matched.
    """
    try:
        key = bytes.fromhex(key_hex)
    except ValueError:
        return []

    hits: List[AttemptResult] = []

    def probe(mode_name, iv_used, get_ciphertext, build_cipher):
        # Best-effort: a failing mode must never abort the other modes.
        try:
            ciphertext = get_ciphertext()
            plaintext = build_cipher().decrypt(ciphertext)
            if MAGIC in plaintext:
                hits.append(AttemptResult(key_hex, mode_name, iv_used, plaintext))
        except Exception:
            pass

    # --- ECB ---
    probe(
        "ECB",
        None,
        lambda: _ensure_block_multiple(encrypted),
        lambda: AES.new(key, AES.MODE_ECB),
    )

    # --- IV strategies shared by CBC/CFB/OFB: (label, iv, ciphertext) ---
    iv_strategies: List[Tuple[str, bytes, bytes]] = [("IV_ZERO", b"\x00" * 16, encrypted)]
    if len(encrypted) >= 16:
        iv_strategies.append(("IV_PREFIX16", encrypted[:16], encrypted[16:]))

    # (label, AES mode, extra AES.new kwargs, pad ciphertext to a block multiple?)
    feedback_modes = (
        ("CBC", AES.MODE_CBC, {}, True),
        ("CFB", AES.MODE_CFB, {"segment_size": 128}, False),
        ("OFB", AES.MODE_OFB, {}, False),
    )
    for mode_label, mode_id, extra_kwargs, needs_padding in feedback_modes:
        for iv_label, iv, body in iv_strategies:
            # The lambdas are invoked inside probe() during this iteration,
            # so they always see the current loop variables.
            probe(
                f"{mode_label}({iv_label})",
                iv,
                lambda: _ensure_block_multiple(body) if needs_padding else body,
                lambda: AES.new(key, mode_id, iv=iv, **extra_kwargs),
            )

    # --- CTR A: whole ciphertext, counter starts at 0 ---
    probe(
        "CTR(counter=0,whole_ct)",
        None,
        lambda: encrypted,
        lambda: AES.new(key, AES.MODE_CTR, counter=Counter.new(128, initial_value=0)),
    )

    # --- CTR B: first 16 bytes are the initial counter block ---
    if len(encrypted) >= 16:
        prefix = encrypted[:16]
        probe(
            "CTR(counter=prefix16,ct_after_prefix)",
            prefix,
            lambda: encrypted[16:],
            lambda: AES.new(
                key,
                AES.MODE_CTR,
                counter=Counter.new(128, initial_value=int.from_bytes(prefix, "big")),
            ),
        )

    return hits
|
|
|
|
|
|
# -------------------------
|
|
# Key cache: keys.txt daily
|
|
# -------------------------
|
|
|
|
def is_cache_fresh(cache_path: str) -> bool:
    """Tell whether the cache file was last modified today.

    "Today" is judged by comparing the file's modification time, converted
    with ``date.fromtimestamp``, against ``date.today()`` (local date).

    Returns:
        True if the file exists and its mtime falls on the current date;
        False if it is missing, its mtime cannot be read, or it is older.
    """
    if os.path.exists(cache_path):
        try:
            modified = os.path.getmtime(cache_path)
        except OSError:
            # File vanished or is unreadable between the two calls.
            return False
        return date.fromtimestamp(modified) == date.today()
    return False
|
|
|
|
|
|
def load_keys_from_cache(cache_path: str) -> List[str]:
    """Read AES keys back from the cache file.

    Blank lines and lines starting with ``#`` are ignored. Every
    32/48/64-character hex token on the remaining lines is returned.

    Args:
        cache_path: Location of the cache file; read as UTF-8 with
            undecodable bytes ignored.

    Returns:
        Upper-cased keys in file order (duplicates kept); an empty list if
        the file does not exist.
    """
    collected: List[str] = []
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8", errors="ignore") as handle:
            for raw in handle:
                entry = raw.strip()
                if entry and not entry.startswith("#"):
                    collected.extend(token.upper() for token in HEX_KEY_RE.findall(entry))
    return collected
|
|
|
|
|
|
def save_keys_to_cache(cache_path: str, keys: List[str], sources: List[str]) -> None:
    """Write *keys* to the cache file, preceded by a comment header.

    The header records the tool name, today's date (ISO format) and the
    list of *sources*; after one blank line the keys follow, one per line.
    The parent directory is created if needed and an existing file is
    overwritten.

    Args:
        cache_path: Destination file.
        keys: Key strings to store, written verbatim.
        sources: Strings listed under "# Sources:" in the header.
    """
    parent_dir = os.path.dirname(os.path.abspath(cache_path)) or "."
    os.makedirs(parent_dir, exist_ok=True)

    with open(cache_path, "w", encoding="utf-8") as out:
        header = [
            "# SuperUserek AutoKeybox Decoder - Cached AES keys\n",
            f"# Updated: {date.today().isoformat()}\n",
            "# Sources:\n",
        ]
        header.extend(f"# - {s}\n" for s in sources)
        header.append("\n")
        out.writelines(header)
        out.writelines(k + "\n" for k in keys)
|
|
|
|
|
|
def _is_aes_key_hex(key_hex: str) -> bool:
    """Return True when *key_hex* decodes to a 16-, 24- or 32-byte key."""
    try:
        return len(bytes.fromhex(key_hex)) in (16, 24, 32)
    except ValueError:
        return False


def build_key_list(
    keys_urls: List[str],
    custom_key: Optional[str],
    only_custom: bool,
    cache_path: str,
    force_update_keys: bool,
) -> Tuple[List[str], bool]:
    """Assemble the list of keys to try.

    The normalized custom key (if any) always comes first. Unless
    *only_custom* is set, the known keys follow: read from the cache file
    when it is fresh and no update is forced, otherwise downloaded from
    *keys_urls*, parsed, and written back to the cache.

    Args:
        keys_urls: Key-list URLs to download when the cache is not used.
        custom_key: Optional user-supplied key (any form accepted by
            ``normalize_custom_key``).
        only_custom: Return just the custom key; no cache access, no download.
        cache_path: Location of the key cache file.
        force_update_keys: Ignore a fresh cache and download anyway.

    Returns:
        ``(keys, used_cache)`` -- de-duplicated upper-case hex keys, and
        whether the known keys came from the cache file.

    Raises:
        ValueError: if *custom_key* is invalid.
        Exceptions from ``download_text`` propagate unchanged; in that case
        the cache file is not rewritten.
    """
    keys: List[str] = []

    # custom key first (if any)
    if custom_key:
        keys.append(normalize_custom_key(custom_key))

    if only_custom:
        return keys, False

    used_cache = (not force_update_keys) and is_cache_fresh(cache_path)

    if used_cache:
        candidates = load_keys_from_cache(cache_path)
    else:
        candidates = []
        for url in keys_urls:
            candidates.extend(parse_keys(download_text(url)))

    # De-duplicate the known keys on their own, keeping first-seen order.
    known = list(dict.fromkeys(k for k in candidates if _is_aes_key_hex(k)))

    if not used_cache:
        # The cache must hold the complete known-key list. Previously a known
        # key equal to the user's custom key was filtered out before saving
        # and was therefore missing from later runs without --key.
        save_keys_to_cache(cache_path, known, keys_urls)

    # final keys list = custom (if any) + known keys not already present
    already = set(keys)
    keys.extend(k for k in known if k not in already)
    return keys, used_cache
|
|
|
|
|
|
def auto_decrypt(
    input_file: str,
    keys_urls: List[str],
    out_dir: Optional[str],
    stop_on_first: bool,
    custom_key: Optional[str],
    only_custom: bool,
    cache_path: str,
    force_update_keys: bool,
) -> int:
    """Decrypt *input_file* by trying every available key and AES mode.

    For each match the extracted payload is written to
    ``<out_dir>/<input stem>_decrypted_<mode>_<first 16 key hex chars>.dat``
    and a summary is printed.

    Args:
        input_file: Path of the encrypted file.
        keys_urls: Key-list URLs handed to ``build_key_list``.
        out_dir: Output directory; defaults to the input file's directory.
        stop_on_first: Return right after the first saved match.
        custom_key: Optional user-supplied key; tried before all others.
        only_custom: Try only the custom key.
        cache_path: Location of the key cache file.
        force_update_keys: Force a re-download of the key lists.

    Returns:
        0 if at least one match was saved, 2 if nothing matched, 3 if there
        were no keys to try.

    Raises:
        OSError: if the input cannot be read or an output cannot be written.
        Exceptions raised by ``build_key_list`` propagate unchanged.
    """
    with open(input_file, "rb") as f:
        encrypted_data = f.read()

    all_keys, used_cache = build_key_list(
        keys_urls=keys_urls,
        custom_key=custom_key,
        only_custom=only_custom,
        cache_path=cache_path,
        force_update_keys=force_update_keys,
    )

    if not all_keys:
        print("[-] No keys to try. Provide --key or disable --only-custom.")
        return 3

    grouped = group_keys_by_aes_size(all_keys)

    if only_custom:
        # Neither the cache nor the network is touched in this mode, so do
        # not claim that keys were downloaded.
        cache_msg = "(custom key only, key cache not used)"
    else:
        cache_state = "used cached keys" if used_cache else "updated/downloaded keys"
        cache_msg = f"(cache: {os.path.abspath(cache_path)}, {cache_state})"
    print(f"[*] Keys ready {cache_msg}")
    print(f" AES-128: {len(grouped[16])}")
    print(f" AES-192: {len(grouped[24])}")
    print(f" AES-256: {len(grouped[32])}")
    print(f" Total: {len(all_keys)}")

    # Try in a practical order
    ordered_keys = grouped[16] + grouped[32] + grouped[24]
    if custom_key:
        # build_key_list puts the normalized custom key at index 0; keep it
        # in front after the regrouping by size (a 24-byte custom key would
        # otherwise be tried last).
        user_key = all_keys[0]
        ordered_keys = [user_key] + [k for k in ordered_keys if k != user_key]

    base = os.path.splitext(os.path.basename(input_file))[0]
    out_dir = out_dir or os.path.dirname(os.path.abspath(input_file)) or "."
    os.makedirs(out_dir, exist_ok=True)

    found_any = 0

    for key_hex in ordered_keys:
        for r in try_decrypt_with_key(encrypted_data, key_hex):
            found_any += 1
            payload = extract_payload(r.plaintext)

            # Mode labels contain "(", ")", "=" and "," -- make them filename-safe.
            safe_mode = re.sub(r"[^A-Za-z0-9_.-]+", "_", r.mode_name)
            out_path = os.path.join(out_dir, f"{base}_decrypted_{safe_mode}_{r.key_hex[:16]}.dat")

            with open(out_path, "wb") as f:
                f.write(payload)

            iv_info = ""
            if r.iv_used is not None:
                iv_info = f", iv={r.iv_used.hex().upper()}"

            print(f"[+] MATCH: key={r.key_hex} mode={r.mode_name}{iv_info}")
            print(f" Saved: {out_path} ({len(payload)} bytes)")

            if stop_on_first:
                return 0

    if found_any == 0:
        print("[-] No working key/mode found (no INNER_MSTAR marker detected).")
        return 2

    print(f"[*] Done. Matches found: {found_any}")
    return 0
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: parse options, run the decryptor, exit.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. The
            default ``None`` keeps the original behavior.

    Raises:
        SystemExit: always. The code is the return value of ``auto_decrypt``
            on a normal run, 4 if a key list could not be downloaded, 5 on a
            ``ValueError`` (e.g. invalid ``--key``), and 1 on any other
            ``OSError`` such as an unreadable input file.
    """
    ap = argparse.ArgumentParser(
        description="Try AES keys (cached daily) across AES modes to decrypt an input file."
    )
    ap.add_argument("file", help="Path to encrypted input file (e.g. file.dat)")

    ap.add_argument(
        "--keys-url",
        action="append",
        default=None,
        help="Add a key list URL to download+parse (repeatable). If omitted, uses built-in defaults.",
    )

    ap.add_argument(
        "--keys-cache",
        default="keys.txt",
        help='Path to local keys cache file (default: "keys.txt")',
    )

    ap.add_argument(
        "--force-update-keys",
        action="store_true",
        help="Force re-download + rebuild keys cache (otherwise updates only once per day).",
    )

    ap.add_argument("--outdir", default=None, help="Output directory (default: alongside input file)")

    ap.add_argument(
        "--key",
        default=None,
        help="Custom AES key in hex (32/48/64 hex chars). You can also prefix with 0x or include spaces.",
    )
    ap.add_argument(
        "--only-custom",
        action="store_true",
        help="Only try the custom key (skip downloading key lists / cache).",
    )

    ap.add_argument(
        "--all-matches",
        action="store_true",
        help="Save all matches (default: stop on first match).",
    )

    args = ap.parse_args(argv)
    keys_urls = args.keys_url if args.keys_url else DEFAULT_KEYS_URLS

    try:
        rc = auto_decrypt(
            input_file=args.file,
            keys_urls=keys_urls,
            out_dir=args.outdir,
            stop_on_first=not args.all_matches,
            custom_key=args.key,
            only_custom=args.only_custom,
            cache_path=args.keys_cache,
            force_update_keys=args.force_update_keys,
        )
    except requests.RequestException as e:
        # Must precede the OSError handler: RequestException derives from it.
        print(f"[-] Failed to download a keys list: {e}")
        raise SystemExit(4)
    except ValueError as e:
        print(f"[-] {e}")
        raise SystemExit(5)
    except OSError as e:
        # Missing/unreadable input, unwritable output or cache, etc. Report
        # it cleanly instead of a traceback; 1 is the status the uncaught
        # exception produced before.
        print(f"[-] {e}")
        raise SystemExit(1)

    raise SystemExit(rc)


if __name__ == "__main__":
    main()