diff --git a/vinetrimmer/devices/hisense_smarttv_32e5600eu_sl3000.prd b/vinetrimmer/devices/hisense_smarttv_32e5600eu_sl3000.prd new file mode 100644 index 0000000..33152ed Binary files /dev/null and b/vinetrimmer/devices/hisense_smarttv_32e5600eu_sl3000.prd differ diff --git a/vinetrimmer/devices/hisense_smarttv_he55a7000euwts_sl3000.prd b/vinetrimmer/devices/hisense_smarttv_he55a7000euwts_sl3000.prd index c2cde9b..0044670 100644 Binary files a/vinetrimmer/devices/hisense_smarttv_he55a7000euwts_sl3000.prd and b/vinetrimmer/devices/hisense_smarttv_he55a7000euwts_sl3000.prd differ diff --git a/vinetrimmer/devices/hisense_smarttv_ltdn55k2203gwus_sl2000.prd b/vinetrimmer/devices/hisense_smarttv_ltdn55k2203gwus_sl2000.prd new file mode 100644 index 0000000..269e9e5 Binary files /dev/null and b/vinetrimmer/devices/hisense_smarttv_ltdn55k2203gwus_sl2000.prd differ diff --git a/vinetrimmer/devices/ktc_t31_43f_sl3000.prd b/vinetrimmer/devices/ktc_t31_43f_sl3000.prd new file mode 100644 index 0000000..e5aae5e Binary files /dev/null and b/vinetrimmer/devices/ktc_t31_43f_sl3000.prd differ diff --git a/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd b/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd index eacb291..28459f5 100644 Binary files a/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd and b/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd differ diff --git a/vinetrimmer/key_store.db b/vinetrimmer/key_store.db index 5fb7a16..94c678c 100644 Binary files a/vinetrimmer/key_store.db and b/vinetrimmer/key_store.db differ diff --git a/vinetrimmer/services/moviesanywhere.py b/vinetrimmer/services/moviesanywhere.py index b87e6f0..40cd010 100644 --- a/vinetrimmer/services/moviesanywhere.py +++ b/vinetrimmer/services/moviesanywhere.py @@ -2,12 +2,20 @@ import base64 import json import click import re +import requests from requests import JSONDecodeError from httpx import URL import uuid import xmltodict - +import struct +import binascii +import os +import yt_dlp +from pathlib import Path +import uuid +import xml.etree.ElementTree as ET import time + from datetime import datetime from langcodes import Language from vinetrimmer.objects import AudioTrack, TextTrack, Title, Tracks, VideoTrack @@ -16,232 +24,328 @@ from vinetrimmer.vendor.pymp4.parser import Box class MoviesAnywhere(BaseService): - """ - Service code for US' streaming service MoviesAnywhere (https://moviesanywhere.com). + """ + Service code for US' streaming service MoviesAnywhere (https://moviesanywhere.com). - \b - Authorization: Cookies - Security: SD-HD@L3, FHD SDR@L1 (any active device), FHD-UHD HDR-DV@L1 (whitelisted devices). + \b + Authorization: Cookies + Security: SD-HD@L3, FHD SDR@L1 (any active device), FHD-UHD HDR-DV@L1 (whitelisted devices). - NOTE: Can be accessed from any region, it does not seem to care. - Accounts can only mount services when its US based though. + NOTE: Can be accessed from any region, it does not seem to care. + Accounts can only mount services when its US based though. - """ - ALIASES = ["MA", "moviesanywhere"] + """ + ALIASES = ["MA", "MoviesAnywhere"] - TITLE_RE = r"https://moviesanywhere\.com(?P.+)" + TITLE_RE = r"https://moviesanywhere\.com(?P.+)" - VIDEO_CODEC_MAP = { - "H264": ["avc"], - "H265": ["hvc", "hev", "dvh"] - } - AUDIO_CODEC_MAP = { - "AAC": ["mp4a", "HE", "stereo"], - "AC3": ["ac3"], - "EC3": ["ec3", "atmos"] - } + VIDEO_CODEC_MAP = { + "H264": ["avc"], + "H265": ["hvc", "hev", "dvh"] + } + AUDIO_CODEC_MAP = { + "AAC": ["mp4a", "HE", "stereo"], + "AC3": ["ac3"], + "EC3": ["ec3", "atmos"] + } - @staticmethod - @click.command(name="MoviesAnywhere", short_help="https://moviesanywhere.com") - @click.argument("title", type=str) + @staticmethod + @click.command(name="MoviesAnywhere", short_help="https://moviesanywhere.com") + @click.argument("title", type=str) - @click.pass_context - def cli(ctx, **kwargs): - return MoviesAnywhere(ctx, **kwargs) + @click.pass_context + def cli(ctx, **kwargs): + return MoviesAnywhere(ctx, **kwargs) - def __init__(self, ctx, title): - super().__init__(ctx) - self.parse_title(ctx, title) - self.configure() - self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False #ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY - self.atmos = ctx.parent.params["atmos"] - self.vcodec = ctx.parent.params["vcodec"] - self.acodec = ctx.parent.params["acodec"] + def __init__(self, ctx, title): + super().__init__(ctx) + self.parse_title(ctx, title) + self.configure() + self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False #ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY + self.atmos = ctx.parent.params["atmos"] + self.vcodec = ctx.parent.params["vcodec"] + self.acodec = ctx.parent.params["acodec"] + self.range = ctx.parent.params["range_"] + self.quality = ctx.parent.params["quality"] or 1080 - def get_titles(self): - self.headers={ - "authorization": f"Bearer {self.access_token}", - "install-id": self.install_id, - } - res = self.session.post( - url="https://gateway.moviesanywhere.com/graphql", - json={ - "platform": "web", - "variables": {"slug": self.title}, # Does not seem to care which platform will be used to give the best tracks available - "extensions": '{"persistedQuery":{"sha256Hash":"5cb001491262214406acf8237ea2b8b46ca6dbcf37e70e791761402f4f74336e","version":1}}', # ONE_GRAPH_PERSIST_QUERY_TOKEN - }, - headers={ - "authorization": f"Bearer {self.access_token}", - "install-id": self.install_id, - } - ) + if self.range != "SDR" or self.quality > 1080: + self.log.info(" + Setting VideoCodec to H265") + self.vcodec = "H265" - try: - self.content = res.json() - except JSONDecodeError: - self.log.exit(" - Not able to return title information") + def get_titles(self): + self.headers={ + "authorization": f"Bearer {self.access_token}", + "install-id": self.install_id, + } + res = self.session.post( + url="https://gateway.moviesanywhere.com/graphql", + json={ + "platform": "web", + "variables": {"slug": self.title}, # Does not seem to care which platform will be used to give the best tracks available + "extensions": '{"persistedQuery":{"sha256Hash":"5cb001491262214406acf8237ea2b8b46ca6dbcf37e70e791761402f4f74336e","version":1}}', # ONE_GRAPH_PERSIST_QUERY_TOKEN + }, + headers={ + "authorization": f"Bearer {self.access_token}", + "install-id": self.install_id, + } + ) - title_data = self.content["data"]["page"] + try: + self.content = res.json() + except JSONDecodeError: + self.log.exit(" - Not able to return title information") - title_info = [ - x - for x in title_data["components"] - if x["__typename"] == "MovieMarqueeComponent" - ][0] - - title_info["title"] = re.sub(r" \(.+?\)", "", title_info["title"]) + title_data = self.content["data"]["page"] - title_data = self.content["data"]["page"] - try: - Id = title_data["components"][0]["mainAction"]["playerData"]["playable"]["id"] - except KeyError: - self.log.exit(" - Account does not seem to own this title") - - return Title( - id_=Id, - type_=Title.Types.MOVIE, - name=title_info["title"], - year=title_info["year"], - original_lang="en", - source=self.ALIASES[0], - service_data=title_data, - ) - - def get_pssh_init(self, url): - import os, yt_dlp - from pathlib import Path - init = 'init.mp4' + title_info = [ + x + for x in title_data["components"] + if x["__typename"] == "MovieMarqueeComponent" + ][0] + + title_info["title"] = re.sub(r" \(.+?\)", "", title_info["title"]) - files_to_delete = [init] - for file_name in files_to_delete: - if os.path.exists(file_name): - os.remove(file_name) + title_data = self.content["data"]["page"] + try: + Id = title_data["components"][0]["mainAction"]["playerData"]["playable"]["id"] + except KeyError: + self.log.exit(" - Account does not seem to own this title") + + return Title( + id_=Id, + type_=Title.Types.MOVIE, + name=title_info["title"], + year=title_info["year"], + original_lang="en", + source=self.ALIASES[0], + service_data=title_data, + ) + + def get_pssh_init(self, url): + init = 'init.mp4' - def read_pssh(path: str): - raw = Path(path).read_bytes() - wv = raw.rfind(bytes.fromhex('edef8ba979d64acea3c827dcd51d21ed')) - if wv == -1: return None - return base64.b64encode(raw[wv-12:wv-12+raw[wv-9]]).decode('utf-8') - - ydl_opts = { - 'format': 'bestvideo[ext=mp4]/bestaudio[ext=m4a]/best', - 'allow_unplayable_formats': True, - 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3', - 'no_warnings': True, - 'quiet': True, - 'outtmpl': init, - 'no_merge': True, - 'test': True, - } - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info_dict = ydl.extract_info(url, download=True) - url = info_dict.get("url", None) - if url is None: - raise ValueError("Failed to download the video") - video_file_name = ydl.prepare_filename(info_dict) + files_to_delete = [init] + for file_name in files_to_delete: + if os.path.exists(file_name): + os.remove(file_name) + + ydl_opts = { + 'format': 'bestvideo[ext=mp4]/bestaudio[ext=m4a]/best', + 'allow_unplayable_formats': True, + 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3', + 'no_warnings': True, + 'quiet': True, + 'outtmpl': init, + 'no_merge': True, + 'test': True, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info_dict = ydl.extract_info(url, download=True) + url = info_dict.get("url", None) + if url is None: + raise ValueError("Failed to download the video") + video_file_name = ydl.prepare_filename(info_dict) - pssh = read_pssh(init) + raw = Path(init).read_bytes() + wv = raw.rfind(bytes.fromhex('edef8ba979d64acea3c827dcd51d21ed')) + if wv != -1: + psshWV = base64.b64encode(raw[wv-12:wv-12+raw[wv-9]]).decode('utf-8') - for file_name in files_to_delete: - if os.path.exists(file_name): - os.remove(file_name) - return pssh + playready_system_id = binascii.unhexlify("9A04F07998404286AB92E65BE0885F95") + pssh_boxes = [] + mp4_file = "init.mp4" - def get_tracks(self, title): - player_data = self.content["data"]["page"]["components"][0]["mainAction"]["playerData"]["playable"] - videos = [] - audios = [] - for cr in player_data["videoAssets"]["dash"].values(): - if not cr: - continue - for manifest in cr: - tracks = Tracks.from_mpd( - url=manifest["url"], - source=self.ALIASES[0], - session=self.session, - ) + with open(mp4_file, "rb") as f: + data = f.read() - for video in tracks.videos: - pssh = self.get_pssh_init(manifest["url"]) - video_pssh = Box.parse(base64.b64decode(pssh)) - video.pssh = video_pssh - video.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"] - video.contentId = URL(video.license_url).params._dict["ContentId"][ - 0 - ] - videos += [video] - # Extract Atmos audio track if available. - for audio in tracks.audios: - audio.pssh = video_pssh - audio.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"] - audio.contentId = URL(audio.license_url).params._dict["ContentId"][ - 0 - ] - if "atmos" in audio.url: - audio.atmos = True - audios += [audio] + index = 0 + while index < len(data): + if index + 8 > len(data): + break - corrected_video_list = [] - for res in ("uhd", "hdp", "hd", "sd"): - for video in videos: - if f"_{res}_video" not in video.url or not video.url.endswith( - f"&r={res}" - ): - continue + box_size, box_type = struct.unpack_from(">I4s", data, index) + if box_size < 8 or index + box_size > len(data): + break - if corrected_video_list and any( - video.id == vid.id for vid in corrected_video_list - ): - continue + if box_type == b'moov' or box_type == b'moof': + sub_index = index + 8 + while sub_index < index + box_size: + sub_size, sub_type = struct.unpack_from(">I4s", data, sub_index) + if sub_type == b'pssh': + system_id = data[sub_index + 12: sub_index + 28] + if system_id == playready_system_id: + pssh_data_size = struct.unpack_from(">I", data, sub_index + 28)[0] + pssh_data = data[sub_index + 32: sub_index + 32 + pssh_data_size] + pssh_boxes.append(pssh_data) + sub_index += sub_size - if "dash_hevc_hdr" in video.url: - video.hdr10 = True - if "dash_hevc_dolbyvision" in video.url: - video.dv = True + if box_type == b'pssh': + system_id = data[index + 12: index + 28] + if system_id == playready_system_id: + pssh_data_size = struct.unpack_from(">I", data, index + 28)[0] + pssh_data = data[index + 32: index + 32 + pssh_data_size] + pssh_boxes.append(pssh_data) - corrected_video_list += [video] + index += box_size - tracks.add(corrected_video_list) - tracks.audios = audios - tracks.videos = [x for x in tracks.videos if (x.codec or "")[:3] in self.VIDEO_CODEC_MAP[self.vcodec]] + if pssh_boxes: + for i, pssh_data in enumerate(pssh_boxes): + pssh_box = ( + struct.pack(">I", len(pssh_data) + 32) + + b"pssh" + + struct.pack(">I", 0) + + playready_system_id + + struct.pack(">I", len(pssh_data)) + + pssh_data + ) + base64_pssh = base64.b64encode(pssh_box).decode() + #print(base64_pssh) + psshPR = base64_pssh - return tracks + header_offset = 6 + xml_data = pssh_data[header_offset:].decode("utf-16le", errors='ignore') + xml_start = xml_data.find("") + + if xml_start != -1 and xml_end != -1: + xml_content = xml_data[xml_start:xml_end + len("")] + xml_root = ET.fromstring(xml_content) + #print(ET.tostring(xml_root, encoding="utf-8").decode()) + else: + raise Exception("Failed to locate XML content in PSSH.") + else: + raise Exception("No PlayReady PSSH boxes found.") - def get_chapters(self, title): - return [] - def certificate(self, **_): - return None # will use common privacy cert + for file_name in files_to_delete: + if os.path.exists(file_name): + os.remove(file_name) + return psshWV, psshPR - def license(self, challenge: bytes, track: Tracks, **_) -> bytes: - license_message = self.session.post( - url=track.license_url, - data=challenge, # expects bytes - ) + def get_tracks(self, title): + player_data = self.content["data"]["page"]["components"][0]["mainAction"]["playerData"]["playable"] + videos = [] + audios = [] + for cr in player_data["videoAssets"]["dash"].values(): + if not cr: + continue + for manifest in cr: + tracks = Tracks.from_mpd( + url=manifest["url"], + source=self.ALIASES[0], + session=self.session, + ) - if "errorCode" in license_message.text: - self.log.exit(f" - Cannot complete license request: {license_message.text}") + for video in tracks.videos: + psshWV, psshPR = self.get_pssh_init(manifest["url"]) + video.psshWV = psshWV + video.psshPR = psshPR + video.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"] + video.contentId = URL(video.license_url).params._dict["ContentId"][ + 0 + ] + videos += [video] + # Extract Atmos audio track if available. + for audio in tracks.audios: + audio.psshWV = psshWV + audio.psshPR = psshPR + audio.license_url = manifest["playreadyLaUrl"] if self.playready else manifest["widevineLaUrl"] + audio.contentId = URL(audio.license_url).params._dict["ContentId"][ + 0 + ] + if "atmos" in audio.url: + audio.atmos = True + audios += [audio] - return license_message.content + corrected_video_list = [] + for res in ("uhd", "hdp", "hd", "sd"): + for video in videos: + if f"_{res}_video" not in video.url or not video.url.endswith( + f"&r={res}" + ): + continue - - def configure(self): - access_token = None - install_id = None - for cookie in self.cookies: - if cookie.name == "secure_access_token": - access_token = cookie.value - elif cookie.name == "install_id": - install_id = cookie.value + if corrected_video_list and any( + video.id == vid.id for vid in corrected_video_list + ): + continue - self.access_token = access_token - self.install_id = install_id + if "dash_hevc_hdr" in video.url: + video.hdr10 = True + if "dash_hevc_dolbyvision" in video.url: + video.dv = True - self.session.headers.update( - { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", - "Origin": "https://moviesanywhere.com", - "Authorization": f"Bearer {self.access_token}", - } - ) + corrected_video_list += [video] + + tracks.add(corrected_video_list) + tracks.audios = audios + tracks.videos = [x for x in tracks.videos if (x.codec or "")[:3] in self.VIDEO_CODEC_MAP[self.vcodec]] + + return tracks + + def get_chapters(self, title): + return [] + + def certificate(self, **_): + return None # will use common privacy cert + + def license(self, challenge: bytes, track: Tracks, **_) -> bytes: + if not isinstance(challenge, bytes): + challenge = bytes(challenge, 'utf-8') + + playback_session_id = str(uuid.uuid4()) + + license_message = requests.post( + headers = { + 'accept': '*/*', + 'accept-language': 'en-US,en;q=0.9,en-IN;q=0.8', + 'cache-control': 'no-cache', + 'content-type': 'application/octet-stream', + 'dnt': '1', + 'origin': 'https://moviesanywhere.com', + 'pragma': 'no-cache', + 'priority': 'u=1, i', + 'referer': 'https://moviesanywhere.com/', + 'sec-ch-ua-mobile': '?0', + 'sec-fetch-dest': 'empty', + 'sec-fetch-mode': 'cors', + 'sec-fetch-site': 'same-site', + 'soapaction': '"http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"', + 'user_agent': 'Dalvik/2.1.0 (Linux; U; Android 14; SM-S911B Build/UP1A.231005.007)', + }, + params = { + "authorization": self.access_token, + "playbackSessionId": playback_session_id + }, + url=track.license_url, + data=challenge, # expects bytes + ) + + self.log.debug(license_message.text) + + if "errorCode" in license_message.text: + self.log.exit(f" - Cannot complete license request: {license_message.text}") + + return license_message.content + + + def configure(self): + access_token = None + install_id = None + for cookie in self.cookies: + if cookie.name == "secure_access_token": + access_token = cookie.value + elif cookie.name == "install_id": + install_id = cookie.value + + self.access_token = access_token + self.install_id = install_id + + self.session.headers.update( + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", + "Origin": "https://moviesanywhere.com", + "Authorization": f"Bearer {self.access_token}", + } + ) diff --git a/vinetrimmer/services/netflix.py b/vinetrimmer/services/netflix.py index 0b601e1..d2d95a3 100644 --- a/vinetrimmer/services/netflix.py +++ b/vinetrimmer/services/netflix.py @@ -790,8 +790,8 @@ class Netflix(BaseService): needs_repack=False, # decryption encrypted=x["isDrm"], - psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None, - psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None, + psshPR=manifest["video_tracks"][0]["drmHeader"]["bytes"] if psshPR else None, + psshWV=manifest["video_tracks"][0]["drmHeader"]["bytes"] if not psshPR else None, kid=x["drmHeaderId"] if x["isDrm"] else None, )) @@ -811,8 +811,8 @@ class Netflix(BaseService): needs_repack=False, # decryption encrypted=x["isDrm"], - psshPR=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if psshPR else None, - psshWV=base64.b64decode(manifest["video_tracks"][0]["drmHeader"]["bytes"]) if not psshPR else None, + psshPR=manifest["video_tracks"][0]["drmHeader"]["bytes"] if psshPR else None, + psshWV=manifest["video_tracks"][0]["drmHeader"]["bytes"] if not psshPR else None, kid=x.get("drmHeaderId") if x["isDrm"] else None, ) for x in manifest["audio_tracks"]] diff --git a/vinetrimmer/utils/MSL/__init__.py b/vinetrimmer/utils/MSL/__init__.py index ad1e92a..26de6e9 100644 --- a/vinetrimmer/utils/MSL/__init__.py +++ b/vinetrimmer/utils/MSL/__init__.py @@ -4,6 +4,7 @@ import json import logging import os import random +import httpx import re import sys import time @@ -32,468 +33,472 @@ from requests.packages.urllib3.poolmanager import PoolManager from requests.packages.urllib3.util import ssl_ class MSL: - log = logging.getLogger("MSL") + log = logging.getLogger("MSL") - def __init__(self, session, endpoint, sender, keys, message_id, user_auth=None): - - CIPHERS = ( - "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:AES256-SHA" - ) + def __init__(self, session, endpoint, sender, keys, message_id, user_auth=None): + + CIPHERS = ( + "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:AES256-SHA" + ) - class TlsAdapter(HTTPAdapter): - - def __init__(self, ssl_options=0, **kwargs): - self.ssl_options = ssl_options - super(TlsAdapter, self).__init__(**kwargs) - - def init_poolmanager(self, *pool_args, **pool_kwargs): - ctx = ssl_.create_urllib3_context(ciphers=CIPHERS, cert_reqs=ssl.CERT_REQUIRED, options=self.ssl_options) - self.poolmanager = PoolManager(*pool_args, - ssl_context=ctx, - **pool_kwargs) + #class TlsAdapter(HTTPAdapter): + # def __init__(self, ssl_options=0, **kwargs): + # self.ssl_options = ssl_options + # super(TlsAdapter, self).__init__(**kwargs) + # def init_poolmanager(self, *pool_args, **pool_kwargs): + # ctx = ssl_.create_urllib3_context(ciphers=CIPHERS, cert_reqs=ssl.CERT_REQUIRED, options=self.ssl_options) + # self.poolmanager = PoolManager(*pool_args, + # ssl_context=ctx, + # **pool_kwargs) - session = requests.session() - adapter = TlsAdapter(ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1) - session.mount("https://", adapter) - self.session = session - self.endpoint = endpoint - self.sender = sender - self.keys = keys - self.user_auth = user_auth - self.message_id = message_id + #session = requests.session() + #adapter = TlsAdapter(ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1) + #session.mount("https://", adapter) + session = httpx.Client( + params=session.params, + headers=session.headers, + cookies=session.cookies, + verify=True + ) + self.session = session + self.endpoint = endpoint + self.sender = sender + self.keys = keys + self.user_auth = user_auth + self.message_id = message_id - @classmethod - def handshake(cls, scheme, session, endpoint, sender=None, cdm=None, msl_keys_path=None): - session.verify = False - message_id = random.randint(0, pow(2, 52)) - msl_keys = MSL.load_cache_data(msl_keys_path) - - # Genera automaticamente l'ESN se non è specificato - if not sender: - if scheme == KeyExchangeSchemes.Widevine: - sender = android_esn_generator() - cls.log.info(f"Generated Widevine ESN: {sender}") - elif scheme == KeyExchangeSchemes.PlayReady: - sender = playready_esn_generator() - cls.log.info(f"Generated PlayReady ESN: {sender}") - else: # AsymmetricWrapped o altri schemi - sender = chrome_esn_generator() - cls.log.info(f"Generated Chrome ESN: {sender}") - - if msl_keys is not None: - cls.log.info("Using cached MSL data") - else: - msl_keys = MSLKeys() - if scheme != KeyExchangeSchemes.Widevine: - msl_keys.rsa = RSA.generate(2048) + @classmethod + def handshake(cls, scheme, session, endpoint, sender=None, cdm=None, msl_keys_path=None): + session.verify = False + message_id = random.randint(0, pow(2, 52)) + msl_keys = MSL.load_cache_data(msl_keys_path) + + # Genera automaticamente l'ESN se non è specificato + if not sender: + if scheme == KeyExchangeSchemes.Widevine: + sender = android_esn_generator() + cls.log.info(f"Generated Widevine ESN: {sender}") + elif scheme == KeyExchangeSchemes.PlayReady: + sender = playready_esn_generator() + cls.log.info(f"Generated PlayReady ESN: {sender}") + else: # AsymmetricWrapped o altri schemi + sender = chrome_esn_generator() + cls.log.info(f"Generated Chrome ESN: {sender}") + + if msl_keys is not None: + cls.log.info("Using cached MSL data") + else: + msl_keys = MSLKeys() + if scheme != KeyExchangeSchemes.Widevine: + msl_keys.rsa = RSA.generate(2048) - if not cdm: - raise cls.log.exit("- No cached data and no CDM specified") + if not cdm: + raise cls.log.exit("- No cached data and no CDM specified") - if not msl_keys_path: - raise cls.log.exit("- No cached data and no MSL key path specified") + if not msl_keys_path: + raise cls.log.exit("- No cached data and no MSL key path specified") - # Determine entity authentication scheme based on key exchange scheme - entity_auth_scheme = EntityAuthenticationSchemes.Unauthenticated - if scheme == KeyExchangeSchemes.Widevine: - entity_auth_scheme = EntityAuthenticationSchemes.Widevine - elif scheme == KeyExchangeSchemes.PlayReady: - entity_auth_scheme = EntityAuthenticationSchemes.PlayReady + # Determine entity authentication scheme based on key exchange scheme + entity_auth_scheme = EntityAuthenticationSchemes.Unauthenticated + if scheme == KeyExchangeSchemes.Widevine: + entity_auth_scheme = EntityAuthenticationSchemes.Widevine + elif scheme == KeyExchangeSchemes.PlayReady: + entity_auth_scheme = EntityAuthenticationSchemes.PlayReady - # Key request data generation - if scheme == KeyExchangeSchemes.Widevine: - msl_keys.cdm_session = cdm.open( - pssh=b"\x0A\x7A\x00\x6C\x38\x2B", - raw=True, - offline=True - ) - keyrequestdata = KeyExchangeRequest.Widevine( - keyrequest=cdm.get_license_challenge(msl_keys.cdm_session) - ) - elif scheme == KeyExchangeSchemes.PlayReady: - # Per PlayReady, usiamo comunque AsymmetricWrapped per lo scambio chiavi - keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( - keypairid="superKeyPair", - mechanism="JWK_RSA", - publickey=msl_keys.rsa.publickey().exportKey(format="DER") - ) - else: - keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( - keypairid="superKeyPair", - mechanism="JWK_RSA", - publickey=msl_keys.rsa.publickey().exportKey(format="DER") - ) + # Key request data generation + if scheme == KeyExchangeSchemes.Widevine: + msl_keys.cdm_session = cdm.open( + pssh=b"\x0A\x7A\x00\x6C\x38\x2B", + raw=True, + offline=True + ) + keyrequestdata = KeyExchangeRequest.Widevine( + keyrequest=cdm.get_license_challenge(msl_keys.cdm_session) + ) + elif scheme == KeyExchangeSchemes.PlayReady: + # Per PlayReady, usiamo comunque AsymmetricWrapped per lo scambio chiavi + keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( + keypairid="superKeyPair", + mechanism="JWK_RSA", + publickey=msl_keys.rsa.publickey().exportKey(format="DER") + ) + else: + keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( + keypairid="superKeyPair", + mechanism="JWK_RSA", + publickey=msl_keys.rsa.publickey().exportKey(format="DER") + ) - # Use the appropriate entity authentication based on scheme - if scheme == KeyExchangeSchemes.Widevine: - entityauthdata = EntityAuthentication.Widevine( - devtype="ANDROID", - keyrequest=base64.b64encode(b"\x0A\x7A\x00\x6C\x38\x2B").decode() - ) - elif scheme == KeyExchangeSchemes.PlayReady: - # Prova a creare l'entità PlayReady senza parametri - potrebbe avere un costruttore che prende implicitamente i dati necessari - try: - entityauthdata = EntityAuthentication.PlayReady() # Senza parametri - cls.log.info("Created PlayReady entity authentication without parameters") - except TypeError as e: - # Se fallisce, passa subito ad AsymmetricWrapped - cls.log.warning(f"Failed to create PlayReady entity auth: {e}") - cls.log.info("Falling back to AsymmetricWrapped") - scheme = KeyExchangeSchemes.AsymmetricWrapped - entityauthdata = EntityAuthentication.Unauthenticated(sender) - else: - entityauthdata = EntityAuthentication.Unauthenticated(sender) + # Use the appropriate entity authentication based on scheme + if scheme == KeyExchangeSchemes.Widevine: + entityauthdata = EntityAuthentication.Widevine( + devtype="ANDROID", + keyrequest=base64.b64encode(b"\x0A\x7A\x00\x6C\x38\x2B").decode() + ) + elif scheme == KeyExchangeSchemes.PlayReady: + # Prova a creare l'entità PlayReady senza parametri - potrebbe avere un costruttore che prende implicitamente i dati necessari + try: + entityauthdata = EntityAuthentication.PlayReady() # Senza parametri + cls.log.info("Created PlayReady entity authentication without parameters") + except TypeError as e: + # Se fallisce, passa subito ad AsymmetricWrapped + cls.log.warning(f"Failed to create PlayReady entity auth: {e}") + cls.log.info("Falling back to AsymmetricWrapped") + scheme = KeyExchangeSchemes.AsymmetricWrapped + entityauthdata = EntityAuthentication.Unauthenticated(sender) + else: + entityauthdata = EntityAuthentication.Unauthenticated(sender) - data = jsonpickle.encode({ - "entityauthdata": entityauthdata, - "headerdata": base64.b64encode(MSL.generate_msg_header( - message_id=message_id, - sender=sender, - is_handshake=True, - keyrequestdata=keyrequestdata - ).encode("utf-8")).decode("utf-8"), - "signature": "" - }, unpicklable=False) - data += json.dumps({ - "payload": base64.b64encode(json.dumps({ - "messageid": message_id, - "data": "", - "sequencenumber": 1, - "endofmsg": True - }).encode("utf-8")).decode("utf-8"), - "signature": "" - }) + data = jsonpickle.encode({ + "entityauthdata": entityauthdata, + "headerdata": base64.b64encode(MSL.generate_msg_header( + message_id=message_id, + sender=sender, + is_handshake=True, + keyrequestdata=keyrequestdata + ).encode("utf-8")).decode("utf-8"), + "signature": "" + }, unpicklable=False) + data += json.dumps({ + "payload": base64.b64encode(json.dumps({ + "messageid": message_id, + "data": "", + "sequencenumber": 1, + "endofmsg": True + }).encode("utf-8")).decode("utf-8"), + "signature": "" + }) - try: - r = session.post( - url=endpoint, - data=data, - headers={ - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", - "Content-Type": "application/json" - } - ) - except requests.HTTPError as e: - raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}") + try: + r = session.post( + url=endpoint, + data=data, + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36", + "Content-Type": "application/json" + } + ) + except requests.HTTPError as e: + raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}") - key_exchange = r.json() # expecting no payloads, so this is fine - if "errordata" in key_exchange: - raise cls.log.exit("- Key exchange failed: " + json.loads(base64.b64decode( - key_exchange["errordata"] - ).decode())["errormsg"]) + key_exchange = r.json() # expecting no payloads, so this is fine + if "errordata" in key_exchange: + raise cls.log.exit("- Key exchange failed: " + json.loads(base64.b64decode( + key_exchange["errordata"] + ).decode())["errormsg"]) - # parse the crypto keys - key_response_data = json.JSONDecoder().decode(base64.b64decode( - key_exchange["headerdata"] - ).decode("utf-8"))["keyresponsedata"] + # parse the crypto keys + key_response_data = json.JSONDecoder().decode(base64.b64decode( + key_exchange["headerdata"] + ).decode("utf-8"))["keyresponsedata"] - if key_response_data["scheme"] != str(scheme): - raise cls.log.exit("- Key exchange scheme mismatch occurred") + if key_response_data["scheme"] != str(scheme): + raise cls.log.exit("- Key exchange scheme mismatch occurred") - key_data = key_response_data["keydata"] - if scheme == KeyExchangeSchemes.Widevine: - from vinetrimmer.utils.widevine.device import RemoteDevice - if isinstance(cdm.device, RemoteDevice): - msl_keys.encryption, msl_keys.sign = cdm.device.exchange( - cdm.sessions[msl_keys.cdm_session], - license_res=key_data["cdmkeyresponse"], - enc_key_id=base64.b64decode(key_data["encryptionkeyid"]), - hmac_key_id=base64.b64decode(key_data["hmackeyid"]) - ) - cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) - else: - cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) - keys = cdm.get_keys(msl_keys.cdm_session) - msl_keys.encryption = MSL.get_widevine_key( - kid=base64.b64decode(key_data["encryptionkeyid"]), - keys=keys, - permissions=["AllowEncrypt", "AllowDecrypt"] - ) - msl_keys.sign = MSL.get_widevine_key( - kid=base64.b64decode(key_data["hmackeyid"]), - keys=keys, - permissions=["AllowSign", "AllowSignatureVerify"] - ) - elif scheme == KeyExchangeSchemes.PlayReady: - # Gestione migliorata delle chiavi per PlayReady (emulando Chrome) - cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) - try: - # Decodifica le chiavi di crittografia e firma - msl_keys.encryption = MSL.base64key_decode( - json.JSONDecoder().decode(cipher_rsa.decrypt( - base64.b64decode(key_data["encryptionkey"]) - ).decode("utf-8"))["k"] - ) - msl_keys.sign = MSL.base64key_decode( - json.JSONDecoder().decode(cipher_rsa.decrypt( - base64.b64decode(key_data["hmackey"]) - ).decode("utf-8"))["k"] - ) - cls.log.info("PlayReady key exchange successful (Chrome emulation)") - except Exception as e: - raise cls.log.exit(f"- PlayReady key decryption failed: {str(e)}") - else: - cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) - msl_keys.encryption = MSL.base64key_decode( - json.JSONDecoder().decode(cipher_rsa.decrypt( - base64.b64decode(key_data["encryptionkey"]) - ).decode("utf-8"))["k"] - ) - msl_keys.sign = MSL.base64key_decode( - json.JSONDecoder().decode(cipher_rsa.decrypt( - base64.b64decode(key_data["hmackey"]) - ).decode("utf-8"))["k"] - ) - msl_keys.mastertoken = key_response_data["mastertoken"] + key_data = key_response_data["keydata"] + if scheme == KeyExchangeSchemes.Widevine: + from vinetrimmer.utils.widevine.device import RemoteDevice + if isinstance(cdm.device, RemoteDevice): + msl_keys.encryption, msl_keys.sign = cdm.device.exchange( + cdm.sessions[msl_keys.cdm_session], + license_res=key_data["cdmkeyresponse"], + enc_key_id=base64.b64decode(key_data["encryptionkeyid"]), + hmac_key_id=base64.b64decode(key_data["hmackeyid"]) + ) + cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) + else: + cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) + keys = cdm.get_keys(msl_keys.cdm_session) + msl_keys.encryption = MSL.get_widevine_key( + kid=base64.b64decode(key_data["encryptionkeyid"]), + keys=keys, + permissions=["AllowEncrypt", "AllowDecrypt"] + ) + msl_keys.sign = MSL.get_widevine_key( + kid=base64.b64decode(key_data["hmackeyid"]), + keys=keys, + permissions=["AllowSign", "AllowSignatureVerify"] + ) + elif scheme == KeyExchangeSchemes.PlayReady: + # Gestione migliorata delle chiavi per PlayReady (emulando Chrome) + cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) + try: + # Decodifica le chiavi di crittografia e firma + msl_keys.encryption = MSL.base64key_decode( + json.JSONDecoder().decode(cipher_rsa.decrypt( + base64.b64decode(key_data["encryptionkey"]) + ).decode("utf-8"))["k"] + ) + msl_keys.sign = MSL.base64key_decode( + json.JSONDecoder().decode(cipher_rsa.decrypt( + base64.b64decode(key_data["hmackey"]) + ).decode("utf-8"))["k"] + ) + cls.log.info("PlayReady key exchange successful (Chrome emulation)") + except Exception as e: + raise cls.log.exit(f"- PlayReady key decryption failed: {str(e)}") + else: + cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) + msl_keys.encryption = MSL.base64key_decode( + json.JSONDecoder().decode(cipher_rsa.decrypt( + base64.b64decode(key_data["encryptionkey"]) + ).decode("utf-8"))["k"] + ) + msl_keys.sign = MSL.base64key_decode( + json.JSONDecoder().decode(cipher_rsa.decrypt( + base64.b64decode(key_data["hmackey"]) + ).decode("utf-8"))["k"] + ) + msl_keys.mastertoken = key_response_data["mastertoken"] - MSL.cache_keys(msl_keys, msl_keys_path) - cls.log.info("MSL handshake successful") - return cls( - session=session, - endpoint=endpoint, - sender=sender, - keys=msl_keys, - message_id=message_id - ) + MSL.cache_keys(msl_keys, msl_keys_path) + cls.log.info("MSL handshake successful") + return cls( + session=session, + endpoint=endpoint, + sender=sender, + keys=msl_keys, + message_id=message_id + ) - @staticmethod - def load_cache_data(msl_keys_path=None): - if not msl_keys_path or not os.path.exists(msl_keys_path): - return None - with open(msl_keys_path, encoding="utf-8") as fd: - msl_keys = jsonpickle.decode(fd.read()) - if msl_keys.rsa: - # noinspection PyTypeChecker - # expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object - # so as a workaround it exports to PEM, and then when reading, it imports that PEM back - # to an RsaKey :) - msl_keys.rsa = RSA.importKey(msl_keys.rsa) - # If it's expired or close to, return None as it's unusable - if msl_keys.mastertoken and ((datetime.utcfromtimestamp(int(json.JSONDecoder().decode( - base64.b64decode(msl_keys.mastertoken["tokendata"]).decode("utf-8") - )["expiration"])) - datetime.now()).total_seconds() / 60 / 60) < 10: - return None - return msl_keys + @staticmethod + def load_cache_data(msl_keys_path=None): + if not msl_keys_path or not os.path.exists(msl_keys_path): + return None + with open(msl_keys_path, encoding="utf-8") as fd: + msl_keys = jsonpickle.decode(fd.read()) + if msl_keys.rsa: + # noinspection PyTypeChecker + # expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object + # so as a workaround it exports to PEM, and then when reading, it imports that PEM back + # to an RsaKey :) + msl_keys.rsa = RSA.importKey(msl_keys.rsa) + # If it's expired or close to, return None as it's unusable + if msl_keys.mastertoken and ((datetime.utcfromtimestamp(int(json.JSONDecoder().decode( + base64.b64decode(msl_keys.mastertoken["tokendata"]).decode("utf-8") + )["expiration"])) - datetime.now()).total_seconds() / 60 / 60) < 10: + return None + return msl_keys - @staticmethod - def cache_keys(msl_keys, msl_keys_path): - os.makedirs(os.path.dirname(msl_keys_path), exist_ok=True) - if msl_keys.rsa: - # jsonpickle can't pickle RsaKey objects :( - msl_keys.rsa = msl_keys.rsa.export_key() - with open(msl_keys_path, "w", encoding="utf-8") as fd: - fd.write(jsonpickle.encode(msl_keys)) - if msl_keys.rsa: - # re-import now - msl_keys.rsa = RSA.importKey(msl_keys.rsa) + @staticmethod + def cache_keys(msl_keys, msl_keys_path): + os.makedirs(os.path.dirname(msl_keys_path), exist_ok=True) + if msl_keys.rsa: + # jsonpickle can't pickle RsaKey objects :( + msl_keys.rsa = msl_keys.rsa.export_key() + with open(msl_keys_path, "w", encoding="utf-8") as fd: + fd.write(jsonpickle.encode(msl_keys)) + if msl_keys.rsa: + # re-import now + msl_keys.rsa = RSA.importKey(msl_keys.rsa) - @staticmethod - def generate_msg_header(message_id, sender, is_handshake, userauthdata=None, keyrequestdata=None, compression="GZIP"): - """ - The MSL header carries all MSL data used for entity and user authentication, message encryption - and verification, and service tokens. Portions of the MSL header are encrypted. - https://github.com/Netflix/msl/wiki/Messages#header-data + @staticmethod + def generate_msg_header(message_id, sender, is_handshake, userauthdata=None, keyrequestdata=None, compression="GZIP"): + """ + The MSL header carries all MSL data used for entity and user authentication, message encryption + and verification, and service tokens. Portions of the MSL header are encrypted. + https://github.com/Netflix/msl/wiki/Messages#header-data - :param message_id: number against which payload chunks are bound to protect against replay. - :param sender: ESN - :param is_handshake: This flag is set true if the message is a handshake message and will not include any - payload chunks. It will include keyrequestdata. - :param userauthdata: UserAuthData - :param keyrequestdata: KeyRequestData - :param compression: Supported compression algorithms. + :param message_id: number against which payload chunks are bound to protect against replay. + :param sender: ESN + :param is_handshake: This flag is set true if the message is a handshake message and will not include any + payload chunks. It will include keyrequestdata. + :param userauthdata: UserAuthData + :param keyrequestdata: KeyRequestData + :param compression: Supported compression algorithms. - :return: The base64 encoded JSON String of the header - """ - header_data = { - "messageid": message_id, - "renewable": True, # MUST be True if is_handshake - "handshake": is_handshake, - "capabilities": { - "compressionalgos": [compression] if compression else [], - "languages": ["en-US"], # bcp-47 - "encoderformats": ["JSON"] - }, - "timestamp": int(time.time()), - # undocumented or unused: - "sender": sender, - "nonreplayable": False, - "recipient": "Netflix", - } - if userauthdata: - header_data["userauthdata"] = userauthdata - if keyrequestdata: - header_data["keyrequestdata"] = [keyrequestdata] - return jsonpickle.encode(header_data, unpicklable=False) + :return: The base64 encoded JSON String of the header + """ + header_data = { + "messageid": message_id, + "renewable": True, # MUST be True if is_handshake + "handshake": is_handshake, + "capabilities": { + "compressionalgos": [compression] if compression else [], + "languages": ["en-US"], # bcp-47 + "encoderformats": ["JSON"] + }, + "timestamp": int(time.time()), + # undocumented or unused: + "sender": sender, + "nonreplayable": False, + "recipient": "Netflix", + } + if userauthdata: + header_data["userauthdata"] = userauthdata + if keyrequestdata: + header_data["keyrequestdata"] = [keyrequestdata] + return jsonpickle.encode(header_data, unpicklable=False) - @classmethod - def get_widevine_key(cls, kid, keys, permissions): - for key in keys: - if key.kid != kid: - continue - if key.type != "OPERATOR_SESSION": - cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}") - continue - if not set(permissions) <= set(key.permissions): - cls.log.warning(f"Widevine Key Exchange: Incorrect permissions, key {key}, needed perms {permissions}") - continue - return key.key - return None + @classmethod + def get_widevine_key(cls, kid, keys, permissions): + for key in keys: + if key.kid != kid: + continue + if key.type != "OPERATOR_SESSION": + cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}") + continue + if not set(permissions) <= set(key.permissions): + cls.log.warning(f"Widevine Key Exchange: Incorrect permissions, key {key}, needed perms {permissions}") + continue + return key.key + return None - def send_message(self, endpoint, params, application_data, userauthdata=None): - message = self.create_message(application_data, userauthdata) - res = self.session.post(url=endpoint, data=message, params=params, verify=True) # verify=True? - header, payload_data = self.parse_message(res.text) - if "errordata" in header: - raise self.log.exit( - "- MSL response message contains an error: {}".format( - json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8")) - ) - ) - return header, payload_data + def send_message(self, endpoint, params, application_data, userauthdata=None): + message = self.create_message(application_data, userauthdata) + res = self.session.post(url=endpoint, data=message, params=params, verify=True) # verify=True? + header, payload_data = self.parse_message(res.text) + if "errordata" in header: + raise self.log.exit( + "- MSL response message contains an error: {}".format( + json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8")) + ) + ) + return header, payload_data - def create_message(self, application_data, userauthdata=None): - self.message_id += 1 # new message must ue a new message id + def create_message(self, application_data, userauthdata=None): + self.message_id += 1 # new message must ue a new message id - headerdata = self.encrypt(self.generate_msg_header( - message_id=self.message_id, - sender=self.sender, - is_handshake=False, - userauthdata=userauthdata - )) + headerdata = self.encrypt(self.generate_msg_header( + message_id=self.message_id, + sender=self.sender, + is_handshake=False, + userauthdata=userauthdata + )) - header = json.dumps({ - "headerdata": base64.b64encode(headerdata.encode("utf-8")).decode("utf-8"), - "signature": self.sign(headerdata).decode("utf-8"), - "mastertoken": self.keys.mastertoken - }) + header = json.dumps({ + "headerdata": base64.b64encode(headerdata.encode("utf-8")).decode("utf-8"), + "signature": self.sign(headerdata).decode("utf-8"), + "mastertoken": self.keys.mastertoken + }) - payload_chunks = [self.encrypt(json.dumps({ - "messageid": self.message_id, - "data": self.gzip_compress(json.dumps(application_data).encode("utf-8")).decode("utf-8"), - "compressionalgo": "GZIP", - "sequencenumber": 1, # todo ; use sequence_number from master token instead? - "endofmsg": True - }))] + payload_chunks = [self.encrypt(json.dumps({ + "messageid": self.message_id, + "data": self.gzip_compress(json.dumps(application_data).encode("utf-8")).decode("utf-8"), + "compressionalgo": "GZIP", + "sequencenumber": 1, # todo ; use sequence_number from master token instead? + "endofmsg": True + }))] - message = header - for payload_chunk in payload_chunks: - message += json.dumps({ - "payload": base64.b64encode(payload_chunk.encode("utf-8")).decode("utf-8"), - "signature": self.sign(payload_chunk).decode("utf-8") - }) + message = header + for payload_chunk in payload_chunks: + message += json.dumps({ + "payload": base64.b64encode(payload_chunk.encode("utf-8")).decode("utf-8"), + "signature": self.sign(payload_chunk).decode("utf-8") + }) - return message + return message - def decrypt_payload_chunks(self, payload_chunks): - """ - Decrypt and extract data from payload chunks + def decrypt_payload_chunks(self, payload_chunks): + """ + Decrypt and extract data from payload chunks - :param payload_chunks: List of payload chunks - :return: json object - """ - raw_data = "" + :param payload_chunks: List of payload chunks + :return: json object + """ + raw_data = "" - for payload_chunk in payload_chunks: - # todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"] - # expecting base64-encoded json string - payload_chunk = json.loads(base64.b64decode(payload_chunk["payload"]).decode("utf-8")) - # decrypt the payload - payload_decrypted = AES.new( - key=self.keys.encryption, - mode=AES.MODE_CBC, - iv=base64.b64decode(payload_chunk["iv"]) - ).decrypt(base64.b64decode(payload_chunk["ciphertext"])) - payload_decrypted = Padding.unpad(payload_decrypted, 16) - payload_decrypted = json.loads(payload_decrypted.decode("utf-8")) - # decode and uncompress data if compressed - payload_data = base64.b64decode(payload_decrypted["data"]) - if payload_decrypted.get("compressionalgo") == "GZIP": - payload_data = zlib.decompress(payload_data, 16 + zlib.MAX_WBITS) - raw_data += payload_data.decode("utf-8") + for payload_chunk in payload_chunks: + # todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"] + # expecting base64-encoded json string + payload_chunk = json.loads(base64.b64decode(payload_chunk["payload"]).decode("utf-8")) + # decrypt the payload + payload_decrypted = AES.new( + key=self.keys.encryption, + mode=AES.MODE_CBC, + iv=base64.b64decode(payload_chunk["iv"]) + ).decrypt(base64.b64decode(payload_chunk["ciphertext"])) + payload_decrypted = Padding.unpad(payload_decrypted, 16) + payload_decrypted = json.loads(payload_decrypted.decode("utf-8")) + # decode and uncompress data if compressed + payload_data = base64.b64decode(payload_decrypted["data"]) + if payload_decrypted.get("compressionalgo") == "GZIP": + payload_data = zlib.decompress(payload_data, 16 + zlib.MAX_WBITS) + raw_data += payload_data.decode("utf-8") - data = json.loads(raw_data) - if "error" in data: - error = data["error"] - error_display = error.get("display") - error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", "")) + data = json.loads(raw_data) + if "error" in data: + error = data["error"] + error_display = error.get("display") + error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", "")) - #if error_display: - # self.log.critical(f"- {error_display}") - #if error_detail: - # self.log.critical(f"- {error_detail}") + #if error_display: + # self.log.critical(f"- {error_display}") + #if error_detail: + # self.log.critical(f"- {error_detail}") - if not (error_display or error_detail): - self.log.critical(f"- {error}") + if not (error_display or error_detail): + self.log.critical(f"- {error}") - sys.exit(1) + sys.exit(1) - return data["result"] + return data["result"] - def parse_message(self, message): - """ - Parse an MSL message into a header and list of payload chunks + def parse_message(self, message): + """ + Parse an MSL message into a header and list of payload chunks - :param message: MSL message - :returns: a 2-item tuple containing message and list of payload chunks if available - """ - parsed_message = json.loads("[{}]".format(message.replace("}{", "},{"))) + :param message: MSL message + :returns: a 2-item tuple containing message and list of payload chunks if available + """ + parsed_message = json.loads("[{}]".format(message.replace("}{", "},{"))) - header = parsed_message[0] - encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else [] - if encrypted_payload_chunks: - payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks) - else: - payload_chunks = {} + header = parsed_message[0] + encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else [] + if encrypted_payload_chunks: + payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks) + else: + payload_chunks = {} - return header, payload_chunks + return header, payload_chunks - @staticmethod - def gzip_compress(data): - out = BytesIO() - with gzip.GzipFile(fileobj=out, mode="w") as fd: - fd.write(data) - return base64.b64encode(out.getvalue()) + @staticmethod + def gzip_compress(data): + out = BytesIO() + with gzip.GzipFile(fileobj=out, mode="w") as fd: + fd.write(data) + return base64.b64encode(out.getvalue()) - @staticmethod - def base64key_decode(payload): - length = len(payload) % 4 - if length == 2: - payload += "==" - elif length == 3: - payload += "=" - elif length != 0: - raise ValueError("Invalid base64 string") - return base64.urlsafe_b64decode(payload.encode("utf-8")) + @staticmethod + def base64key_decode(payload): + length = len(payload) % 4 + if length == 2: + payload += "==" + elif length == 3: + payload += "=" + elif length != 0: + raise ValueError("Invalid base64 string") + return base64.urlsafe_b64decode(payload.encode("utf-8")) - def encrypt(self, plaintext): - """ - Encrypt the given Plaintext with the encryption key - :param plaintext: - :return: Serialized JSON String of the encryption Envelope - """ - iv = get_random_bytes(16) - return json.dumps({ - "ciphertext": base64.b64encode( - AES.new( - self.keys.encryption, - AES.MODE_CBC, - iv - ).encrypt( - Padding.pad(plaintext.encode("utf-8"), 16) - ) - ).decode("utf-8"), - "keyid": "{}_{}".format(self.sender, json.loads( - base64.b64decode(self.keys.mastertoken["tokendata"]).decode("utf-8") - )["sequencenumber"]), - "sha256": "AA==", - "iv": base64.b64encode(iv).decode("utf-8") - }) + def encrypt(self, plaintext): + """ + Encrypt the given Plaintext with the encryption key + :param plaintext: + :return: Serialized JSON String of the encryption Envelope + """ + iv = get_random_bytes(16) + return json.dumps({ + "ciphertext": base64.b64encode( + AES.new( + self.keys.encryption, + AES.MODE_CBC, + iv + ).encrypt( + Padding.pad(plaintext.encode("utf-8"), 16) + ) + ).decode("utf-8"), + "keyid": "{}_{}".format(self.sender, json.loads( + base64.b64decode(self.keys.mastertoken["tokendata"]).decode("utf-8") + )["sequencenumber"]), + "sha256": "AA==", + "iv": base64.b64encode(iv).decode("utf-8") + }) - def sign(self, text): - """ - Calculates the HMAC signature for the given text with the current sign key and SHA256 - :param text: - :return: Base64 encoded signature - """ - return base64.b64encode(HMAC.new(self.keys.sign, text.encode("utf-8"), SHA256).digest()) + def sign(self, text): + """ + Calculates the HMAC signature for the given text with the current sign key and SHA256 + :param text: + :return: Base64 encoded signature + """ + return base64.b64encode(HMAC.new(self.keys.sign, text.encode("utf-8"), SHA256).digest()) diff --git a/vinetrimmer/vinetrimmer.yml b/vinetrimmer/vinetrimmer.yml index aea2709..134a7fd 100644 --- a/vinetrimmer/vinetrimmer.yml +++ b/vinetrimmer/vinetrimmer.yml @@ -14,6 +14,7 @@ cdm: DisneyPlus: 'mtc_mtc_atv_atv_sl3000' Sunnxt: 'xiaomi_mi_a1_15.0.0_60ceee88_8159_l3' iTunes: 'mtc_mtc_atv_atv_sl3000' + MoviesAnywhere: 'ktc_t31_43f_sl3000' cdm_api: - name: 'playready'