diff --git a/Vinetrimmer/scripts/netflix_hidden_subs_example.py b/Vinetrimmer/scripts/netflix_hidden_subs_example.py new file mode 100644 index 0000000..c0e306d --- /dev/null +++ b/Vinetrimmer/scripts/netflix_hidden_subs_example.py @@ -0,0 +1,145 @@ +from vinetrimmer.objects.tracks import Tracks +from vinetrimmer.utils.widevine.device import LocalDevice + + +def _hydrate_hidden_tracks(self, title, manifest, tracks, license_url): + """ + Hydrate hidden audio and subtitle tracks. + Netflix does not include all download URLs for audio/subtitle tracks in the initial manifest for performance optimization. + This method sequentially requests these hidden tracks. + """ + from itertools import zip_longest + + # Collect unhydrated audio tracks + unavailable_audio_tracks = [] + for audio in manifest.get("audio_tracks", []): + if len(audio.get("streams", [])) == 0: + # Audio is available but has no stream information + self.log.info(f"Found hidden audio track: {audio.get('languageDescription', 'Unknown')}") + unavailable_audio_tracks.append(( + audio.get("new_track_id"), + audio.get("id") + )) + + # Collect unhydrated subtitle tracks + unavailable_subtitle_tracks = [] + for subtitle in manifest.get("timedtexttracks", []): + if subtitle.get("isNoneTrack"): + continue + if subtitle.get("hydrated") == False: + # Subtitle is not hydrated + # self.log.info(f"Found hidden subtitle track: {subtitle.get('languageDescription', 'Unknown')}") + unavailable_subtitle_tracks.append(( + subtitle.get("new_track_id"), + subtitle.get("id") + )) + + if not unavailable_audio_tracks and not unavailable_subtitle_tracks: + self.log.info("No hidden audio or subtitle tracks found") + return + + self.log.info(f"Fetching {len(unavailable_audio_tracks)} hidden audio track(s) and {len(unavailable_subtitle_tracks)} hidden subtitle track(s)...") + + # Hydrate hidden tracks one by one + for audio_info, subtitle_info in zip_longest( + unavailable_audio_tracks, + unavailable_subtitle_tracks, + fillvalue=(None, None) + ): + audio_track_id, audio_id = audio_info if audio_info[0] else (None, None) + subtitle_track_id, subtitle_id = subtitle_info if subtitle_info[0] else (None, None) + + try: + # Request specific hidden track + hydrated_manifest = self.get_manifest( + title, + self.profiles, + required_audio_track_id=audio_track_id, + required_text_track_id=subtitle_track_id + ) + + # Parse hydrated tracks + hydrated_tracks = self.manifest_as_tracks(hydrated_manifest) + + # Add audio track + if audio_track_id: + for audio in manifest.get("audio_tracks", []): + if audio.get("id") == audio_id: + for track in hydrated_tracks.audio: + if track.encrypted: + track.extra["license_url"] = license_url + tracks.add(track, warn_only=True) + # self.log.info(f"Added hidden audio track: {track.language}") + break + + # Add subtitle track + if subtitle_track_id: + for subtitle in manifest.get("timedtexttracks", []): + if subtitle.get("id") == subtitle_id: + for track in hydrated_tracks.subtitles: + tracks.add(track, warn_only=True) + # self.log.info(f"Added hidden subtitle track: {track.language}") + break + except Exception as e: + self.log.warning(f"Error hydrating hidden track: {e}") + continue + + +def get_tracks(self, title): + if self.vcodec == "H264": + # If H.264, get both MPL and HPL tracks as they alternate in terms of bitrate + tracks = Tracks() + + self.config["profiles"]["video"]["H264"]["MPL+HPL+QC"] = ( + self.config["profiles"]["video"]["H264"]["MPL"] + self.config["profiles"]["video"]["H264"]["HPL"] + + self.config["profiles"]["video"]["H264"]["QC"] + ) + + if self.audio_only or self.subs_only or self.chapters_only: + profiles = ["MPL+HPL+QC"] + else: + profiles = self.profile.split("+") + + for profile in profiles: + try: + manifest = self.get_manifest(title, self.config["profiles"]["video"]["H264"][profile]) + except: + manifest = self.get_manifest(title, self.config["profiles"]["video"]["H264"]["MPL"] + + self.config["profiles"]["video"]["H264"]["HPL"]) + manifest_tracks = self.manifest_as_tracks(manifest) + license_url = manifest["links"]["license"]["href"] + + if self.cdm.device.security_level == 3 and self.cdm.device.type == LocalDevice.Types.CHROME: + self.cdm.uuid = self.cdm.chrome_uuid + self.cdm.urn = f"urn:uuid:{self.cdm.uuid}" + max_quality = max(x.height for x in manifest_tracks.videos) + # if profile == "MPL" and max_quality >= 720: + # manifest_sd = self.get_manifest(title, self.config["profiles"]["video"]["H264"]["BPL"]) + # license_url_sd = manifest_sd["links"]["license"]["href"] + # if "SD_LADDER" in manifest_sd["video_tracks"][0]["streams"][0]["tags"]: + # # SD manifest is new encode encrypted with different keys that won't work for HD + # continue + # license_url = license_url_sd + if profile == "HPL" and max_quality >= 1080: + if "SEGMENT_MAP_2KEY" in manifest["video_tracks"][0]["streams"][0]["tags"]: + # 1080p license restricted from Android L3, 720p license will work for 1080p + manifest_720 = self.get_manifest( + title, [x for x in self.config["profiles"]["video"]["H264"]["HPL"] if "l40" not in x] + ) + license_url = manifest_720["links"]["license"]["href"] + else: + # Older encode, can't use 720p keys for 1080p + continue + + for track in manifest_tracks: + if track.encrypted: + track.extra["license_url"] = license_url + # Fix PSSH with correct KID + # track.pssh.init_data = b'\x08\x01\x12\x10' + bytes.fromhex(track.kid) + tracks.add(manifest_tracks, warn_only=True) + + # Hydrate hidden audio and subtitle tracks (using the last manifest) + _hydrate_hidden_tracks(title, manifest, tracks, license_url) + return tracks + else: + pass \ No newline at end of file diff --git a/tools/fetch_compare_dcsl.zip b/tools/fetch_compare_dcsl.zip new file mode 100644 index 0000000..d1fda11 Binary files /dev/null and b/tools/fetch_compare_dcsl.zip differ diff --git a/tools/playready_rev.py b/tools/playready_rev.py new file mode 100644 index 0000000..1a195db --- /dev/null +++ b/tools/playready_rev.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +""" +PlayReady public key hash revocation checker. + +- Fetches Microsoft's revocation XML. +- Extracts the PlayReady Silverlight Runtime revocation list. +- Prints all 32-byte public key hashes (Base64). +- If a hash is provided (argv[1] or prompted), reports whether it's revoked. +- If 'bgroupcert.dat' in directory, extract the hash and reports whether it's revoked. + +Compatible with Python 3.8+. +""" + +from __future__ import annotations + +import base64 +import struct +import sys +import uuid +from typing import Iterable, List, Optional + +import requests +import xmltodict + +# --- Configuration ---------------------------------------------------------------- + +REV_URL = "https://go.microsoft.com/fwlink/?LinkId=110086" + +# GUID for PlayReady Silverlight Runtime (as raw bytes like original code) +GUID_PR_SILVERLIGHT_RUNTIME = bytes([ + 0x4E, 0x9D, 0x8C, 0x8A, 0xB6, 0x52, 0x45, 0xA7, + 0x97, 0x91, 0x69, 0x25, 0xA6, 0xB4, 0x79, 0x1F +]) + +# If construct is available, we’ll use it to parse the binary blob declaratively. +try: + from construct import Struct as CStruct, Bytes, Int32ub + _HAS_CONSTRUCT = True +except Exception: + _HAS_CONSTRUCT = False + + +# --- Helpers ---------------------------------------------------------------------- + +def _expected_guid_le_hex(guid_bytes: bytes) -> str: + """ + The source blob compares the first 16 bytes against UUID(...).bytes_le. + Keep that exact behavior to match the original script. + """ + return uuid.UUID(guid_bytes.hex()).bytes_le.hex() + + +def _parse_revocation_blob_with_construct(blob: bytes) -> Optional[List[bytes]]: + """ + Parse a revocation list blob using construct (if installed). + Format per original code: + [0:16] GUID + [16:20] unknown / reserved + [20:24] entries_count (u32 big-endian) + [24: ] entries_count * 32-byte hashes + """ + if not _HAS_CONSTRUCT: + return None + + RevHeader = CStruct( + "type_guid" / Bytes(16), + "reserved" / Bytes(4), + "entries_count" / Int32ub, + ) + + # First parse header to get count + if len(blob) < 24: + return [] + + header = RevHeader.parse(blob[:24]) + + # Verify GUID (same logic/endianness as original) + if blob[:16].hex() != _expected_guid_le_hex(GUID_PR_SILVERLIGHT_RUNTIME): + return [] + + # Extract entries + start = 24 + end = start + (header.entries_count * 32) + if end > len(blob): + # Truncated blob; be defensive + end = min(len(blob), start + ((len(blob) - start) // 32) * 32) + + entries = [blob[i:i+32] for i in range(start, end, 32)] + return entries + + +def _parse_revocation_blob_manually(blob: bytes) -> List[bytes]: + """ + Manual parser (no construct). Mirrors the original byte slicing exactly. + """ + if len(blob) < 24: + return [] + + # GUID check with the same endianness trick as original + if blob[:16].hex() != _expected_guid_le_hex(GUID_PR_SILVERLIGHT_RUNTIME): + return [] + + # entries_count is big-endian u32 at offset 20 + entries_count = struct.unpack_from(">I", blob, 20)[0] + cursor = 24 + out: List[bytes] = [] + for _ in range(entries_count): + if cursor + 32 > len(blob): + break + out.append(blob[cursor:cursor+32]) + cursor += 32 + return out + + +def _iter_revocation_lists(xml_root: dict) -> Iterable[bytes]: + """ + Yields each decoded revocation list (ListData) as raw bytes. + Handles the cases where Revocation is a dict or a list of dicts. + """ + rev = xml_root.get("RevInfo", {}).get("Revocation") + if rev is None: + return + + items = rev if isinstance(rev, list) else [rev] + for item in items: + data_b64 = item.get("ListData") + if not data_b64: + continue + try: + yield base64.b64decode(data_b64) + except Exception: + continue + + +def _fetch_revocation_index(url: str = REV_URL) -> dict: + resp = requests.get(url, timeout=20) + resp.raise_for_status() + # xmltodict produces nested OrderedDict-like structures; dict() is fine + return xmltodict.parse(resp.text) + + +def _collect_public_key_hashes(xml_root: dict) -> List[str]: + """ + Returns Base64-encoded 32-byte public key hashes from the + PlayReady Silverlight Runtime revocation list. + """ + hashes_b64: List[str] = [] + + for blob in _iter_revocation_lists(xml_root): + # Try construct first (if available), then fallback + entries = _parse_revocation_blob_with_construct(blob) if _HAS_CONSTRUCT else None + if entries is None: + entries = _parse_revocation_blob_manually(blob) + + if not entries: + continue + + for entry in entries: + hashes_b64.append(base64.b64encode(entry).decode("ascii")) + + return hashes_b64 + + +# --- CLI -------------------------------------------------------------------------- + +def main(argv: List[str]) -> int: + try: + xml_root = _fetch_revocation_index(REV_URL) + except Exception as e: + print(f"Failed to fetch or parse revocation index: {e}", file=sys.stderr) + return 2 + + publickey_hashes = _collect_public_key_hashes(xml_root) + + # Print the list (like the original) + if publickey_hashes: + print("\n".join(publickey_hashes)) + + # Determine the input hash + pubkey_hash: Optional[str] = None + if len(argv) > 1 and argv[1]: + pubkey_hash = argv[1].strip() + print() + print("Provided hash:", pubkey_hash) + + if not pubkey_hash: + bcert = open("bgroupcert.dat", "rb").read() + pubkey_hash = base64.b64encode(bcert[0x48:0x68]).decode() + print() + print("Calculated hash:", pubkey_hash) + + if not pubkey_hash: + print("No hash provided.") + return 1 + + if pubkey_hash in publickey_hashes: + print("one of hashes in CRL match the input hash.") + print("PlayReady Cert is revoked.") + return 0 + else: + print("PlayReady Cert is valid.") + return 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/tools/test_cdm.py b/tools/test_cdm.py new file mode 100644 index 0000000..d5a35f0 --- /dev/null +++ b/tools/test_cdm.py @@ -0,0 +1,99 @@ +import base64, json, requests +from enum import Enum, IntEnum +from hashlib import sha1 +from Cryptodome.Cipher import AES +from Cryptodome.Util import Padding + +class TrackType(str, Enum): + SD = "SD" + HD = "HD" + AUDIO = "AUDIO" + + +class WidevineError(IntEnum): + DRM_DEVICE_CERTIFICATE_REVOKED = 127 + DRM_DEVICE_CERT_SERIAL_REVOKED = 175 + INVALID_PSSH = 152 + INVALID_LICENSE_CHALLENGE = 106 + + +# ------------------------------------------------------------------- +# Constants +# ------------------------------------------------------------------- + +AES_KEY = bytes.fromhex( + "1ae8ccd0e7985cc0b6203a55855a1034afc252980e970ca90e5202689f947ab9" +) +AES_IV = bytes.fromhex( + "d58ce954203b7c9a9a9d467f59839249" +) + +PROVIDER = "widevine_test" +CONTENT_ID = "ZmtqM2xqYVNkZmFsa3Izag==" # example content id + +def build_request(challenge) -> dict: + """Build signed Widevine license request payload.""" + if not isinstance(challenge, str): + challenge = base64.b64encode(challenge).decode() + + payload = json.dumps( + { + "payload": challenge, + "provider": PROVIDER, + "content_id": CONTENT_ID, + "content_key_specs": [{"track_type": t.value} for t in TrackType], + }, + separators=(",", ":"), + ).encode() + + request_b64 = base64.b64encode(payload).decode() + + # Widevine-style signature + digest = sha1(base64.b64decode(request_b64)).digest() + cipher = AES.new(AES_KEY, AES.MODE_CBC, AES_IV) + signature = base64.b64encode(cipher.encrypt(Padding.pad(digest, 16))).decode() + + return { + "request": request_b64, + "signature": signature, + "signer": PROVIDER, + } + + +def check_response(resp: dict): + """Check for error status and raise if needed.""" + code = resp.get("internal_status") + if not code: + return + + try: + error = WidevineError(code) + raise RuntimeError(f"License error: {error.name} ({code})") + except ValueError: + # Unknown code, ignore + return + +if __name__ == "__main__": + pssh = "CAESEFoHzpkm/EolkmbSIjR2qgkaCHVzcC1jZW5jIhhXZ2ZPbVNiOFNpV1NadElpTkhhcUNRPT0qADIA" + challenge = "CAQ=" + + # 1. Request service certificate + cert_resp = requests.post( + "https://license.widevine.com/cenc/getlicense/widevine_test", + json=build_request(challenge), + ).json() + service_cert = cert_resp.get("license") + + challenge = "" # use pywidevine to generate B64 challenge using pssh, service_cert + + # 2. Issue real license request + license_resp = requests.post( + "https://license.widevine.com/cenc/getlicense/widevine_test", + json=build_request(challenge), + ).json() + + check_response(license_resp) + + print("level:", license_resp.get("security_level")) + print("systemID:", license_resp.get("system_id")) + print("status:", license_resp.get("device_state"))