MA + NF changes

This commit is contained in:
chu23465 2025-05-12 14:41:21 +05:30
parent 7766581007
commit 3910a39571
10 changed files with 730 additions and 620 deletions

Binary file not shown.

Binary file not shown.

View File

@ -2,12 +2,20 @@ import base64
import json import json
import click import click
import re import re
import requests
from requests import JSONDecodeError from requests import JSONDecodeError
from httpx import URL from httpx import URL
import uuid import uuid
import xmltodict import xmltodict
import struct
import binascii
import os
import yt_dlp
from pathlib import Path
import uuid
import xml.etree.ElementTree as ET
import time import time
from datetime import datetime from datetime import datetime
from langcodes import Language from langcodes import Language
from vinetrimmer.objects import AudioTrack, TextTrack, Title, Tracks, VideoTrack from vinetrimmer.objects import AudioTrack, TextTrack, Title, Tracks, VideoTrack
@ -16,232 +24,328 @@ from vinetrimmer.vendor.pymp4.parser import Box
class MoviesAnywhere(BaseService): class MoviesAnywhere(BaseService):
""" """
Service code for US' streaming service MoviesAnywhere (https://moviesanywhere.com). Service code for US' streaming service MoviesAnywhere (https://moviesanywhere.com).
\b \b
Authorization: Cookies Authorization: Cookies
Security: SD-HD@L3, FHD SDR@L1 (any active device), FHD-UHD HDR-DV@L1 (whitelisted devices). Security: SD-HD@L3, FHD SDR@L1 (any active device), FHD-UHD HDR-DV@L1 (whitelisted devices).
NOTE: Can be accessed from any region, it does not seem to care. NOTE: Can be accessed from any region, it does not seem to care.
Accounts can only mount services when its US based though. Accounts can only mount services when its US based though.
""" """
ALIASES = ["MA", "moviesanywhere"] ALIASES = ["MA", "MoviesAnywhere"]
TITLE_RE = r"https://moviesanywhere\.com(?P<id>.+)" TITLE_RE = r"https://moviesanywhere\.com(?P<id>.+)"
VIDEO_CODEC_MAP = { VIDEO_CODEC_MAP = {
"H264": ["avc"], "H264": ["avc"],
"H265": ["hvc", "hev", "dvh"] "H265": ["hvc", "hev", "dvh"]
} }
AUDIO_CODEC_MAP = { AUDIO_CODEC_MAP = {
"AAC": ["mp4a", "HE", "stereo"], "AAC": ["mp4a", "HE", "stereo"],
"AC3": ["ac3"], "AC3": ["ac3"],
"EC3": ["ec3", "atmos"] "EC3": ["ec3", "atmos"]
} }
@staticmethod @staticmethod
@click.command(name="MoviesAnywhere", short_help="https://moviesanywhere.com") @click.command(name="MoviesAnywhere", short_help="https://moviesanywhere.com")
@click.argument("title", type=str) @click.argument("title", type=str)
@click.pass_context @click.pass_context
def cli(ctx, **kwargs): def cli(ctx, **kwargs):
return MoviesAnywhere(ctx, **kwargs) return MoviesAnywhere(ctx, **kwargs)
def __init__(self, ctx, title): def __init__(self, ctx, title):
super().__init__(ctx) super().__init__(ctx)
self.parse_title(ctx, title) self.parse_title(ctx, title)
self.configure() self.configure()
self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False #ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False #ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY
self.atmos = ctx.parent.params["atmos"] self.atmos = ctx.parent.params["atmos"]
self.vcodec = ctx.parent.params["vcodec"] self.vcodec = ctx.parent.params["vcodec"]
self.acodec = ctx.parent.params["acodec"] self.acodec = ctx.parent.params["acodec"]
self.range = ctx.parent.params["range_"]
self.quality = ctx.parent.params["quality"] or 1080
def get_titles(self): if self.range != "SDR" or self.quality > 1080:
self.headers={ self.log.info(" + Setting VideoCodec to H265")
"authorization": f"Bearer {self.access_token}", self.vcodec = "H265"
"install-id": self.install_id,
}
res = self.session.post(
url="https://gateway.moviesanywhere.com/graphql",
json={
"platform": "web",
"variables": {"slug": self.title}, # Does not seem to care which platform will be used to give the best tracks available
"extensions": '{"persistedQuery":{"sha256Hash":"5cb001491262214406acf8237ea2b8b46ca6dbcf37e70e791761402f4f74336e","version":1}}', # ONE_GRAPH_PERSIST_QUERY_TOKEN
},
headers={
"authorization": f"Bearer {self.access_token}",
"install-id": self.install_id,
}
)
try: def get_titles(self):
self.content = res.json() self.headers={
except JSONDecodeError: "authorization": f"Bearer {self.access_token}",
self.log.exit(" - Not able to return title information") "install-id": self.install_id,
}
res = self.session.post(
url="https://gateway.moviesanywhere.com/graphql",
json={
"platform": "web",
"variables": {"slug": self.title}, # Does not seem to care which platform will be used to give the best tracks available
"extensions": '{"persistedQuery":{"sha256Hash":"5cb001491262214406acf8237ea2b8b46ca6dbcf37e70e791761402f4f74336e","version":1}}', # ONE_GRAPH_PERSIST_QUERY_TOKEN
},
headers={
"authorization": f"Bearer {self.access_token}",
"install-id": self.install_id,
}
)
title_data = self.content["data"]["page"] try:
self.content = res.json()
except JSONDecodeError:
self.log.exit(" - Not able to return title information")
title_info = [ title_data = self.content["data"]["page"]
x
for x in title_data["components"]
if x["__typename"] == "MovieMarqueeComponent"
][0]
title_info["title"] = re.sub(r" \(.+?\)", "", title_info["title"]) title_info = [
x
for x in title_data["components"]
if x["__typename"] == "MovieMarqueeComponent"
][0]
title_data = self.content["data"]["page"] title_info["title"] = re.sub(r" \(.+?\)", "", title_info["title"])
try:
Id = title_data["components"][0]["mainAction"]["playerData"]["playable"]["id"]
except KeyError:
self.log.exit(" - Account does not seem to own this title")
return Title( title_data = self.content["data"]["page"]
id_=Id, try:
type_=Title.Types.MOVIE, Id = title_data["components"][0]["mainAction"]["playerData"]["playable"]["id"]
name=title_info["title"], except KeyError:
year=title_info["year"], self.log.exit(" - Account does not seem to own this title")
original_lang="en",
source=self.ALIASES[0],
service_data=title_data,
)
def get_pssh_init(self, url): return Title(
import os, yt_dlp id_=Id,
from pathlib import Path type_=Title.Types.MOVIE,
init = 'init.mp4' name=title_info["title"],
year=title_info["year"],
original_lang="en",
source=self.ALIASES[0],
service_data=title_data,
)
files_to_delete = [init] def get_pssh_init(self, url):
for file_name in files_to_delete: init = 'init.mp4'
if os.path.exists(file_name):
os.remove(file_name)
def read_pssh(path: str): files_to_delete = [init]
raw = Path(path).read_bytes() for file_name in files_to_delete:
wv = raw.rfind(bytes.fromhex('edef8ba979d64acea3c827dcd51d21ed')) if os.path.exists(file_name):
if wv == -1: return None os.remove(file_name)
return base64.b64encode(raw[wv-12:wv-12+raw[wv-9]]).decode('utf-8')
ydl_opts = { ydl_opts = {
'format': 'bestvideo[ext=mp4]/bestaudio[ext=m4a]/best', 'format': 'bestvideo[ext=mp4]/bestaudio[ext=m4a]/best',
'allow_unplayable_formats': True, 'allow_unplayable_formats': True,
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3', 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
'no_warnings': True, 'no_warnings': True,
'quiet': True, 'quiet': True,
'outtmpl': init, 'outtmpl': init,
'no_merge': True, 'no_merge': True,
'test': True, 'test': True,
} }
with yt_dlp.YoutubeDL(ydl_opts) as ydl: with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info_dict = ydl.extract_info(url, download=True) info_dict = ydl.extract_info(url, download=True)
url = info_dict.get("url", None) url = info_dict.get("url", None)
if url is None: if url is None:
raise ValueError("Failed to download the video") raise ValueError("Failed to download the video")
video_file_name = ydl.prepare_filename(info_dict) video_file_name = ydl.prepare_filename(info_dict)
pssh = read_pssh(init) raw = Path(init).read_bytes()
wv = raw.rfind(bytes.fromhex('edef8ba979d64acea3c827dcd51d21ed'))
if wv != -1:
psshWV = base64.b64encode(raw[wv-12:wv-12+raw[wv-9]]).decode('utf-8')
for file_name in files_to_delete: playready_system_id = binascii.unhexlify("9A04F07998404286AB92E65BE0885F95")
if os.path.exists(file_name): pssh_boxes = []
os.remove(file_name) mp4_file = "init.mp4"
return pssh
def get_tracks(self, title): with open(mp4_file, "rb") as f:
player_data = self.content["data"]["page"]["components"][0]["mainAction"]["playerData"]["playable"] data = f.read()
videos = []
audios = []
for cr in player_data["videoAssets"]["dash"].values():
if not cr:
continue
for manifest in cr:
tracks = Tracks.from_mpd(
url=manifest["url"],
source=self.ALIASES[0],
session=self.session,
)
for video in tracks.videos: index = 0
pssh = self.get_pssh_init(manifest["url"]) while index < len(data):
video_pssh = Box.parse(base64.b64decode(pssh)) if index + 8 > len(data):
video.pssh = video_pssh break
video.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"]
video.contentId = URL(video.license_url).params._dict["ContentId"][
0
]
videos += [video]
# Extract Atmos audio track if available.
for audio in tracks.audios:
audio.pssh = video_pssh
audio.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"]
audio.contentId = URL(audio.license_url).params._dict["ContentId"][
0
]
if "atmos" in audio.url:
audio.atmos = True
audios += [audio]
corrected_video_list = [] box_size, box_type = struct.unpack_from(">I4s", data, index)
for res in ("uhd", "hdp", "hd", "sd"): if box_size < 8 or index + box_size > len(data):
for video in videos: break
if f"_{res}_video" not in video.url or not video.url.endswith(
f"&r={res}"
):
continue
if corrected_video_list and any( if box_type == b'moov' or box_type == b'moof':
video.id == vid.id for vid in corrected_video_list sub_index = index + 8
): while sub_index < index + box_size:
continue sub_size, sub_type = struct.unpack_from(">I4s", data, sub_index)
if sub_type == b'pssh':
system_id = data[sub_index + 12: sub_index + 28]
if system_id == playready_system_id:
pssh_data_size = struct.unpack_from(">I", data, sub_index + 28)[0]
pssh_data = data[sub_index + 32: sub_index + 32 + pssh_data_size]
pssh_boxes.append(pssh_data)
sub_index += sub_size
if "dash_hevc_hdr" in video.url: if box_type == b'pssh':
video.hdr10 = True system_id = data[index + 12: index + 28]
if "dash_hevc_dolbyvision" in video.url: if system_id == playready_system_id:
video.dv = True pssh_data_size = struct.unpack_from(">I", data, index + 28)[0]
pssh_data = data[index + 32: index + 32 + pssh_data_size]
pssh_boxes.append(pssh_data)
corrected_video_list += [video] index += box_size
tracks.add(corrected_video_list) if pssh_boxes:
tracks.audios = audios for i, pssh_data in enumerate(pssh_boxes):
tracks.videos = [x for x in tracks.videos if (x.codec or "")[:3] in self.VIDEO_CODEC_MAP[self.vcodec]] pssh_box = (
struct.pack(">I", len(pssh_data) + 32) +
b"pssh" +
struct.pack(">I", 0) +
playready_system_id +
struct.pack(">I", len(pssh_data)) +
pssh_data
)
base64_pssh = base64.b64encode(pssh_box).decode()
#print(base64_pssh)
psshPR = base64_pssh
return tracks header_offset = 6
xml_data = pssh_data[header_offset:].decode("utf-16le", errors='ignore')
xml_start = xml_data.find("<WRMHEADER")
xml_end = xml_data.find("</WRMHEADER>")
def get_chapters(self, title): if xml_start != -1 and xml_end != -1:
return [] xml_content = xml_data[xml_start:xml_end + len("</WRMHEADER>")]
xml_root = ET.fromstring(xml_content)
def certificate(self, **_): #print(ET.tostring(xml_root, encoding="utf-8").decode())
return None # will use common privacy cert else:
raise Exception("Failed to locate XML content in PSSH.")
def license(self, challenge: bytes, track: Tracks, **_) -> bytes: else:
license_message = self.session.post( raise Exception("No PlayReady PSSH boxes found.")
url=track.license_url,
data=challenge, # expects bytes
)
if "errorCode" in license_message.text:
self.log.exit(f" - Cannot complete license request: {license_message.text}")
return license_message.content
def configure(self): for file_name in files_to_delete:
access_token = None if os.path.exists(file_name):
install_id = None os.remove(file_name)
for cookie in self.cookies: return psshWV, psshPR
if cookie.name == "secure_access_token":
access_token = cookie.value
elif cookie.name == "install_id":
install_id = cookie.value
self.access_token = access_token def get_tracks(self, title):
self.install_id = install_id player_data = self.content["data"]["page"]["components"][0]["mainAction"]["playerData"]["playable"]
videos = []
audios = []
for cr in player_data["videoAssets"]["dash"].values():
if not cr:
continue
for manifest in cr:
tracks = Tracks.from_mpd(
url=manifest["url"],
source=self.ALIASES[0],
session=self.session,
)
self.session.headers.update( for video in tracks.videos:
{ psshWV, psshPR = self.get_pssh_init(manifest["url"])
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", video.psshWV = psshWV
"Origin": "https://moviesanywhere.com", video.psshPR = psshPR
"Authorization": f"Bearer {self.access_token}", video.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"]
} video.contentId = URL(video.license_url).params._dict["ContentId"][
) 0
]
videos += [video]
# Extract Atmos audio track if available.
for audio in tracks.audios:
audio.psshWV = psshWV
audio.psshPR = psshPR
audio.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"]
audio.contentId = URL(audio.license_url).params._dict["ContentId"][
0
]
if "atmos" in audio.url:
audio.atmos = True
audios += [audio]
corrected_video_list = []
for res in ("uhd", "hdp", "hd", "sd"):
for video in videos:
if f"_{res}_video" not in video.url or not video.url.endswith(
f"&r={res}"
):
continue
if corrected_video_list and any(
video.id == vid.id for vid in corrected_video_list
):
continue
if "dash_hevc_hdr" in video.url:
video.hdr10 = True
if "dash_hevc_dolbyvision" in video.url:
video.dv = True
corrected_video_list += [video]
tracks.add(corrected_video_list)
tracks.audios = audios
tracks.videos = [x for x in tracks.videos if (x.codec or "")[:3] in self.VIDEO_CODEC_MAP[self.vcodec]]
return tracks
def get_chapters(self, title):
return []
def certificate(self, **_):
return None # will use common privacy cert
def license(self, challenge: bytes, track: Tracks, **_) -> bytes:
if not isinstance(challenge, bytes):
challenge = bytes(challenge, 'utf-8')
playback_session_id = str(uuid.uuid4())
license_message = requests.post(
headers = {
'accept': '*/*',
'accept-language': 'en-US,en;q=0.9,en-IN;q=0.8',
'cache-control': 'no-cache',
'content-type': 'application/octet-stream',
'dnt': '1',
'origin': 'https://moviesanywhere.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://moviesanywhere.com/',
'sec-ch-ua-mobile': '?0',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'soapaction': '"http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"',
'user_agent': 'Dalvik/2.1.0 (Linux; U; Android 14; SM-S911B Build/UP1A.231005.007)',
},
params = {
"authorization": self.access_token,
"playbackSessionId": playback_session_id
},
url=track.license_url,
data=challenge, # expects bytes
)
self.log.debug(license_message.text)
if "errorCode" in license_message.text:
self.log.exit(f" - Cannot complete license request: {license_message.text}")
return license_message.content
def configure(self):
access_token = None
install_id = None
for cookie in self.cookies:
if cookie.name == "secure_access_token":
access_token = cookie.value
elif cookie.name == "install_id":
install_id = cookie.value
self.access_token = access_token
self.install_id = install_id
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Origin": "https://moviesanywhere.com",
"Authorization": f"Bearer {self.access_token}",
}
)

