This commit is contained in:
Mike 2026-02-07 19:39:32 +02:00
parent 41db41cda9
commit a827efce1f
4 changed files with 450 additions and 0 deletions

View File

@ -0,0 +1,145 @@
from vinetrimmer.objects.tracks import Tracks
from vinetrimmer.utils.widevine.device import LocalDevice
def _hydrate_hidden_tracks(self, title, manifest, tracks, license_url):
"""
Hydrate hidden audio and subtitle tracks.
Netflix does not include all download URLs for audio/subtitle tracks in the initial manifest for performance optimization.
This method sequentially requests these hidden tracks.
"""
from itertools import zip_longest
# Collect unhydrated audio tracks
unavailable_audio_tracks = []
for audio in manifest.get("audio_tracks", []):
if len(audio.get("streams", [])) == 0:
# Audio is available but has no stream information
self.log.info(f"Found hidden audio track: {audio.get('languageDescription', 'Unknown')}")
unavailable_audio_tracks.append((
audio.get("new_track_id"),
audio.get("id")
))
# Collect unhydrated subtitle tracks
unavailable_subtitle_tracks = []
for subtitle in manifest.get("timedtexttracks", []):
if subtitle.get("isNoneTrack"):
continue
if subtitle.get("hydrated") == False:
# Subtitle is not hydrated
# self.log.info(f"Found hidden subtitle track: {subtitle.get('languageDescription', 'Unknown')}")
unavailable_subtitle_tracks.append((
subtitle.get("new_track_id"),
subtitle.get("id")
))
if not unavailable_audio_tracks and not unavailable_subtitle_tracks:
self.log.info("No hidden audio or subtitle tracks found")
return
self.log.info(f"Fetching {len(unavailable_audio_tracks)} hidden audio track(s) and {len(unavailable_subtitle_tracks)} hidden subtitle track(s)...")
# Hydrate hidden tracks one by one
for audio_info, subtitle_info in zip_longest(
unavailable_audio_tracks,
unavailable_subtitle_tracks,
fillvalue=(None, None)
):
audio_track_id, audio_id = audio_info if audio_info[0] else (None, None)
subtitle_track_id, subtitle_id = subtitle_info if subtitle_info[0] else (None, None)
try:
# Request specific hidden track
hydrated_manifest = self.get_manifest(
title,
self.profiles,
required_audio_track_id=audio_track_id,
required_text_track_id=subtitle_track_id
)
# Parse hydrated tracks
hydrated_tracks = self.manifest_as_tracks(hydrated_manifest)
# Add audio track
if audio_track_id:
for audio in manifest.get("audio_tracks", []):
if audio.get("id") == audio_id:
for track in hydrated_tracks.audio:
if track.encrypted:
track.extra["license_url"] = license_url
tracks.add(track, warn_only=True)
# self.log.info(f"Added hidden audio track: {track.language}")
break
# Add subtitle track
if subtitle_track_id:
for subtitle in manifest.get("timedtexttracks", []):
if subtitle.get("id") == subtitle_id:
for track in hydrated_tracks.subtitles:
tracks.add(track, warn_only=True)
# self.log.info(f"Added hidden subtitle track: {track.language}")
break
except Exception as e:
self.log.warning(f"Error hydrating hidden track: {e}")
continue
def get_tracks(self, title):
if self.vcodec == "H264":
# If H.264, get both MPL and HPL tracks as they alternate in terms of bitrate
tracks = Tracks()
self.config["profiles"]["video"]["H264"]["MPL+HPL+QC"] = (
self.config["profiles"]["video"]["H264"]["MPL"] + self.config["profiles"]["video"]["H264"]["HPL"] +
self.config["profiles"]["video"]["H264"]["QC"]
)
if self.audio_only or self.subs_only or self.chapters_only:
profiles = ["MPL+HPL+QC"]
else:
profiles = self.profile.split("+")
for profile in profiles:
try:
manifest = self.get_manifest(title, self.config["profiles"]["video"]["H264"][profile])
except:
manifest = self.get_manifest(title, self.config["profiles"]["video"]["H264"]["MPL"] +
self.config["profiles"]["video"]["H264"]["HPL"])
manifest_tracks = self.manifest_as_tracks(manifest)
license_url = manifest["links"]["license"]["href"]
if self.cdm.device.security_level == 3 and self.cdm.device.type == LocalDevice.Types.CHROME:
self.cdm.uuid = self.cdm.chrome_uuid
self.cdm.urn = f"urn:uuid:{self.cdm.uuid}"
max_quality = max(x.height for x in manifest_tracks.videos)
# if profile == "MPL" and max_quality >= 720:
# manifest_sd = self.get_manifest(title, self.config["profiles"]["video"]["H264"]["BPL"])
# license_url_sd = manifest_sd["links"]["license"]["href"]
# if "SD_LADDER" in manifest_sd["video_tracks"][0]["streams"][0]["tags"]:
# # SD manifest is new encode encrypted with different keys that won't work for HD
# continue
# license_url = license_url_sd
if profile == "HPL" and max_quality >= 1080:
if "SEGMENT_MAP_2KEY" in manifest["video_tracks"][0]["streams"][0]["tags"]:
# 1080p license restricted from Android L3, 720p license will work for 1080p
manifest_720 = self.get_manifest(
title, [x for x in self.config["profiles"]["video"]["H264"]["HPL"] if "l40" not in x]
)
license_url = manifest_720["links"]["license"]["href"]
else:
# Older encode, can't use 720p keys for 1080p
continue
for track in manifest_tracks:
if track.encrypted:
track.extra["license_url"] = license_url
# Fix PSSH with correct KID
# track.pssh.init_data = b'\x08\x01\x12\x10' + bytes.fromhex(track.kid)
tracks.add(manifest_tracks, warn_only=True)
# Hydrate hidden audio and subtitle tracks (using the last manifest)
_hydrate_hidden_tracks(title, manifest, tracks, license_url)
return tracks
else:
pass

