MyDRMTools/AutoKeyboxDecoder/autokeybox_decoder.py
SuperUserek 164ed468e6 Update to cache keys, and download it once daily.
Use --force-update-keys to redownload new keys from sources.
2026-03-05 11:19:39 +00:00

487 lines
14 KiB
Python

#!/usr/bin/env python3
"""
SuperUserek AutoKeybox Decoder (Auto AES Decrypt Tool)
What it does:
- Downloads + parses AES keys from multiple sources (default list below)
- Caches the merged, deduped key list to a local "keys.txt"
- Only updates the cache once per day (unless --force-update-keys)
- Optionally includes/only-uses a custom user-provided AES key
- Tries AES modes (ECB/CBC/CFB/OFB/CTR) + IV strategies
- Validates by searching for b"INNER_MSTAR"
- Extracts payload using CHAI/kbox logic and saves output
Requires:
pip install requests pycryptodome
"""
import argparse
import os
import re
from dataclasses import dataclass
from datetime import date
from typing import Dict, List, Optional, Tuple
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util import Counter
DEFAULT_KEYS_URLS = [
"https://raw.githubusercontent.com/openlgtv/epk2extract/refs/heads/master/keys/AES.key",
# IMPORTANT: RAW endpoint (not /src/branch/ which is HTML)
"https://git.drmlab.io/SuperUserek/MyDRMTools/raw/branch/main/AutoKeyboxDecoder/KnownKeys.txt",
]
MAGIC = b"INNER_MSTAR"
# Match exactly AES key sizes in hex: 16/24/32 bytes => 32/48/64 hex chars
HEX_KEY_RE = re.compile(
r"(?i)(?<![0-9a-f])(?:[0-9a-f]{32}|[0-9a-f]{48}|[0-9a-f]{64})(?![0-9a-f])"
)
def download_text(url: str, timeout: int = 15) -> str:
r = requests.get(url, timeout=timeout)
r.raise_for_status()
return r.text
def parse_keys(text: str) -> List[str]:
"""
Extract AES keys from arbitrary text.
Works for:
- AES.key style lines with comments
- KnownKeys.txt where keys can be 1-per-line or space-separated
It finds ALL 32/48/64-hex tokens in each (comment-stripped) line.
"""
keys: List[str] = []
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
# strip inline comments
line = line.split("#", 1)[0]
# find every key token on the line
for m in HEX_KEY_RE.findall(line):
keys.append(m.upper())
return keys
def normalize_custom_key(key_hex: str) -> str:
k = key_hex.strip()
if k.startswith(("0x", "0X")):
k = k[2:]
k = re.sub(r"\s+", "", k)
if not re.fullmatch(r"[0-9A-Fa-f]+", k):
raise ValueError("Custom key is not valid hex")
kb = bytes.fromhex(k)
if len(kb) not in (16, 24, 32):
raise ValueError(
f"Custom key must be 16/24/32 bytes (got {len(kb)} bytes = {len(k)} hex chars)"
)
return k.upper()
def group_keys_by_aes_size(keys_hex: List[str]) -> Dict[int, List[str]]:
grouped: Dict[int, List[str]] = {16: [], 24: [], 32: []}
for khex in keys_hex:
try:
kb = bytes.fromhex(khex)
except ValueError:
continue
if len(kb) in grouped:
grouped[len(kb)].append(khex.upper())
return grouped
def _ensure_block_multiple(data: bytes) -> bytes:
# Keep your original behavior: pad ciphertext to 16 then decrypt for block modes
if len(data) % 16 == 0:
return data
return pad(data, 16)
def extract_payload(dec_data: bytes) -> bytes:
"""
Your payload extraction logic (unchanged).
"""
payload = None
for offset in (64, 96):
candidate = dec_data[offset:]
if b"CHAI" in candidate or b"kbox" in candidate:
payload = candidate
break
if payload is None:
payload = dec_data[64:]
if b"CHAI" in payload:
out_data = payload
elif b"kbox" in payload:
out_data = payload[:128]
else:
out_data = payload[:32]
return out_data
@dataclass
class AttemptResult:
key_hex: str
mode_name: str
iv_used: Optional[bytes]
plaintext: bytes
def try_decrypt_with_key(encrypted: bytes, key_hex: str) -> List[AttemptResult]:
try:
key = bytes.fromhex(key_hex)
except ValueError:
return []
successes: List[AttemptResult] = []
# --- ECB ---
try:
data = _ensure_block_multiple(encrypted)
cipher = AES.new(key, AES.MODE_ECB)
dec = cipher.decrypt(data)
if MAGIC in dec:
successes.append(AttemptResult(key_hex, "ECB", None, dec))
except Exception:
pass
# --- IV candidates for CBC/CFB/OFB ---
iv_candidates: List[Tuple[str, bytes, bytes]] = []
iv_candidates.append(("IV_ZERO", b"\x00" * 16, encrypted))
if len(encrypted) >= 16:
iv_candidates.append(("IV_PREFIX16", encrypted[:16], encrypted[16:]))
# --- CBC ---
for iv_label, iv, ct in iv_candidates:
try:
ct2 = _ensure_block_multiple(ct)
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
dec = cipher.decrypt(ct2)
if MAGIC in dec:
successes.append(AttemptResult(key_hex, f"CBC({iv_label})", iv, dec))
except Exception:
pass
# --- CFB (segment_size=128) ---
for iv_label, iv, ct in iv_candidates:
try:
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
dec = cipher.decrypt(ct)
if MAGIC in dec:
successes.append(AttemptResult(key_hex, f"CFB({iv_label})", iv, dec))
except Exception:
pass
# --- OFB ---
for iv_label, iv, ct in iv_candidates:
try:
cipher = AES.new(key, AES.MODE_OFB, iv=iv)
dec = cipher.decrypt(ct)
if MAGIC in dec:
successes.append(AttemptResult(key_hex, f"OFB({iv_label})", iv, dec))
except Exception:
pass
# --- CTR strategies ---
# A) whole ciphertext, counter starts at 0
try:
ctr = Counter.new(128, initial_value=0)
cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
dec = cipher.decrypt(encrypted)
if MAGIC in dec:
successes.append(AttemptResult(key_hex, "CTR(counter=0,whole_ct)", None, dec))
except Exception:
pass
# B) first 16 bytes is initial counter block; decrypt remaining bytes
if len(encrypted) >= 16:
try:
init_val = int.from_bytes(encrypted[:16], "big")
ctr = Counter.new(128, initial_value=init_val)
cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
dec = cipher.decrypt(encrypted[16:])
if MAGIC in dec:
successes.append(
AttemptResult(
key_hex,
"CTR(counter=prefix16,ct_after_prefix)",
encrypted[:16],
dec,
)
)
except Exception:
pass
return successes
# -------------------------
# Key cache: keys.txt daily
# -------------------------
def is_cache_fresh(cache_path: str) -> bool:
"""
Fresh means: cache file mtime is 'today' (local date).
"""
if not os.path.exists(cache_path):
return False
try:
mtime = os.path.getmtime(cache_path)
except OSError:
return False
return date.fromtimestamp(mtime) == date.today()
def load_keys_from_cache(cache_path: str) -> List[str]:
"""
Read keys.txt. Accepts:
- one key per line
- ignores comments/blank lines
"""
keys: List[str] = []
if not os.path.exists(cache_path):
return keys
with open(cache_path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
for m in HEX_KEY_RE.findall(line):
keys.append(m.upper())
return keys
def save_keys_to_cache(cache_path: str, keys: List[str], sources: List[str]) -> None:
os.makedirs(os.path.dirname(os.path.abspath(cache_path)) or ".", exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
f.write("# SuperUserek AutoKeybox Decoder - Cached AES keys\n")
f.write(f"# Updated: {date.today().isoformat()}\n")
f.write("# Sources:\n")
for s in sources:
f.write(f"# - {s}\n")
f.write("\n")
for k in keys:
f.write(k + "\n")
def build_key_list(
keys_urls: List[str],
custom_key: Optional[str],
only_custom: bool,
cache_path: str,
force_update_keys: bool,
) -> Tuple[List[str], bool]:
"""
Returns (keys, used_cache)
"""
keys: List[str] = []
# custom key first (if any)
if custom_key:
keys.append(normalize_custom_key(custom_key))
if only_custom:
return keys, False
# If cache is fresh and not forced, use it
if (not force_update_keys) and is_cache_fresh(cache_path):
cached = load_keys_from_cache(cache_path)
# Dedup but keep custom key first if present
seen = set(keys)
for k in cached:
if k not in seen:
# ensure valid AES size
try:
kb = bytes.fromhex(k)
except ValueError:
continue
if len(kb) not in (16, 24, 32):
continue
keys.append(k)
seen.add(k)
return keys, True
# Otherwise download + parse, then write cache
seen = set(keys)
downloaded: List[str] = []
for url in keys_urls:
text = download_text(url)
for k in parse_keys(text):
try:
kb = bytes.fromhex(k)
except ValueError:
continue
if len(kb) not in (16, 24, 32):
continue
if k not in seen:
downloaded.append(k)
seen.add(k)
# Save merged list (custom key not included in cache; cache is for known keys)
save_keys_to_cache(cache_path, downloaded, keys_urls)
# final keys list = custom (if any) + downloaded
keys.extend(downloaded)
return keys, False
def auto_decrypt(
input_file: str,
keys_urls: List[str],
out_dir: Optional[str],
stop_on_first: bool,
custom_key: Optional[str],
only_custom: bool,
cache_path: str,
force_update_keys: bool,
) -> int:
with open(input_file, "rb") as f:
encrypted_data = f.read()
all_keys, used_cache = build_key_list(
keys_urls=keys_urls,
custom_key=custom_key,
only_custom=only_custom,
cache_path=cache_path,
force_update_keys=force_update_keys,
)
if not all_keys:
print("[-] No keys to try. Provide --key or disable --only-custom.")
return 3
grouped = group_keys_by_aes_size(all_keys)
cache_msg = f"(cache: {os.path.abspath(cache_path)}"
cache_msg += ", used cached keys)" if used_cache else ", updated/downloaded keys)"
print(f"[*] Keys ready {cache_msg}")
print(f" AES-128: {len(grouped[16])}")
print(f" AES-192: {len(grouped[24])}")
print(f" AES-256: {len(grouped[32])}")
print(f" Total: {len(all_keys)}")
# Try in a practical order
ordered_keys = grouped[16] + grouped[32] + grouped[24]
base = os.path.splitext(os.path.basename(input_file))[0]
out_dir = out_dir or os.path.dirname(os.path.abspath(input_file)) or "."
os.makedirs(out_dir, exist_ok=True)
found_any = 0
for key_hex in ordered_keys:
results = try_decrypt_with_key(encrypted_data, key_hex)
if not results:
continue
for r in results:
found_any += 1
payload = extract_payload(r.plaintext)
safe_mode = re.sub(r"[^A-Za-z0-9_.-]+", "_", r.mode_name)
out_path = os.path.join(out_dir, f"{base}_decrypted_{safe_mode}_{r.key_hex[:16]}.dat")
with open(out_path, "wb") as f:
f.write(payload)
iv_info = ""
if r.iv_used is not None:
iv_info = f", iv={r.iv_used.hex().upper()}"
print(f"[+] MATCH: key={r.key_hex} mode={r.mode_name}{iv_info}")
print(f" Saved: {out_path} ({len(payload)} bytes)")
if stop_on_first:
return 0
if found_any == 0:
print("[-] No working key/mode found (no INNER_MSTAR marker detected).")
return 2
print(f"[*] Done. Matches found: {found_any}")
return 0
def main():
ap = argparse.ArgumentParser(
description="Try AES keys (cached daily) across AES modes to decrypt an input file."
)
ap.add_argument("file", help="Path to encrypted input file (e.g. file.dat)")
ap.add_argument(
"--keys-url",
action="append",
default=None,
help="Add a key list URL to download+parse (repeatable). If omitted, uses built-in defaults.",
)
ap.add_argument(
"--keys-cache",
default="keys.txt",
help='Path to local keys cache file (default: "keys.txt")',
)
ap.add_argument(
"--force-update-keys",
action="store_true",
help="Force re-download + rebuild keys cache (otherwise updates only once per day).",
)
ap.add_argument("--outdir", default=None, help="Output directory (default: alongside input file)")
ap.add_argument(
"--key",
default=None,
help="Custom AES key in hex (32/48/64 hex chars). You can also prefix with 0x or include spaces.",
)
ap.add_argument(
"--only-custom",
action="store_true",
help="Only try the custom key (skip downloading key lists / cache).",
)
ap.add_argument(
"--all-matches",
action="store_true",
help="Save all matches (default: stop on first match).",
)
args = ap.parse_args()
keys_urls = args.keys_url if args.keys_url else DEFAULT_KEYS_URLS
try:
rc = auto_decrypt(
input_file=args.file,
keys_urls=keys_urls,
out_dir=args.outdir,
stop_on_first=not args.all_matches,
custom_key=args.key,
only_custom=args.only_custom,
cache_path=args.keys_cache,
force_update_keys=args.force_update_keys,
)
except requests.RequestException as e:
print(f"[-] Failed to download a keys list: {e}")
raise SystemExit(4)
except ValueError as e:
print(f"[-] {e}")
raise SystemExit(5)
raise SystemExit(rc)
if __name__ == "__main__":
main()