View File

@ -790,8 +790,8 @@ class Netflix(BaseService):
needs_repack=False, needs_repack=False,
# decryption # decryption
encrypted=x["isDrm"], encrypted=x["isDrm"],
psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None, psshPR=manifest["video_tracks"][0]["drmHeader"]["bytes"] if psshPR else None,
psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None, psshWV=manifest["video_tracks"][0]["drmHeader"]["bytes"] if not psshPR else None,
kid=x["drmHeaderId"] if x["isDrm"] else None, kid=x["drmHeaderId"] if x["isDrm"] else None,
)) ))
@ -811,8 +811,8 @@ class Netflix(BaseService):
needs_repack=False, needs_repack=False,
# decryption # decryption
encrypted=x["isDrm"], encrypted=x["isDrm"],
psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None, psshPR=manifest["video_tracks"][0]["drmHeader"]["bytes"] if psshPR else None,
psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None, psshWV=manifest["video_tracks"][0]["drmHeader"]["bytes"] if not psshPR else None,
kid=x.get("drmHeaderId") if x["isDrm"] else None, kid=x.get("drmHeaderId") if x["isDrm"] else None,
) for x in manifest["audio_tracks"]] ) for x in manifest["audio_tracks"]]

View File

@ -4,6 +4,7 @@ import json
import logging import logging
import os import os
import random import random
import httpx
import re import re
import sys import sys
import time import time
@ -32,468 +33,472 @@ from requests.packages.urllib3.poolmanager import PoolManager
from requests.packages.urllib3.util import ssl_ from requests.packages.urllib3.util import ssl_
class MSL: class MSL:
log = logging.getLogger("MSL") log = logging.getLogger("MSL")
def __init__(self, session, endpoint, sender, keys, message_id, user_auth=None): def __init__(self, session, endpoint, sender, keys, message_id, user_auth=None):
CIPHERS = ( CIPHERS = (
"ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:AES256-SHA" "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:AES256-SHA"
) )
class TlsAdapter(HTTPAdapter): #class TlsAdapter(HTTPAdapter):
# def __init__(self, ssl_options=0, **kwargs):
def __init__(self, ssl_options=0, **kwargs): # self.ssl_options = ssl_options
self.ssl_options = ssl_options # super(TlsAdapter, self).__init__(**kwargs)
super(TlsAdapter, self).__init__(**kwargs) # def init_poolmanager(self, *pool_args, **pool_kwargs):
# ctx = ssl_.create_urllib3_context(ciphers=CIPHERS, cert_reqs=ssl.CERT_REQUIRED, options=self.ssl_options)
def init_poolmanager(self, *pool_args, **pool_kwargs): # self.poolmanager = PoolManager(*pool_args,
ctx = ssl_.create_urllib3_context(ciphers=CIPHERS, cert_reqs=ssl.CERT_REQUIRED, options=self.ssl_options) # ssl_context=ctx,
self.poolmanager = PoolManager(*pool_args, # **pool_kwargs)
ssl_context=ctx,
**pool_kwargs)
session = requests.session() #session = requests.session()
adapter = TlsAdapter(ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1) #adapter = TlsAdapter(ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)
session.mount("https://", adapter) #session.mount("https://", adapter)
self.session = session session = httpx.Client(
self.endpoint = endpoint params=session.params,
self.sender = sender headers=session.headers,
self.keys = keys cookies=session.cookies,
self.user_auth = user_auth verify=True
self.message_id = message_id )
self.session = session
self.endpoint = endpoint
self.sender = sender
self.keys = keys
self.user_auth = user_auth
self.message_id = message_id
@classmethod @classmethod
def handshake(cls, scheme, session, endpoint, sender=None, cdm=None, msl_keys_path=None): def handshake(cls, scheme, session, endpoint, sender=None, cdm=None, msl_keys_path=None):
session.verify = False session.verify = False
message_id = random.randint(0, pow(2, 52)) message_id = random.randint(0, pow(2, 52))
msl_keys = MSL.load_cache_data(msl_keys_path) msl_keys = MSL.load_cache_data(msl_keys_path)
# Genera automaticamente l'ESN se non è specificato # Genera automaticamente l'ESN se non è specificato
if not sender: if not sender:
if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
sender = android_esn_generator() sender = android_esn_generator()
cls.log.info(f"Generated Widevine ESN: {sender}") cls.log.info(f"Generated Widevine ESN: {sender}")
elif scheme == KeyExchangeSchemes.PlayReady: elif scheme == KeyExchangeSchemes.PlayReady:
sender = playready_esn_generator() sender = playready_esn_generator()
cls.log.info(f"Generated PlayReady ESN: {sender}") cls.log.info(f"Generated PlayReady ESN: {sender}")
else: # AsymmetricWrapped o altri schemi else: # AsymmetricWrapped o altri schemi
sender = chrome_esn_generator() sender = chrome_esn_generator()
cls.log.info(f"Generated Chrome ESN: {sender}") cls.log.info(f"Generated Chrome ESN: {sender}")
if msl_keys is not None: if msl_keys is not None:
cls.log.info("Using cached MSL data") cls.log.info("Using cached MSL data")
else: else:
msl_keys = MSLKeys() msl_keys = MSLKeys()
if scheme != KeyExchangeSchemes.Widevine: if scheme != KeyExchangeSchemes.Widevine:
msl_keys.rsa = RSA.generate(2048) msl_keys.rsa = RSA.generate(2048)
if not cdm: if not cdm:
raise cls.log.exit("- No cached data and no CDM specified") raise cls.log.exit("- No cached data and no CDM specified")
if not msl_keys_path: if not msl_keys_path:
raise cls.log.exit("- No cached data and no MSL key path specified") raise cls.log.exit("- No cached data and no MSL key path specified")
# Determine entity authentication scheme based on key exchange scheme # Determine entity authentication scheme based on key exchange scheme
entity_auth_scheme = EntityAuthenticationSchemes.Unauthenticated entity_auth_scheme = EntityAuthenticationSchemes.Unauthenticated
if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
entity_auth_scheme = EntityAuthenticationSchemes.Widevine entity_auth_scheme = EntityAuthenticationSchemes.Widevine
elif scheme == KeyExchangeSchemes.PlayReady: elif scheme == KeyExchangeSchemes.PlayReady:
entity_auth_scheme = EntityAuthenticationSchemes.PlayReady entity_auth_scheme = EntityAuthenticationSchemes.PlayReady
# Key request data generation # Key request data generation
if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
msl_keys.cdm_session = cdm.open( msl_keys.cdm_session = cdm.open(
pssh=b"\x0A\x7A\x00\x6C\x38\x2B", pssh=b"\x0A\x7A\x00\x6C\x38\x2B",
raw=True, raw=True,
offline=True offline=True
) )
keyrequestdata = KeyExchangeRequest.Widevine( keyrequestdata = KeyExchangeRequest.Widevine(
keyrequest=cdm.get_license_challenge(msl_keys.cdm_session) keyrequest=cdm.get_license_challenge(msl_keys.cdm_session)
) )
elif scheme == KeyExchangeSchemes.PlayReady: elif scheme == KeyExchangeSchemes.PlayReady:
# Per PlayReady, usiamo comunque AsymmetricWrapped per lo scambio chiavi # Per PlayReady, usiamo comunque AsymmetricWrapped per lo scambio chiavi
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
keypairid="superKeyPair", keypairid="superKeyPair",
mechanism="JWK_RSA", mechanism="JWK_RSA",
publickey=msl_keys.rsa.publickey().exportKey(format="DER") publickey=msl_keys.rsa.publickey().exportKey(format="DER")
) )
else: else:
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
keypairid="superKeyPair", keypairid="superKeyPair",
mechanism="JWK_RSA", mechanism="JWK_RSA",
publickey=msl_keys.rsa.publickey().exportKey(format="DER") publickey=msl_keys.rsa.publickey().exportKey(format="DER")
) )
# Use the appropriate entity authentication based on scheme # Use the appropriate entity authentication based on scheme
if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
entityauthdata = EntityAuthentication.Widevine( entityauthdata = EntityAuthentication.Widevine(
devtype="ANDROID", devtype="ANDROID",
keyrequest=base64.b64encode(b"\x0A\x7A\x00\x6C\x38\x2B").decode() keyrequest=base64.b64encode(b"\x0A\x7A\x00\x6C\x38\x2B").decode()
) )
elif scheme == KeyExchangeSchemes.PlayReady: elif scheme == KeyExchangeSchemes.PlayReady:
# Prova a creare l'entità PlayReady senza parametri - potrebbe avere un costruttore che prende implicitamente i dati necessari # Prova a creare l'entità PlayReady senza parametri - potrebbe avere un costruttore che prende implicitamente i dati necessari
try: try:
entityauthdata = EntityAuthentication.PlayReady() # Senza parametri entityauthdata = EntityAuthentication.PlayReady() # Senza parametri
cls.log.info("Created PlayReady entity authentication without parameters") cls.log.info("Created PlayReady entity authentication without parameters")
except TypeError as e: except TypeError as e:
# Se fallisce, passa subito ad AsymmetricWrapped # Se fallisce, passa subito ad AsymmetricWrapped
cls.log.warning(f"Failed to create PlayReady entity auth: {e}") cls.log.warning(f"Failed to create PlayReady entity auth: {e}")
cls.log.info("Falling back to AsymmetricWrapped") cls.log.info("Falling back to AsymmetricWrapped")
scheme = KeyExchangeSchemes.AsymmetricWrapped scheme = KeyExchangeSchemes.AsymmetricWrapped
entityauthdata = EntityAuthentication.Unauthenticated(sender) entityauthdata = EntityAuthentication.Unauthenticated(sender)
else: else:
entityauthdata = EntityAuthentication.Unauthenticated(sender) entityauthdata = EntityAuthentication.Unauthenticated(sender)
data = jsonpickle.encode({ data = jsonpickle.encode({
"entityauthdata": entityauthdata, "entityauthdata": entityauthdata,
"headerdata": base64.b64encode(MSL.generate_msg_header( "headerdata": base64.b64encode(MSL.generate_msg_header(
message_id=message_id, message_id=message_id,
sender=sender, sender=sender,
is_handshake=True, is_handshake=True,
keyrequestdata=keyrequestdata keyrequestdata=keyrequestdata
).encode("utf-8")).decode("utf-8"), ).encode("utf-8")).decode("utf-8"),
"signature": "" "signature": ""
}, unpicklable=False) }, unpicklable=False)
data += json.dumps({ data += json.dumps({
"payload": base64.b64encode(json.dumps({ "payload": base64.b64encode(json.dumps({
"messageid": message_id, "messageid": message_id,
"data": "", "data": "",
"sequencenumber": 1, "sequencenumber": 1,
"endofmsg": True "endofmsg": True
}).encode("utf-8")).decode("utf-8"), }).encode("utf-8")).decode("utf-8"),
"signature": "" "signature": ""
}) })
try: try:
r = session.post( r = session.post(
url=endpoint, url=endpoint,
data=data, data=data,
headers={ headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"Content-Type": "application/json" "Content-Type": "application/json"
} }
) )
except requests.HTTPError as e: except requests.HTTPError as e:
raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}") raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}")
key_exchange = r.json() # expecting no payloads, so this is fine key_exchange = r.json() # expecting no payloads, so this is fine
if "errordata" in key_exchange: if "errordata" in key_exchange:
raise cls.log.exit("- Key exchange failed: " + json.loads(base64.b64decode( raise cls.log.exit("- Key exchange failed: " + json.loads(base64.b64decode(
key_exchange["errordata"] key_exchange["errordata"]
).decode())["errormsg"]) ).decode())["errormsg"])
# parse the crypto keys # parse the crypto keys
key_response_data = json.JSONDecoder().decode(base64.b64decode( key_response_data = json.JSONDecoder().decode(base64.b64decode(
key_exchange["headerdata"] key_exchange["headerdata"]
).decode("utf-8"))["keyresponsedata"] ).decode("utf-8"))["keyresponsedata"]
if key_response_data["scheme"] != str(scheme): if key_response_data["scheme"] != str(scheme):
raise cls.log.exit("- Key exchange scheme mismatch occurred") raise cls.log.exit("- Key exchange scheme mismatch occurred")
key_data = key_response_data["keydata"] key_data = key_response_data["keydata"]
if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
from vinetrimmer.utils.widevine.device import RemoteDevice from vinetrimmer.utils.widevine.device import RemoteDevice
if isinstance(cdm.device, RemoteDevice): if isinstance(cdm.device, RemoteDevice):
msl_keys.encryption, msl_keys.sign = cdm.device.exchange( msl_keys.encryption, msl_keys.sign = cdm.device.exchange(
cdm.sessions[msl_keys.cdm_session], cdm.sessions[msl_keys.cdm_session],
license_res=key_data["cdmkeyresponse"], license_res=key_data["cdmkeyresponse"],
enc_key_id=base64.b64decode(key_data["encryptionkeyid"]), enc_key_id=base64.b64decode(key_data["encryptionkeyid"]),
hmac_key_id=base64.b64decode(key_data["hmackeyid"]) hmac_key_id=base64.b64decode(key_data["hmackeyid"])
) )
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
else: else:
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
keys = cdm.get_keys(msl_keys.cdm_session) keys = cdm.get_keys(msl_keys.cdm_session)
msl_keys.encryption = MSL.get_widevine_key( msl_keys.encryption = MSL.get_widevine_key(
kid=base64.b64decode(key_data["encryptionkeyid"]), kid=base64.b64decode(key_data["encryptionkeyid"]),
keys=keys, keys=keys,
permissions=["AllowEncrypt", "AllowDecrypt"] permissions=["AllowEncrypt", "AllowDecrypt"]
) )
msl_keys.sign = MSL.get_widevine_key( msl_keys.sign = MSL.get_widevine_key(
kid=base64.b64decode(key_data["hmackeyid"]), kid=base64.b64decode(key_data["hmackeyid"]),
keys=keys, keys=keys,
permissions=["AllowSign", "AllowSignatureVerify"] permissions=["AllowSign", "AllowSignatureVerify"]
) )
elif scheme == KeyExchangeSchemes.PlayReady: elif scheme == KeyExchangeSchemes.PlayReady:
# Gestione migliorata delle chiavi per PlayReady (emulando Chrome) # Gestione migliorata delle chiavi per PlayReady (emulando Chrome)
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
try: try:
# Decodifica le chiavi di crittografia e firma # Decodifica le chiavi di crittografia e firma
msl_keys.encryption = MSL.base64key_decode( msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt( json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["encryptionkey"]) base64.b64decode(key_data["encryptionkey"])
).decode("utf-8"))["k"] ).decode("utf-8"))["k"]
) )
msl_keys.sign = MSL.base64key_decode( msl_keys.sign = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt( json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["hmackey"]) base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"] ).decode("utf-8"))["k"]
) )
cls.log.info("PlayReady key exchange successful (Chrome emulation)") cls.log.info("PlayReady key exchange successful (Chrome emulation)")
except Exception as e: except Exception as e:
raise cls.log.exit(f"- PlayReady key decryption failed: {str(e)}") raise cls.log.exit(f"- PlayReady key decryption failed: {str(e)}")
else: else:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
msl_keys.encryption = MSL.base64key_decode( msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt( json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["encryptionkey"]) base64.b64decode(key_data["encryptionkey"])
).decode("utf-8"))["k"] ).decode("utf-8"))["k"]
) )
msl_keys.sign = MSL.base64key_decode( msl_keys.sign = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt( json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["hmackey"]) base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"] ).decode("utf-8"))["k"]
) )
msl_keys.mastertoken = key_response_data["mastertoken"] msl_keys.mastertoken = key_response_data["mastertoken"]
MSL.cache_keys(msl_keys, msl_keys_path) MSL.cache_keys(msl_keys, msl_keys_path)
cls.log.info("MSL handshake successful") cls.log.info("MSL handshake successful")
return cls( return cls(
session=session, session=session,
endpoint=endpoint, endpoint=endpoint,
sender=sender, sender=sender,
keys=msl_keys, keys=msl_keys,
message_id=message_id message_id=message_id
) )
@staticmethod @staticmethod
def load_cache_data(msl_keys_path=None): def load_cache_data(msl_keys_path=None):
if not msl_keys_path or not os.path.exists(msl_keys_path): if not msl_keys_path or not os.path.exists(msl_keys_path):
return None return None
with open(msl_keys_path, encoding="utf-8") as fd: with open(msl_keys_path, encoding="utf-8") as fd:
msl_keys = jsonpickle.decode(fd.read()) msl_keys = jsonpickle.decode(fd.read())
if msl_keys.rsa: if msl_keys.rsa:
# noinspection PyTypeChecker # noinspection PyTypeChecker
# expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object # expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object
# so as a workaround it exports to PEM, and then when reading, it imports that PEM back # so as a workaround it exports to PEM, and then when reading, it imports that PEM back
# to an RsaKey :) # to an RsaKey :)
msl_keys.rsa = RSA.importKey(msl_keys.rsa) msl_keys.rsa = RSA.importKey(msl_keys.rsa)
# If it's expired or close to, return None as it's unusable # If it's expired or close to, return None as it's unusable
if msl_keys.mastertoken and ((datetime.utcfromtimestamp(int(json.JSONDecoder().decode( if msl_keys.mastertoken and ((datetime.utcfromtimestamp(int(json.JSONDecoder().decode(
base64.b64decode(msl_keys.mastertoken["tokendata"]).decode("utf-8") base64.b64decode(msl_keys.mastertoken["tokendata"]).decode("utf-8")
)["expiration"])) - datetime.now()).total_seconds() / 60 / 60) < 10: )["expiration"])) - datetime.now()).total_seconds() / 60 / 60) < 10:
return None return None
return msl_keys return msl_keys
@staticmethod @staticmethod
def cache_keys(msl_keys, msl_keys_path): def cache_keys(msl_keys, msl_keys_path):
os.makedirs(os.path.dirname(msl_keys_path), exist_ok=True) os.makedirs(os.path.dirname(msl_keys_path), exist_ok=True)
if msl_keys.rsa: if msl_keys.rsa:
# jsonpickle can't pickle RsaKey objects :( # jsonpickle can't pickle RsaKey objects :(
msl_keys.rsa = msl_keys.rsa.export_key() msl_keys.rsa = msl_keys.rsa.export_key()
with open(msl_keys_path, "w", encoding="utf-8") as fd: with open(msl_keys_path, "w", encoding="utf-8") as fd:
fd.write(jsonpickle.encode(msl_keys)) fd.write(jsonpickle.encode(msl_keys))
if msl_keys.rsa: if msl_keys.rsa:
# re-import now # re-import now
msl_keys.rsa = RSA.importKey(msl_keys.rsa) msl_keys.rsa = RSA.importKey(msl_keys.rsa)
@staticmethod @staticmethod
def generate_msg_header(message_id, sender, is_handshake, userauthdata=None, keyrequestdata=None, compression="GZIP"): def generate_msg_header(message_id, sender, is_handshake, userauthdata=None, keyrequestdata=None, compression="GZIP"):
""" """
The MSL header carries all MSL data used for entity and user authentication, message encryption The MSL header carries all MSL data used for entity and user authentication, message encryption
and verification, and service tokens. Portions of the MSL header are encrypted. and verification, and service tokens. Portions of the MSL header are encrypted.
https://github.com/Netflix/msl/wiki/Messages#header-data https://github.com/Netflix/msl/wiki/Messages#header-data
:param message_id: number against which payload chunks are bound to protect against replay. :param message_id: number against which payload chunks are bound to protect against replay.
:param sender: ESN :param sender: ESN
:param is_handshake: This flag is set true if the message is a handshake message and will not include any :param is_handshake: This flag is set true if the message is a handshake message and will not include any
payload chunks. It will include keyrequestdata. payload chunks. It will include keyrequestdata.
:param userauthdata: UserAuthData :param userauthdata: UserAuthData
:param keyrequestdata: KeyRequestData :param keyrequestdata: KeyRequestData
:param compression: Supported compression algorithms. :param compression: Supported compression algorithms.
:return: The base64 encoded JSON String of the header :return: The base64 encoded JSON String of the header
""" """
header_data = { header_data = {
"messageid": message_id, "messageid": message_id,
"renewable": True, # MUST be True if is_handshake "renewable": True, # MUST be True if is_handshake
"handshake": is_handshake, "handshake": is_handshake,
"capabilities": { "capabilities": {
"compressionalgos": [compression] if compression else [], "compressionalgos": [compression] if compression else [],
"languages": ["en-US"], # bcp-47 "languages": ["en-US"], # bcp-47
"encoderformats": ["JSON"] "encoderformats": ["JSON"]
}, },
"timestamp": int(time.time()), "timestamp": int(time.time()),
# undocumented or unused: # undocumented or unused:
"sender": sender, "sender": sender,
"nonreplayable": False, "nonreplayable": False,
"recipient": "Netflix", "recipient": "Netflix",
} }
if userauthdata: if userauthdata:
header_data["userauthdata"] = userauthdata header_data["userauthdata"] = userauthdata
if keyrequestdata: if keyrequestdata:
header_data["keyrequestdata"] = [keyrequestdata] header_data["keyrequestdata"] = [keyrequestdata]
return jsonpickle.encode(header_data, unpicklable=False) return jsonpickle.encode(header_data, unpicklable=False)
@classmethod @classmethod
def get_widevine_key(cls, kid, keys, permissions): def get_widevine_key(cls, kid, keys, permissions):
for key in keys: for key in keys:
if key.kid != kid: if key.kid != kid:
continue continue
if key.type != "OPERATOR_SESSION": if key.type != "OPERATOR_SESSION":
cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}") cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}")
continue continue
if not set(permissions) <= set(key.permissions): if not set(permissions) <= set(key.permissions):
cls.log.warning(f"Widevine Key Exchange: Incorrect permissions, key {key}, needed perms {permissions}") cls.log.warning(f"Widevine Key Exchange: Incorrect permissions, key {key}, needed perms {permissions}")
continue continue
return key.key return key.key
return None return None
def send_message(self, endpoint, params, application_data, userauthdata=None): def send_message(self, endpoint, params, application_data, userauthdata=None):
message = self.create_message(application_data, userauthdata) message = self.create_message(application_data, userauthdata)
res = self.session.post(url=endpoint, data=message, params=params, verify=True) # verify=True? res = self.session.post(url=endpoint, data=message, params=params, verify=True) # verify=True?
header, payload_data = self.parse_message(res.text) header, payload_data = self.parse_message(res.text)
if "errordata" in header: if "errordata" in header:
raise self.log.exit( raise self.log.exit(
"- MSL response message contains an error: {}".format( "- MSL response message contains an error: {}".format(
json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8")) json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8"))
) )
) )
return header, payload_data return header, payload_data
def create_message(self, application_data, userauthdata=None): def create_message(self, application_data, userauthdata=None):
self.message_id += 1 # new message must ue a new message id self.message_id += 1 # new message must ue a new message id
headerdata = self.encrypt(self.generate_msg_header( headerdata = self.encrypt(self.generate_msg_header(
message_id=self.message_id, message_id=self.message_id,
sender=self.sender, sender=self.sender,
is_handshake=False, is_handshake=False,
userauthdata=userauthdata userauthdata=userauthdata
)) ))
header = json.dumps({ header = json.dumps({
"headerdata": base64.b64encode(headerdata.encode("utf-8")).decode("utf-8"), "headerdata": base64.b64encode(headerdata.encode("utf-8")).decode("utf-8"),
"signature": self.sign(headerdata).decode("utf-8"), "signature": self.sign(headerdata).decode("utf-8"),
"mastertoken": self.keys.mastertoken "mastertoken": self.keys.mastertoken
}) })
payload_chunks = [self.encrypt(json.dumps({ payload_chunks = [self.encrypt(json.dumps({
"messageid": self.message_id, "messageid": self.message_id,
"data": self.gzip_compress(json.dumps(application_data).encode("utf-8")).decode("utf-8"), "data": self.gzip_compress(json.dumps(application_data).encode("utf-8")).decode("utf-8"),
"compressionalgo": "GZIP", "compressionalgo": "GZIP",
"sequencenumber": 1, # todo ; use sequence_number from master token instead? "sequencenumber": 1, # todo ; use sequence_number from master token instead?
"endofmsg": True "endofmsg": True
}))] }))]
message = header message = header
for payload_chunk in payload_chunks: for payload_chunk in payload_chunks:
message += json.dumps({ message += json.dumps({
"payload": base64.b64encode(payload_chunk.encode("utf-8")).decode("utf-8"), "payload": base64.b64encode(payload_chunk.encode("utf-8")).decode("utf-8"),
"signature": self.sign(payload_chunk).decode("utf-8") "signature": self.sign(payload_chunk).decode("utf-8")
}) })
return message return message
def decrypt_payload_chunks(self, payload_chunks): def decrypt_payload_chunks(self, payload_chunks):
""" """
Decrypt and extract data from payload chunks Decrypt and extract data from payload chunks
:param payload_chunks: List of payload chunks :param payload_chunks: List of payload chunks
:return: json object :return: json object
""" """
raw_data = "" raw_data = ""
for payload_chunk in payload_chunks: for payload_chunk in payload_chunks:
# todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"] # todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"]
# expecting base64-encoded json string # expecting base64-encoded json string
payload_chunk = json.loads(base64.b64decode(payload_chunk["payload"]).decode("utf-8")) payload_chunk = json.loads(base64.b64decode(payload_chunk["payload"]).decode("utf-8"))
# decrypt the payload # decrypt the payload
payload_decrypted = AES.new( payload_decrypted = AES.new(
key=self.keys.encryption, key=self.keys.encryption,
mode=AES.MODE_CBC, mode=AES.MODE_CBC,
iv=base64.b64decode(payload_chunk["iv"]) iv=base64.b64decode(payload_chunk["iv"])
).decrypt(base64.b64decode(payload_chunk["ciphertext"])) ).decrypt(base64.b64decode(payload_chunk["ciphertext"]))
payload_decrypted = Padding.unpad(payload_decrypted, 16) payload_decrypted = Padding.unpad(payload_decrypted, 16)
payload_decrypted = json.loads(payload_decrypted.decode("utf-8")) payload_decrypted = json.loads(payload_decrypted.decode("utf-8"))
# decode and uncompress data if compressed # decode and uncompress data if compressed
payload_data = base64.b64decode(payload_decrypted["data"]) payload_data = base64.b64decode(payload_decrypted["data"])
if payload_decrypted.get("compressionalgo") == "GZIP": if payload_decrypted.get("compressionalgo") == "GZIP":
payload_data = zlib.decompress(payload_data, 16 + zlib.MAX_WBITS) payload_data = zlib.decompress(payload_data, 16 + zlib.MAX_WBITS)
raw_data += payload_data.decode("utf-8") raw_data += payload_data.decode("utf-8")
data = json.loads(raw_data) data = json.loads(raw_data)
if "error" in data: if "error" in data:
error = data["error"] error = data["error"]
error_display = error.get("display") error_display = error.get("display")
error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", "")) error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", ""))
#if error_display: #if error_display:
# self.log.critical(f"- {error_display}") # self.log.critical(f"- {error_display}")
#if error_detail: #if error_detail:
# self.log.critical(f"- {error_detail}") # self.log.critical(f"- {error_detail}")
if not (error_display or error_detail): if not (error_display or error_detail):
self.log.critical(f"- {error}") self.log.critical(f"- {error}")
sys.exit(1) sys.exit(1)
return data["result"] return data["result"]
def parse_message(self, message): def parse_message(self, message):
""" """
Parse an MSL message into a header and list of payload chunks Parse an MSL message into a header and list of payload chunks
:param message: MSL message :param message: MSL message
:returns: a 2-item tuple containing message and list of payload chunks if available :returns: a 2-item tuple containing message and list of payload chunks if available
""" """
parsed_message = json.loads("[{}]".format(message.replace("}{", "},{"))) parsed_message = json.loads("[{}]".format(message.replace("}{", "},{")))
header = parsed_message[0] header = parsed_message[0]
encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else [] encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else []
if encrypted_payload_chunks: if encrypted_payload_chunks:
payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks) payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks)
else: else:
payload_chunks = {} payload_chunks = {}
return header, payload_chunks return header, payload_chunks
@staticmethod @staticmethod
def gzip_compress(data): def gzip_compress(data):
out = BytesIO() out = BytesIO()
with gzip.GzipFile(fileobj=out, mode="w") as fd: with gzip.GzipFile(fileobj=out, mode="w") as fd:
fd.write(data) fd.write(data)
return base64.b64encode(out.getvalue()) return base64.b64encode(out.getvalue())
@staticmethod @staticmethod
def base64key_decode(payload): def base64key_decode(payload):
length = len(payload) % 4 length = len(payload) % 4
if length == 2: if length == 2:
payload += "==" payload += "=="
elif length == 3: elif length == 3:
payload += "=" payload += "="
elif length != 0: elif length != 0:
raise ValueError("Invalid base64 string") raise ValueError("Invalid base64 string")
return base64.urlsafe_b64decode(payload.encode("utf-8")) return base64.urlsafe_b64decode(payload.encode("utf-8"))
def encrypt(self, plaintext): def encrypt(self, plaintext):
""" """
Encrypt the given Plaintext with the encryption key Encrypt the given Plaintext with the encryption key
:param plaintext: :param plaintext:
:return: Serialized JSON String of the encryption Envelope :return: Serialized JSON String of the encryption Envelope
""" """
iv = get_random_bytes(16) iv = get_random_bytes(16)
return json.dumps({ return json.dumps({
"ciphertext": base64.b64encode( "ciphertext": base64.b64encode(
AES.new( AES.new(
self.keys.encryption, self.keys.encryption,
AES.MODE_CBC, AES.MODE_CBC,
iv iv
).encrypt( ).encrypt(
Padding.pad(plaintext.encode("utf-8"), 16) Padding.pad(plaintext.encode("utf-8"), 16)
) )
).decode("utf-8"), ).decode("utf-8"),
"keyid": "{}_{}".format(self.sender, json.loads( "keyid": "{}_{}".format(self.sender, json.loads(
base64.b64decode(self.keys.mastertoken["tokendata"]).decode("utf-8") base64.b64decode(self.keys.mastertoken["tokendata"]).decode("utf-8")
)["sequencenumber"]), )["sequencenumber"]),
"sha256": "AA==", "sha256": "AA==",
"iv": base64.b64encode(iv).decode("utf-8") "iv": base64.b64encode(iv).decode("utf-8")
}) })
def sign(self, text): def sign(self, text):
""" """
Calculates the HMAC signature for the given text with the current sign key and SHA256 Calculates the HMAC signature for the given text with the current sign key and SHA256
:param text: :param text:
:return: Base64 encoded signature :return: Base64 encoded signature
""" """
return base64.b64encode(HMAC.new(self.keys.sign, text.encode("utf-8"), SHA256).digest()) return base64.b64encode(HMAC.new(self.keys.sign, text.encode("utf-8"), SHA256).digest())

View File

@ -14,6 +14,7 @@ cdm:
DisneyPlus: 'mtc_mtc_atv_atv_sl3000' DisneyPlus: 'mtc_mtc_atv_atv_sl3000'
Sunnxt: 'xiaomi_mi_a1_15.0.0_60ceee88_8159_l3' Sunnxt: 'xiaomi_mi_a1_15.0.0_60ceee88_8159_l3'
iTunes: 'mtc_mtc_atv_atv_sl3000' iTunes: 'mtc_mtc_atv_atv_sl3000'
MoviesAnywhere: 'ktc_t31_43f_sl3000'
cdm_api: cdm_api:
- name: 'playready' - name: 'playready'