Binary file not shown.

206
tools/playready_rev.py Normal file
View File

@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
PlayReady public key hash revocation checker.
- Fetches Microsoft's revocation XML.
- Extracts the PlayReady Silverlight Runtime revocation list.
- Prints all 32-byte public key hashes (Base64).
- If a hash is provided (argv[1] or prompted), reports whether it's revoked.
- If 'bgroupcert.dat' in directory, extract the hash and reports whether it's revoked.
Compatible with Python 3.8+.
"""
from __future__ import annotations
import base64
import struct
import sys
import uuid
from typing import Iterable, List, Optional
import requests
import xmltodict
# --- Configuration ----------------------------------------------------------------
REV_URL = "https://go.microsoft.com/fwlink/?LinkId=110086"
# GUID for PlayReady Silverlight Runtime (as raw bytes like original code)
GUID_PR_SILVERLIGHT_RUNTIME = bytes([
0x4E, 0x9D, 0x8C, 0x8A, 0xB6, 0x52, 0x45, 0xA7,
0x97, 0x91, 0x69, 0x25, 0xA6, 0xB4, 0x79, 0x1F
])
# If construct is available, well use it to parse the binary blob declaratively.
try:
from construct import Struct as CStruct, Bytes, Int32ub
_HAS_CONSTRUCT = True
except Exception:
_HAS_CONSTRUCT = False
# --- Helpers ----------------------------------------------------------------------
def _expected_guid_le_hex(guid_bytes: bytes) -> str:
"""
The source blob compares the first 16 bytes against UUID(...).bytes_le.
Keep that exact behavior to match the original script.
"""
return uuid.UUID(guid_bytes.hex()).bytes_le.hex()
def _parse_revocation_blob_with_construct(blob: bytes) -> Optional[List[bytes]]:
"""
Parse a revocation list blob using construct (if installed).
Format per original code:
[0:16] GUID
[16:20] unknown / reserved
[20:24] entries_count (u32 big-endian)
[24: ] entries_count * 32-byte hashes
"""
if not _HAS_CONSTRUCT:
return None
RevHeader = CStruct(
"type_guid" / Bytes(16),
"reserved" / Bytes(4),
"entries_count" / Int32ub,
)
# First parse header to get count
if len(blob) < 24:
return []
header = RevHeader.parse(blob[:24])
# Verify GUID (same logic/endianness as original)
if blob[:16].hex() != _expected_guid_le_hex(GUID_PR_SILVERLIGHT_RUNTIME):
return []
# Extract entries
start = 24
end = start + (header.entries_count * 32)
if end > len(blob):
# Truncated blob; be defensive
end = min(len(blob), start + ((len(blob) - start) // 32) * 32)
entries = [blob[i:i+32] for i in range(start, end, 32)]
return entries
def _parse_revocation_blob_manually(blob: bytes) -> List[bytes]:
"""
Manual parser (no construct). Mirrors the original byte slicing exactly.
"""
if len(blob) < 24:
return []
# GUID check with the same endianness trick as original
if blob[:16].hex() != _expected_guid_le_hex(GUID_PR_SILVERLIGHT_RUNTIME):
return []
# entries_count is big-endian u32 at offset 20
entries_count = struct.unpack_from(">I", blob, 20)[0]
cursor = 24
out: List[bytes] = []
for _ in range(entries_count):
if cursor + 32 > len(blob):
break
out.append(blob[cursor:cursor+32])
cursor += 32
return out
def _iter_revocation_lists(xml_root: dict) -> Iterable[bytes]:
"""
Yields each decoded revocation list (ListData) as raw bytes.
Handles the cases where Revocation is a dict or a list of dicts.
"""
rev = xml_root.get("RevInfo", {}).get("Revocation")
if rev is None:
return
items = rev if isinstance(rev, list) else [rev]
for item in items:
data_b64 = item.get("ListData")
if not data_b64:
continue
try:
yield base64.b64decode(data_b64)
except Exception:
continue
def _fetch_revocation_index(url: str = REV_URL) -> dict:
resp = requests.get(url, timeout=20)
resp.raise_for_status()
# xmltodict produces nested OrderedDict-like structures; dict() is fine
return xmltodict.parse(resp.text)
def _collect_public_key_hashes(xml_root: dict) -> List[str]:
"""
Returns Base64-encoded 32-byte public key hashes from the
PlayReady Silverlight Runtime revocation list.
"""
hashes_b64: List[str] = []
for blob in _iter_revocation_lists(xml_root):
# Try construct first (if available), then fallback
entries = _parse_revocation_blob_with_construct(blob) if _HAS_CONSTRUCT else None
if entries is None:
entries = _parse_revocation_blob_manually(blob)
if not entries:
continue
for entry in entries:
hashes_b64.append(base64.b64encode(entry).decode("ascii"))
return hashes_b64
# --- CLI --------------------------------------------------------------------------
def main(argv: List[str]) -> int:
try:
xml_root = _fetch_revocation_index(REV_URL)
except Exception as e:
print(f"Failed to fetch or parse revocation index: {e}", file=sys.stderr)
return 2
publickey_hashes = _collect_public_key_hashes(xml_root)
# Print the list (like the original)
if publickey_hashes:
print("\n".join(publickey_hashes))
# Determine the input hash
pubkey_hash: Optional[str] = None
if len(argv) > 1 and argv[1]:
pubkey_hash = argv[1].strip()
print()
print("Provided hash:", pubkey_hash)
if not pubkey_hash:
bcert = open("bgroupcert.dat", "rb").read()
pubkey_hash = base64.b64encode(bcert[0x48:0x68]).decode()
print()
print("Calculated hash:", pubkey_hash)
if not pubkey_hash:
print("No hash provided.")
return 1
if pubkey_hash in publickey_hashes:
print("one of hashes in CRL match the input hash.")
print("PlayReady Cert is revoked.")
return 0
else:
print("PlayReady Cert is valid.")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))

99
tools/test_cdm.py Normal file
View File

@ -0,0 +1,99 @@
import base64, json, requests
from enum import Enum, IntEnum
from hashlib import sha1
from Cryptodome.Cipher import AES
from Cryptodome.Util import Padding
class TrackType(str, Enum):
SD = "SD"
HD = "HD"
AUDIO = "AUDIO"
class WidevineError(IntEnum):
DRM_DEVICE_CERTIFICATE_REVOKED = 127
DRM_DEVICE_CERT_SERIAL_REVOKED = 175
INVALID_PSSH = 152
INVALID_LICENSE_CHALLENGE = 106
# -------------------------------------------------------------------
# Constants
# -------------------------------------------------------------------
AES_KEY = bytes.fromhex(
"1ae8ccd0e7985cc0b6203a55855a1034afc252980e970ca90e5202689f947ab9"
)
AES_IV = bytes.fromhex(
"d58ce954203b7c9a9a9d467f59839249"
)
PROVIDER = "widevine_test"
CONTENT_ID = "ZmtqM2xqYVNkZmFsa3Izag==" # example content id
def build_request(challenge) -> dict:
"""Build signed Widevine license request payload."""
if not isinstance(challenge, str):
challenge = base64.b64encode(challenge).decode()
payload = json.dumps(
{
"payload": challenge,
"provider": PROVIDER,
"content_id": CONTENT_ID,
"content_key_specs": [{"track_type": t.value} for t in TrackType],
},
separators=(",", ":"),
).encode()
request_b64 = base64.b64encode(payload).decode()
# Widevine-style signature
digest = sha1(base64.b64decode(request_b64)).digest()
cipher = AES.new(AES_KEY, AES.MODE_CBC, AES_IV)
signature = base64.b64encode(cipher.encrypt(Padding.pad(digest, 16))).decode()
return {
"request": request_b64,
"signature": signature,
"signer": PROVIDER,
}
def check_response(resp: dict):
"""Check for error status and raise if needed."""
code = resp.get("internal_status")
if not code:
return
try:
error = WidevineError(code)
raise RuntimeError(f"License error: {error.name} ({code})")
except ValueError:
# Unknown code, ignore
return
if __name__ == "__main__":
pssh = "CAESEFoHzpkm/EolkmbSIjR2qgkaCHVzcC1jZW5jIhhXZ2ZPbVNiOFNpV1NadElpTkhhcUNRPT0qADIA"
challenge = "CAQ="
# 1. Request service certificate
cert_resp = requests.post(
"https://license.widevine.com/cenc/getlicense/widevine_test",
json=build_request(challenge),
).json()
service_cert = cert_resp.get("license")
challenge = "" # use pywidevine to generate B64 challenge using pssh, service_cert
# 2. Issue real license request
license_resp = requests.post(
"https://license.widevine.com/cenc/getlicense/widevine_test",
json=build_request(challenge),
).json()
check_response(license_resp)
print("level:", license_resp.get("security_level"))
print("systemID:", license_resp.get("system_id"))
print("status:", license_resp.get("device_state"))