Few more netflix related changes

This commit is contained in:
chu23465 2025-04-29 11:54:44 +05:30
parent 18d70442e3
commit fca2266d33
6 changed files with 547 additions and 279 deletions

View File

@ -15,9 +15,10 @@ from vinetrimmer.objects import AudioTrack, MenuTrack, TextTrack, Title, Tracks,
from vinetrimmer.services.BaseService import BaseService
from vinetrimmer.utils.collections import as_list, flatten
from vinetrimmer.utils.MSL import MSL
from vinetrimmer.utils.MSL.schemes import KeyExchangeSchemes
from vinetrimmer.utils.MSL.schemes import KeyExchangeSchemes, EntityAuthenticationSchemes
from vinetrimmer.utils.MSL.schemes.UserAuthentication import UserAuthentication
from pywidevine.device import DeviceTypes
from vinetrimmer.utils.gen_esn import chrome_esn_generator, android_esn_generator, playready_esn_generator
from vinetrimmer.utils.widevine.device import LocalDevice, RemoteDevice
from vinetrimmer.vendor.pymp4.parser import Box
from vinetrimmer.utils.gen_esn import chrome_esn_generator
@ -48,7 +49,6 @@ class Netflix(BaseService):
"""
ALIASES = ["NF", "netflix"]
#GEOFENCE = ['us']
TITLE_RE = [
r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<id>\d+)",
r"^https?://(?:www\.)?unogs\.com/title/(?P<id>\d+)",
@ -91,6 +91,11 @@ class Netflix(BaseService):
self.cdm = ctx.obj.cdm
# Check if the configuration file says to use PlayReady
self.use_playready = self.config["configuration"].get("drm_system", "").lower() == "playready"
if self.use_playready:
self.log.info("Using PlayReady DRM for Netflix")
# General
self.download_proxied = len(self.GEOFENCE) > 0 # needed if the title is unavailable at home IP
self.profiles = []
@ -135,7 +140,7 @@ class Netflix(BaseService):
service_data=episode
) for episode in episodes]
# TODO: Get original language without making an extra manifest.xml request
# TODO: Get original language without making an extra manifest request
self.log.warning("HEVC PROFILES for the first title sometimes FAIL with Validation error so we use H264 HPL as a first trial, if it does not exist, we try H264 MPL")
try:
manifest = self.get_manifest(titles[0], self.profiles)
@ -176,14 +181,21 @@ class Netflix(BaseService):
manifest_tracks = self.manifest_as_tracks(manifest)
license_url = manifest["links"]["license"]["href"]
# Verify CDM attributes securely
is_android_l3 = False
try:
if hasattr(self.cdm, 'device') and hasattr(self.cdm.device, 'security_level') and hasattr(self.cdm.device, 'type'):
is_android_l3 = (self.cdm.device.security_level == 3 and self.cdm.device.type == LocalDevice.Types.ANDROID)
except AttributeError:
pass
if (self.cdm.device.security_level == 3 and self.cdm.device.type == DeviceTypes.ANDROID if "common_privacy_cert" in dir self.cdm else False):
if is_android_l3:
max_quality = max(x.height for x in manifest_tracks.videos)
if profile == "MPL" and max_quality >= 720:
manifest_sd = self.get_manifest(title, self.config["profiles"]["video"]["H264"]["BPL"])
license_url_sd = manifest_sd["links"]["license"]["href"]
if "SD_LADDER" in manifest_sd["video_tracks"][0]["streams"][0]["tags"]:
# SD manifest.xml is new encode encrypted with different keys that won't work for HD
# SD manifest is new encode encrypted with different keys that won't work for HD
continue
license_url = license_url_sd
if profile == "HPL" and max_quality >= 1080:
@ -250,6 +262,14 @@ class Netflix(BaseService):
# TODO: Needs something better than this
track.hdr10 = track.codec.split("-")[1] == "hdr" # hevc-hdr, vp9-hdr
track.dv = track.codec.startswith("hevc-dv")
if track.psshWV and isinstance(track.pssh, bytes):
# Check if it has a specific header and remove it if necessary
if track.pssh.startswith(b'\x00\x00'):
# Find the starting point of the real PSSH
pssh_start = track.pssh.find(b'pssh')
if pssh_start > 0:
# Extract only the pssh part without extra headers
track.pssh = track.pssh[pssh_start-4:]
return manifest_tracks
def get_chapters(self, title):
@ -336,10 +356,15 @@ class Netflix(BaseService):
def license(self, challenge, track, session_id, **_):
if not self.msl:
raise self.log.exit(" - Cannot get license, MSL client has not been created yet.")
header, payload_data = self.msl.send_message(
endpoint=self.config["endpoints"]["licence"],
params={},
application_data={
if isinstance(challenge, str):
challenge = challenge.encode('utf-8')
# Determines whether we are using PlayReady
use_playready = self.config["configuration"].get("drm_system", "").lower() == "playready"
# Build the license request payload
license_request = {
"version": 2,
"url": track.extra["license_url"],
"id": int(time.time() * 10000),
@ -354,9 +379,23 @@ class Netflix(BaseService):
"xid": str(int((int(time.time()) + 0.1612) * 1000)),
}],
"echo": "sessionId"
},
}
# If we use PlayReady, we may want to add specific parameters
if use_playready:
license_request["params"][0].update({
"playreadyInitiator": "drmAgent", # It might help with PlayReady
"drmType": "playready",
"drmVersion": self.config["configuration"]["drm_version"]
})
header, payload_data = self.msl.send_message(
endpoint=self.config["endpoints"]["licence"],
params={},
application_data=license_request,
userauthdata=self.userauthdata
)
if not payload_data:
raise self.log.exit(f" - Failed to get license: {header['message']} [{header['code']}]")
if "error" in payload_data[0]:
@ -379,60 +418,56 @@ class Netflix(BaseService):
# Service specific functions
def configure(self):
self.session.headers.update({"Origin": "https://netflix.com"})
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configure session to bypass SSL verification
self.session.verify = False
self.session.headers.update({
"Origin": "https://netflix.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
})
self.profiles = self.get_profiles()
self.log.info("Initializing a Netflix MSL client")
# Grab ESN based on CDM from secrets if no ESN argument provided
if (self.cdm.device.type == DeviceTypes.CHROME if "common_privacy_cert" in dir self.cdm else if "common_privacy_cert" not in dir self.cdm): # ESN GENERATOR FOR CHROME
self.esn = chrome_esn_generator()
else:
sel.log.info(self.config)
esn_map = self.config.get("esn_map", {})
self.log.info(esn_map)
self.esn = esn_map.get(self.cdm.device.system_id) or esn_map.get(str(self.cdm.device.system_id))
if not self.esn:
raise self.log.exit(" - No ESN specified")
self.log.info(f" + ESN: {self.esn}")
try:
scheme = {
DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped,
DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine,
# Check if it is configured to use PlayReady
self.use_playready = self.config["configuration"].get("drm_system", "").lower() == "playready"
}[self.cdm.device.type]
except:
if self.use_playready:
self.log.info(" + Using PlayReady DRM with Chrome MSL method")
# Always use Chrome for MSL handshake
scheme = KeyExchangeSchemes.AsymmetricWrapped
self.esn = chrome_esn_generator()
self.log.info(f" + Using Chrome ESN for MSL: {self.esn}")
self.log.info(f" + Scheme: {scheme}")
# MSL handshake
try:
self.msl = MSL.handshake(
scheme=scheme,
session=self.session,
endpoint=self.config["endpoints"]["manifest.xml"],
endpoint=self.config["endpoints"]["manifest"],
sender=self.esn,
cdm=self.cdm,
msl_keys_path=self.get_cache("msl_{id}_{esn}_{scheme}.json".format(
id=self.cdm.device.system_id or self.cdm.get_name(),
esn=self.esn,
scheme=scheme
))
msl_keys_path=self.get_cache(f"msl_chrome.json")
)
self.log.info(" + MSL handshake successful")
except Exception as e:
self.log.error(f" - MSL handshake failed: {e}")
raise self.log.exit(f" - Cannot proceed: {e}")
# Authentication Management
if not self.session.cookies:
raise self.log.exit(" - No cookies provided, cannot log in.")
if self.cdm.device.type == DeviceTypes.CHROME if "common_privacy_cert" in dir self.cdm else if "common_privacy_cert" not in dir self.cdm:
# For Chrome we will use NetflixID cookies
self.userauthdata = UserAuthentication.NetflixIDCookies(
netflixid=self.session.cookies.get_dict()["NetflixId"],
securenetflixid=self.session.cookies.get_dict()["SecureNetflixId"]
)
else:
if not self.credentials:
raise self.log.exit(" - Credentials are required for Android CDMs, and none were provided.")
# need to get cookies via an android-like way
# outdated
# self.android_login(credentials.username, credentials.password)
# need to use EmailPassword for userauthdata, it specifically checks for this
self.userauthdata = UserAuthentication.EmailPassword(
email=self.credentials.username,
password=self.credentials.password
)
self.react_context = self.get_react_context()
def get_profiles(self):
@ -502,120 +537,98 @@ class Netflix(BaseService):
return jsonpickle.decode(fd.read())
def get_metadata(self, title_id):
"""
Obtain Metadata information about a title by it's ID.
:param title_id: Title's ID.
:returns: Title Metadata.
"""
# We use the correct DRM system based on your configuration
drm_system = "playready" if self.use_playready else "widevine"
self.log.info(f" + Requesting metadata with DRM system: {drm_system}")
"""
# Wip non-working code for the newer shakti metadata replacement
metadata = self.session.post(
url=self.config["endpoints"]["website"].format(
build_id=self.react_context["serverDefs"]["data"]["BUILD_IDENTIFIER"]
),
params={
# features
"webp": self.react_context["browserInfo"]["data"]["features"]["webp"],
"drmSystem": self.config["configuration"]["drm_system"],
# truths
"isVolatileBillboardsEnabled": self.react_context["truths"]["data"]["volatileBillboardsEnabled"],
"routeAPIRequestsThroughFTL": self.react_context["truths"]["data"]["routeAPIRequestsThroughFTL"],
"isTop10Supported": self.react_context["truths"]["data"]["isTop10Supported"],
"categoryCraversEnabled": self.react_context["truths"]["data"]["categoryCraversEnabled"],
"hasVideoMerchInBob": self.react_context["truths"]["data"]["hasVideoMerchInBob"],
"persoInfoDensity": self.react_context["truths"]["data"]["enablePersoInfoDensityToggle"],
"contextAwareImages": self.react_context["truths"]["data"]["contextAwareImages"],
# ?
"falcor_server": "0.1.0",
"withSize": True,
"materialize": True,
"original_path": quote_plus(
f"/shakti/{self.react_context['serverDefs']['data']['BUILD_IDENTIFIER']}/pathEvaluator"
)
},
headers=dict(
**self.react_context["abContext"]["data"]["headers"],
**{
"X-Netflix.Client.Request.Name": "ui/falcorUnclassified",
"X-Netflix.esn": self.react_context["esnGeneratorModel"]["data"]["esn"],
"x-netflix.nq.stack": self.react_context["serverDefs"]["data"]["stack"],
"x-netflix.request.client.user.guid": (
self.react_context["memberContext"]["data"]["userInfo"]["guid"]
)
},
**self.react_context["requestHeaders"]["data"]
),
data={
"path": json.dumps([
[
"videos",
70155547,
[
"bobSupplementalMessage",
"bobSupplementalMessageIcon",
"bookmarkPosition",
"delivery",
"displayRuntime",
"evidence",
"hasSensitiveMetadata",
"interactiveBookmark",
"maturity",
"numSeasonsLabel",
"promoVideo",
"releaseYear",
"seasonCount",
"title",
"userRating",
"userRatingRequestId",
"watched"
]
],
[
"videos",
70155547,
"seasonList",
"current",
"summary"
]
]),
"authURL": self.react_context["memberContext"]["data"]["userInfo"]["authURL"]
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://www.netflix.com/browse',
'Connection': 'keep-alive'
}
)
print(metadata.headers)
print(metadata.text)
exit()
"""
# Let's add some more headers that might help
if "NetflixId" in self.session.cookies:
netflix_id = self.session.cookies["NetflixId"]
headers["X-Netflix-Id"] = netflix_id
try:
# First let's try without specific headers
metadata = self.session.get(
self.config["endpoints"]["metadata"].format(build_id=self.react_context['serverDefs']['data']['BUILD_IDENTIFIER']),
params={
"movieid": title_id,
"drmSystem": self.config["configuration"]["drm_system"],
"drmSystem": drm_system,
"isWatchlistEnabled": False,
"isShortformEnabled": False,
"isVolatileBillboardsEnabled": self.react_context["truths"]["data"]["volatileBillboardsEnabled"],
"languages": self.meta_lang
}
},
headers=headers,
timeout=30
).json()
except requests.HTTPError as e:
if e.response.status_code == 500:
self.log.warning(
" - Recieved a HTTP 500 error while getting metadata, deleting cached reactContext data"
)
os.unlink(self.get_cache("web_data.json"))
return self.get_metadata(self, title_id)
raise
except json.JSONDecodeError:
raise self.log.exit(" - Failed to get metadata, title might not be available in your region.")
else:
if "status" in metadata and metadata["status"] == "error":
raise self.log.exit(
f" - Failed to get metadata, cookies might be expired. ({metadata['message']})"
)
raise Exception(f"Failed to get metadata: {metadata['message']}")
return metadata
except Exception as e:
self.log.warning(f" - Failed to get metadata: {e}")
# Updating the reactContext might help
self.log.info(" + Refreshing React context and retrying")
try:
os.unlink(self.get_cache("web_data.json"))
self.react_context = self.get_react_context()
# Try again with the new context
metadata = self.session.get(
self.config["endpoints"]["metadata"].format(build_id=self.react_context['serverDefs']['data']['BUILD_IDENTIFIER']),
params={
"movieid": title_id,
"drmSystem": drm_system,
"isWatchlistEnabled": False,
"isShortformEnabled": False,
"isVolatileBillboardsEnabled": self.react_context["truths"]["data"]["volatileBillboardsEnabled"],
"languages": self.meta_lang
},
headers=headers,
timeout=30
).json()
if "status" in metadata and metadata["status"] == "error":
raise self.log.exit(f" - Failed to get metadata: {metadata['message']}")
return metadata
except Exception as e2:
self.log.error(f" - Second attempt failed: {e2}")
# Last resort: try not to use reactContext directly
try:
# Use a hardcoded URL for metadata
metadata = self.session.get(
"https://www.netflix.com/nq/website/memberapi/vb53f341a/metadata",
params={
"movieid": title_id,
"drmSystem": drm_system,
"isWatchlistEnabled": False,
"isShortformEnabled": False,
"isVolatileBillboardsEnabled": True,
"languages": self.meta_lang
},
headers=headers,
timeout=30
).json()
if "status" in metadata and metadata["status"] == "error":
raise self.log.exit(f" - Failed to get metadata: {metadata['message']}")
return metadata
except Exception:
raise self.log.exit(" - All attempts to get metadata failed, title might not be available in your region, cookies might be expired, or there's a network issue.")
def get_manifest(self, title, video_profiles):
if isinstance(video_profiles, dict):
@ -638,34 +651,37 @@ class Netflix(BaseService):
))))
self.log.debug("Profiles:\n\t" + "\n\t".join(profiles))
params = {}
if self.cdm.device.type == DeviceTypes.CHROME if "common_privacy_cert" in dir self.cdm else if "common_privacy_cert" not in dir self.cdm:
params = {
"reqAttempt": 1,
"reqPriority": 0,
"reqName": "prefetch/manifest.xml",
"reqName": "prefetch/manifest",
"clienttype": "akira",
}
# Determines the drm type to use
drm_type = "playready" if self.use_playready else "widevine"
drm_version = self.config["configuration"]["drm_version"]
manifest_data = {
'version': 2,
'url': '/manifest.xml',
'url': '/manifest',
"id": int(time.time()),
"esn": self.esn,
'languages': ['en-US'],
'uiVersion': 'shakti-va3fd86e3',
'uiVersion': self.react_context["serverDefs"]["data"]["uiVersion"],
'clientVersion': '6.0041.930.911',
'params':{'type': 'standard',
'params':{
'type': 'standard',
'viewableId': title.service_data.get("episodeId", title.service_data["id"]),
'profiles': profiles,
'flavor': 'STANDARD',
"drmType": self.config["configuration"]["drm_system"],
"drmVersion": self.config["configuration"]["drm_version"],
"drmType": drm_type,
"drmVersion": drm_version,
'usePsshBox': True,
'isBranching': False,
'useHttpsStreams': False,
'imageSubtitleHeight': 1080,
'uiVersion': 'shakti-va3fd86e3',
'uiVersion': self.react_context["serverDefs"]["data"]["uiVersion"],
'clientVersion': '6.0041.930.911',
'platform': '113.0.1774',
'supportsPreReleasePin': True,
@ -673,7 +689,6 @@ class Netflix(BaseService):
'showAllSubDubTracks': True,
'titleSpecificData': {},
"videoOutputInfo": [{
# todo ; make this return valid, but "secure" values, maybe it helps
"type": "DigitalVideoOutputDescriptor",
"outputType": "unknown",
"supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"],
@ -692,11 +707,21 @@ class Netflix(BaseService):
}
}
# Add specific parameters for PlayReady if needed
if self.use_playready:
# Add specific parameters for the PlayReady manifest request
manifest_data['params'].update({
'usePsshBox': True,
'useHttpsStreams': False,
'device_type': 'CHROME',
'device_version': '27175', # Use the Chrome version reported in the logs
})
self.log.debug(manifest_data)
try:
_, payload_chunks = self.msl.send_message(
endpoint=self.config["endpoints"]["manifest.xml"],
endpoint=self.config["endpoints"]["manifest"],
params=params,
application_data=manifest_data,
userauthdata=self.userauthdata
@ -704,6 +729,22 @@ class Netflix(BaseService):
if "errorDetails" in payload_chunks:
raise Exception(f"Manifest call failed: {payload_chunks['errorDetails']}")
return payload_chunks
except Exception as e:
self.log.error(f"Failed to get manifest: {e}")
# If the error is related to PSSH box, try changing the settings
if self.use_playready and "pssh" in str(e).lower():
self.log.info("Retrying with different PSSH settings")
manifest_data['params']['usePsshBox'] = False
_, payload_chunks = self.msl.send_message(
endpoint=self.config["endpoints"]["manifest"],
params=params,
application_data=manifest_data,
userauthdata=self.userauthdata
)
if "errorDetails" in payload_chunks:
raise Exception(f"Manifest call failed: {payload_chunks['errorDetails']}")
return payload_chunks
raise
def manifest_as_tracks(self, manifest):
# filter audio_tracks so that each stream is an entry instead of each track
@ -711,9 +752,22 @@ class Netflix(BaseService):
[dict(t, **d) for d in t["streams"]]
for t in manifest["audio_tracks"]
] for x in y]
return Tracks(
# VIDEO
[VideoTrack(
# Video tracks
video_tracks = []
for x in manifest["video_tracks"][0]["streams"]:
# Special handling for PlayReady PSSH
pssh_data = None
if x["isDrm"]:
raw_pssh = base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"])
# Check if it is a PlayReady PSSH (contains WRM HEADER in UTF-16LE)
if b'<\x00W\x00R\x00M\x00H\x00E\x00A\x00D\x00E\x00R\x00' in raw_pssh:
psshPR = True
else:
psshPR = False
video_tracks.append(VideoTrack(
id_=x["downloadable_id"],
source=self.ALIASES[0],
url=x["urls"][0]["url"],
@ -728,11 +782,13 @@ class Netflix(BaseService):
needs_repack=False,
# decryption
encrypted=x["isDrm"],
psshPR=manifest["video_tracks"][0]["drmHeader"]["bytes"] if x["isDrm"] else None,
psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None,
psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None,
kid=x["drmHeaderId"] if x["isDrm"] else None,
) for x in manifest["video_tracks"][0]["streams"]],
# AUDIO
[AudioTrack(
))
# Audio e sottotitoli rimangono invariati
audio_tracks = [AudioTrack(
id_=x["downloadable_id"],
source=self.ALIASES[0],
url=x["urls"][0]["url"],
@ -747,11 +803,12 @@ class Netflix(BaseService):
needs_repack=False,
# decryption
encrypted=x["isDrm"],
psshPR=x["drmHeader"]["bytes"] if x["isDrm"] else None,
kid=x.get("drmHeaderId") if x["isDrm"] else None, # TODO: haven't seen enc audio, needs testing
) for x in manifest["audio_tracks"]],
# SUBTITLE
[TextTrack(
psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None,
psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None,
kid=x.get("drmHeaderId") if x["isDrm"] else None,
) for x in manifest["audio_tracks"]]
subtitle_tracks = [TextTrack(
id_=list(x["downloadableIds"].values())[0],
source=self.ALIASES[0],
url=next(iter(next(iter(x["ttDownloadables"].values()))["downloadUrls"].values())),
@ -764,7 +821,8 @@ class Netflix(BaseService):
# text track options
sdh=x["rawTrackType"] == "closedcaptions"
) for x in manifest["timedtexttracks"] if not x["isNoneTrack"]]
)
return Tracks(video_tracks, audio_tracks, subtitle_tracks)
@staticmethod
def get_original_language(manifest):

View File

@ -24,7 +24,10 @@ from vinetrimmer.utils.MSL.MSLKeys import MSLKeys
from vinetrimmer.utils.MSL.schemes import EntityAuthenticationSchemes # noqa: F401
from vinetrimmer.utils.MSL.schemes import KeyExchangeSchemes
from vinetrimmer.utils.MSL.schemes.EntityAuthentication import EntityAuthentication
from vinetrimmer.utils.MSL.schemes.PlayReadyKeyExchangeScheme import PlayReady as PlayReadyScheme
from vinetrimmer.utils.MSL.schemes.KeyExchangeRequest import KeyExchangeRequest
from vinetrimmer.utils.widevine.device import RemoteDevice
from vinetrimmer.utils.gen_esn import chrome_esn_generator, android_esn_generator, playready_esn_generator
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
from requests.packages.urllib3.util import ssl_
@ -62,9 +65,23 @@ class MSL:
self.message_id = message_id
@classmethod
def handshake(cls, scheme, session, endpoint, sender, cdm=None, msl_keys_path=None):
def handshake(cls, scheme, session, endpoint, sender=None, cdm=None, msl_keys_path=None):
session.verify = False
message_id = random.randint(0, pow(2, 52))
msl_keys = MSL.load_cache_data(msl_keys_path)
# Genera automaticamente l'ESN se non è specificato
if not sender:
if scheme == KeyExchangeSchemes.Widevine:
sender = android_esn_generator()
cls.log.info(f"Generated Widevine ESN: {sender}")
elif scheme == KeyExchangeSchemes.PlayReady:
sender = playready_esn_generator()
cls.log.info(f"Generated PlayReady ESN: {sender}")
else: # AsymmetricWrapped o altri schemi
sender = chrome_esn_generator()
cls.log.info(f"Generated Chrome ESN: {sender}")
if msl_keys is not None:
cls.log.info("Using cached MSL data")
else:
@ -78,6 +95,14 @@ class MSL:
if not msl_keys_path:
raise cls.log.exit("- No cached data and no MSL key path specified")
# Determine entity authentication scheme based on key exchange scheme
entity_auth_scheme = EntityAuthenticationSchemes.Unauthenticated
if scheme == KeyExchangeSchemes.Widevine:
entity_auth_scheme = EntityAuthenticationSchemes.Widevine
elif scheme == KeyExchangeSchemes.PlayReady:
entity_auth_scheme = EntityAuthenticationSchemes.PlayReady
# Key request data generation
if scheme == KeyExchangeSchemes.Widevine:
msl_keys.cdm_session = cdm.open(
pssh=b"\x0A\x7A\x00\x6C\x38\x2B",
@ -87,6 +112,13 @@ class MSL:
keyrequestdata = KeyExchangeRequest.Widevine(
keyrequest=cdm.get_license_challenge(msl_keys.cdm_session)
)
elif scheme == KeyExchangeSchemes.PlayReady:
# Per PlayReady, usiamo comunque AsymmetricWrapped per lo scambio chiavi
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
keypairid="superKeyPair",
mechanism="JWK_RSA",
publickey=msl_keys.rsa.publickey().exportKey(format="DER")
)
else:
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
keypairid="superKeyPair",
@ -94,8 +126,28 @@ class MSL:
publickey=msl_keys.rsa.publickey().exportKey(format="DER")
)
# Use the appropriate entity authentication based on scheme
if scheme == KeyExchangeSchemes.Widevine:
entityauthdata = EntityAuthentication.Widevine(
devtype="ANDROID",
keyrequest=base64.b64encode(b"\x0A\x7A\x00\x6C\x38\x2B").decode()
)
elif scheme == KeyExchangeSchemes.PlayReady:
# Prova a creare l'entità PlayReady senza parametri - potrebbe avere un costruttore che prende implicitamente i dati necessari
try:
entityauthdata = EntityAuthentication.PlayReady() # Senza parametri
cls.log.info("Created PlayReady entity authentication without parameters")
except TypeError as e:
# Se fallisce, passa subito ad AsymmetricWrapped
cls.log.warning(f"Failed to create PlayReady entity auth: {e}")
cls.log.info("Falling back to AsymmetricWrapped")
scheme = KeyExchangeSchemes.AsymmetricWrapped
entityauthdata = EntityAuthentication.Unauthenticated(sender)
else:
entityauthdata = EntityAuthentication.Unauthenticated(sender)
data = jsonpickle.encode({
"entityauthdata": EntityAuthentication.Unauthenticated(sender),
"entityauthdata": entityauthdata,
"headerdata": base64.b64encode(MSL.generate_msg_header(
message_id=message_id,
sender=sender,
@ -117,7 +169,11 @@ class MSL:
try:
r = session.post(
url=endpoint,
data=data
data=data,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Content-Type": "application/json"
}
)
except requests.HTTPError as e:
raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}")
@ -138,7 +194,7 @@ class MSL:
key_data = key_response_data["keydata"]
if scheme == KeyExchangeSchemes.Widevine:
if "Remote" in cdm.device.__class__.__name__:
if isinstance(cdm.device, RemoteDevice):
msl_keys.encryption, msl_keys.sign = cdm.device.exchange(
cdm.sessions[msl_keys.cdm_session],
license_res=key_data["cdmkeyresponse"],
@ -159,6 +215,24 @@ class MSL:
keys=keys,
permissions=["AllowSign", "AllowSignatureVerify"]
)
elif scheme == KeyExchangeSchemes.PlayReady:
# Gestione migliorata delle chiavi per PlayReady (emulando Chrome)
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
try:
# Decodifica le chiavi di crittografia e firma
msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["encryptionkey"])
).decode("utf-8"))["k"]
)
msl_keys.sign = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"]
)
cls.log.info("PlayReady key exchange successful (Chrome emulation)")
except Exception as e:
raise cls.log.exit(f"- PlayReady key decryption failed: {str(e)}")
else:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
msl_keys.encryption = MSL.base64key_decode(
@ -269,7 +343,7 @@ class MSL:
def send_message(self, endpoint, params, application_data, userauthdata=None):
message = self.create_message(application_data, userauthdata)
res = self.session.post(url=endpoint, data=message, params=params)
res = self.session.post(url=endpoint, data=message, params=params, verify=False)
header, payload_data = self.parse_message(res.text)
if "errordata" in header:
raise self.log.exit(

View File

@ -57,3 +57,14 @@ class EntityAuthentication(MSLObject):
"keyrequest": keyrequest
}
)
@classmethod
def PlayReady(cls, identity):
"""
The PlayReady entity authentication scheme is similar to the Unauthenticated scheme
but specifically identifies the entity as a PlayReady-compatible device.
"""
return cls(
scheme=EntityAuthenticationSchemes.PlayReady,
authdata={"identity": identity}
)

View File

@ -78,3 +78,17 @@ class KeyExchangeRequest(MSLObject):
scheme=KeyExchangeSchemes.Widevine,
keydata={"keyrequest": keyrequest}
)
@classmethod
def PlayReady(cls, keyrequest):
"""
PlayReady key exchange for MSL.
:param keyrequest: Base64-encoded PlayReady license challenge
"""
if not isinstance(keyrequest, str):
keyrequest = base64.b64encode(keyrequest).decode()
return cls(
scheme=KeyExchangeSchemes.PlayReady,
keydata={"keyrequest": keyrequest}
)

View File

@ -0,0 +1,111 @@
import base64
import json
import os
from Cryptodome.Cipher import AES, PKCS1_OAEP
from Cryptodome.Hash import HMAC, SHA256
from Cryptodome.Random import get_random_bytes
from Cryptodome.Util import Padding
from vinetrimmer.utils.MSL.schemes.KeyExchangeRequest import KeyExchangeRequest
class PlayReady(KeyExchangeRequest):
"""
Implementation of the PlayReady Key Exchange Scheme for MSL.
"""
def __init__(self):
self.encryption_key = None
self.sign_key = None
self.sender = None
def perform_key_exchange(self, session, endpoint, sender, cdm):
"""
Performs a key exchange using PlayReady.
Parameters:
session: HTTP session with necessary cookies
endpoint: Endpoint for key exchange
sender: ESN of the device
cdm: CDM instance
"""
self.sender = sender
# Generate random keys for encryption and signing
self.encryption_key = get_random_bytes(16) # AES-128
self.sign_key = get_random_bytes(32) # HMAC-SHA256
# Return keys in the format required by MSL
return {
"encryptionkey": base64.b64encode(self.encryption_key).decode("utf-8"),
"hmackey": base64.b64encode(self.sign_key).decode("utf-8")
}
def encrypt(self, data, encryption_envelope=None):
"""
Encrypts data using the encryption key.
Parameters:
data: Data to encrypt
encryption_envelope: Not used in PlayReady
"""
if not self.encryption_key:
raise ValueError("No encryption key available")
# Generate a random IV
iv = get_random_bytes(16)
# Encrypt the data
cipher = AES.new(self.encryption_key, AES.MODE_CBC, iv)
ciphertext = cipher.encrypt(Padding.pad(data.encode("utf-8"), 16))
# Return encrypted data in the format required by MSL
return {
"keyid": self.sender,
"iv": base64.b64encode(iv).decode("utf-8"),
"ciphertext": base64.b64encode(ciphertext).decode("utf-8")
}
def decrypt(self, data):
"""
Decrypts data using the encryption key.
Parameters:
data: Encrypted data (with iv and ciphertext)
"""
if not self.encryption_key:
raise ValueError("No encryption key available")
# Decode IV and ciphertext
iv = base64.b64decode(data["iv"])
ciphertext = base64.b64decode(data["ciphertext"])
# Decrypt the data
cipher = AES.new(self.encryption_key, AES.MODE_CBC, iv)
plaintext = Padding.unpad(cipher.decrypt(ciphertext), 16)
return plaintext
def sign(self, data):
"""
Signs data using the sign key.
Parameters:
data: Data to sign
"""
if not self.sign_key:
raise ValueError("No sign key available")
# Sign the data with HMAC-SHA256
signature = HMAC.new(self.sign_key, data.encode("utf-8"), SHA256).digest()
return base64.b64encode(signature)
def verify(self, data, signature):
"""
Verifies a signature.
Parameters:
data: Data that was signed
signature: Signature to verify
"""
expected_signature = self.sign(data)
return signature == expected_signature.decode("utf-8")

View File

@ -10,15 +10,15 @@ class EntityAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/Entity-Authentication-%28Configuration%29"""
Unauthenticated = "NONE"
Widevine = "WIDEVINE"
PlayReady = "PLAYREADY" # Aggiungi questa riga
class UserAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29"""
EmailPassword = "EMAIL_PASSWORD"
NetflixIDCookies = "NETFLIXID"
class KeyExchangeSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/Key-Exchange-%28Configuration%29"""
AsymmetricWrapped = "ASYMMETRIC_WRAPPED"
Widevine = "WIDEVINE"
PlayReady = "PLAYREADY"