diff --git a/ALL4/__init__.py b/ALL4/__init__.py new file mode 100644 index 0000000..5ed1197 --- /dev/null +++ b/ALL4/__init__.py @@ -0,0 +1,419 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import re +import sys +from collections.abc import Generator +from datetime import datetime, timezone +from http.cookiejar import MozillaCookieJar +from typing import Any, Optional, Union + +import click +from click import Context +from Crypto.Util.Padding import unpad +from Cryptodome.Cipher import AES +from pywidevine.cdm import Cdm as WidevineCdm +from unshackle.core.credential import Credential +from unshackle.core.manifests.dash import DASH +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series +from unshackle.core.tracks import Chapter, Subtitle, Tracks + + +class ALL4(Service): + """ + Service code for Channel 4's All4 streaming service (https://channel4.com). + + \b + Version: 1.0.1 + Author: stabbedbybrick + Authorization: Credentials + Robustness: + L3: 1080p, AAC2.0 + + \b + Tips: + - Use complete title URL or slug as input: + https://www.channel4.com/programmes/taskmaster OR taskmaster + - Use on demand URL for directly downloading episodes: + https://www.channel4.com/programmes/taskmaster/on-demand/75588-002 + - Both android and web/pc endpoints are checked for quality profiles. + If android is missing 1080p, it automatically falls back to web. + """ + + GEOFENCE = ("gb", "ie") + TITLE_RE = r"^(?:https?://(?:www\.)?channel4\.com/programmes/)?(?P[a-z0-9-]+)(?:/on-demand/(?P[0-9-]+))?" + + @staticmethod + @click.command(name="ALL4", short_help="https://channel4.com", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> ALL4: + return ALL4(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + self.authorization: str + self.asset_id: int + self.license_token: str + self.manifest: str + + self.session.headers.update( + { + "X-C4-Platform-Name": self.config["device"]["platform_name"], + "X-C4-Device-Type": self.config["device"]["device_type"], + "X-C4-Device-Name": self.config["device"]["device_name"], + "X-C4-App-Version": self.config["device"]["app_version"], + "X-C4-Optimizely-Datafile": self.config["device"]["optimizely_datafile"], + } + ) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + if not credential: + raise EnvironmentError("Service requires Credentials for Authentication.") + + cache = self.cache.get(f"tokens_{credential.sha1}") + + if cache and not cache.expired: + # cached + self.log.info(" + Using cached Tokens...") + tokens = cache.data + elif cache and cache.expired: + # expired, refresh + self.log.info("Refreshing cached Tokens") + r = self.session.post( + self.config["endpoints"]["login"], + headers={"authorization": f"Basic {self.config['android']['auth']}"}, + data={ + "grant_type": "refresh_token", + "username": credential.username, + "password": credential.password, + "refresh_token": cache.data["refreshToken"], + }, + ) + try: + res = r.json() + except json.JSONDecodeError: + raise ValueError(f"Failed to refresh tokens: {r.text}") + + if "error" in res: + self.log.error(f"Failed to refresh tokens: {res['errorMessage']}") + sys.exit(1) + + tokens = res + self.log.info(" + Refreshed") + else: + # new + headers = {"authorization": f"Basic {self.config['android']['auth']}"} + data = { + "grant_type": "password", + "username": credential.username, + "password": credential.password, + } + r = self.session.post(self.config["endpoints"]["login"], headers=headers, data=data) + try: + res = r.json() + except json.JSONDecodeError: + raise ValueError(f"Failed to log in: {r.text}") + + if "error" in res: + self.log.error(f"Failed to log in: {res['errorMessage']}") + sys.exit(1) + + tokens = res + self.log.info(" + Acquired tokens...") + + cache.set(tokens, expiration=tokens["expiresIn"]) + + self.authorization = f"Bearer {tokens['accessToken']}" + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "expand": "default", + "q": self.title, + "limit": "100", + "offset": "0", + } + + r = self.session.get(self.config["endpoints"]["search"], params=params) + r.raise_for_status() + + results = r.json() + if isinstance(results["results"], list): + for result in results["results"]: + yield SearchResult( + id_=result["brand"].get("websafeTitle"), + title=result["brand"].get("title"), + description=result["brand"].get("description"), + label=result.get("label"), + url=result["brand"].get("href"), + ) + + def get_titles(self) -> Union[Movies, Series]: + title, on_demand = (re.match(self.TITLE_RE, self.title).group(i) for i in ("id", "vid")) + + r = self.session.get( + self.config["endpoints"]["title"].format(title=title), + params={"client": "android-mod", "deviceGroup": "mobile", "include": "extended-restart"}, + headers={"Authorization": self.authorization}, + ) + if not r.ok: + self.log.error(r.text) + sys.exit(1) + + data = r.json() + + if on_demand is not None: + episodes = [ + Episode( + id_=episode["programmeId"], + service=self.__class__, + title=data["brand"]["title"], + season=episode["seriesNumber"], + number=episode["episodeNumber"], + name=episode["originalTitle"], + language="en", + data=episode["assetInfo"].get("streaming") or episode["assetInfo"].get("download"), + ) + for episode in data["brand"]["episodes"] + if episode.get("assetInfo") and episode["programmeId"] == on_demand + ] + if not episodes: + # Parse HTML of episode page to find title + data = self.get_html(self.title) + episodes = [ + Episode( + id_=data["selectedEpisode"]["programmeId"], + service=self.__class__, + title=data["brand"]["title"], + season=data["selectedEpisode"]["seriesNumber"] or 0, + number=data["selectedEpisode"]["episodeNumber"] or 0, + name=data["selectedEpisode"]["originalTitle"], + language="en", + data=data["selectedEpisode"], + ) + ] + + return Series(episodes) + + elif data["brand"]["programmeType"] == "FM": + return Movies( + [ + Movie( + id_=movie["programmeId"], + service=self.__class__, + name=data["brand"]["title"], + year=int(data["brand"]["summary"].split(" ")[0].strip().strip("()")), + language="en", + data=movie["assetInfo"].get("streaming") or movie["assetInfo"].get("download"), + ) + for movie in data["brand"]["episodes"] + ] + ) + else: + return Series( + [ + Episode( + id_=episode["programmeId"], + service=self.__class__, + title=data["brand"]["title"], + season=episode["seriesNumber"], + number=episode["episodeNumber"], + name=episode["originalTitle"], + language="en", + data=episode["assetInfo"].get("streaming") or episode["assetInfo"].get("download"), + ) + for episode in data["brand"]["episodes"] + if episode.get("assetInfo") + ] + ) + + def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: + android_assets: tuple = self.android_playlist(title.id) + web_assets: tuple = self.web_playlist(title.id) + self.manifest, self.license_token, subtitle, data = self.sort_assets(title, android_assets, web_assets) + self.asset_id = int(title.data["assetId"]) + + tracks = DASH.from_url(self.manifest, self.session).to_tracks(title.language) + tracks.videos[0].data = data + + # manifest subtitles are sometimes empty even if they exist + # so we clear them and add the subtitles manually + tracks.subtitles.clear() + if subtitle is not None: + tracks.add( + Subtitle( + id_=hashlib.md5(subtitle.encode()).hexdigest()[0:6], + url=subtitle, + codec=Subtitle.Codec.from_mime(subtitle[-3:]), + language=title.language, + is_original_lang=True, + forced=False, + sdh=False, + ) + ) + else: + self.log.warning("- Subtitles are either missing or empty") + + for track in tracks.audio: + role = track.data["dash"]["representation"].find("Role") + if role is not None and role.get("value") in ["description", "alternative", "alternate"]: + track.descriptive = True + + return tracks + + def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: + track = title.tracks.videos[0] + + chapters = [ + Chapter( + name=f"Chapter {i + 1:02}", + timestamp=datetime.fromtimestamp((ms / 1000), tz=timezone.utc).strftime("%H:%M:%S.%f")[:-3], + ) + for i, ms in enumerate(x["breakOffset"] for x in track.data["adverts"]["breaks"]) + ] + + if track.data.get("endCredits", {}).get("squeezeIn"): + chapters.append( + Chapter( + name="Credits", + timestamp=datetime.fromtimestamp( + (track.data["endCredits"]["squeezeIn"] / 1000), tz=timezone.utc + ).strftime("%H:%M:%S.%f")[:-3], + ) + ) + + return chapters + + def get_widevine_service_certificate(self, **_: Any) -> str: + return WidevineCdm.common_privacy_cert + + def get_widevine_license(self, challenge: bytes, **_: Any) -> str: + payload = { + "message": base64.b64encode(challenge).decode("utf8"), + "token": self.license_token, + "request_id": self.asset_id, + "video": {"type": "ondemand", "url": self.manifest}, + } + + r = self.session.post(self.config["endpoints"]["license"], json=payload) + if not r.ok: + raise ConnectionError(f"License request failed: {r.json()['status']['type']}") + + return r.json()["license"] + + # Service specific functions + + def sort_assets(self, title: Union[Movie, Episode], android_assets: tuple, web_assets: tuple) -> tuple: + android_heights = None + web_heights = None + + if android_assets is not None: + try: + a_manifest, a_token, a_subtitle, data = android_assets + android_tracks = DASH.from_url(a_manifest, self.session).to_tracks(title.language) + android_heights = sorted([int(track.height) for track in android_tracks.videos], reverse=True) + except Exception: + android_heights = None + + if web_assets is not None: + try: + b_manifest, b_token, b_subtitle, data = web_assets + session = self.session + session.headers.update(self.config["headers"]) + web_tracks = DASH.from_url(b_manifest, session).to_tracks(title.language) + web_heights = sorted([int(track.height) for track in web_tracks.videos], reverse=True) + except Exception: + web_heights = None + + if not android_heights and not web_heights: + self.log.error("Failed to request manifest data. If you're behind a VPN/proxy, you might be blocked") + sys.exit(1) + + if not android_heights or android_heights[0] < 1080: + lic_token = self.decrypt_token(b_token, client="WEB") + return b_manifest, lic_token, b_subtitle, data + else: + lic_token = self.decrypt_token(a_token, client="ANDROID") + return a_manifest, lic_token, a_subtitle, data + + def android_playlist(self, video_id: str) -> tuple: + url = self.config["android"]["vod"].format(video_id=video_id) + headers = {"authorization": self.authorization} + + r = self.session.get(url=url, headers=headers) + if not r.ok: + self.log.warning("Request for Android endpoint returned %s", r) + return None + + data = json.loads(r.content) + manifest = data["videoProfiles"][0]["streams"][0]["uri"] + token = data["videoProfiles"][0]["streams"][0]["token"] + subtitle = next( + (x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")), + None, + ) + + return manifest, token, subtitle, data + + def web_playlist(self, video_id: str) -> tuple: + url = self.config["web"]["vod"].format(programmeId=video_id) + r = self.session.get(url, headers=self.config["headers"]) + if not r.ok: + self.log.warning("Request for WEB endpoint returned %s", r) + return None + + data = json.loads(r.content) + + for item in data["videoProfiles"]: + if item["name"] == "dashwv-dyn-stream-1": + token = item["streams"][0]["token"] + manifest = item["streams"][0]["uri"] + + subtitle = next( + (x["url"] for x in data["subtitlesAssets"] if x["url"].endswith(".vtt")), + None, + ) + + return manifest, token, subtitle, data + + def decrypt_token(self, token: str, client: str) -> tuple: + if client == "ANDROID": + key = self.config["android"]["key"] + iv = self.config["android"]["iv"] + + if client == "WEB": + key = self.config["web"]["key"] + iv = self.config["web"]["iv"] + + if isinstance(token, str): + token = base64.b64decode(token) + cipher = AES.new( + key=base64.b64decode(key), + iv=base64.b64decode(iv), + mode=AES.MODE_CBC, + ) + data = unpad(cipher.decrypt(token), AES.block_size) + dec_token = data.decode().split("|")[1] + return dec_token.strip() + + def get_html(self, url: str) -> dict: + r = self.session.get(url=url, headers=self.config["headers"]) + r.raise_for_status() + + init_data = re.search( + "", + "".join(r.content.decode().replace("\u200c", "").replace("\r\n", "").replace("undefined", "null")), + ) + try: + data = json.loads(init_data.group(1)) + return data["initialData"] + except Exception: + self.log.error(f"Failed to get episode for {url}") + sys.exit(1) diff --git a/ALL4/config.yaml b/ALL4/config.yaml new file mode 100644 index 0000000..1468e01 --- /dev/null +++ b/ALL4/config.yaml @@ -0,0 +1,27 @@ +headers: + Accept-Language: en-US,en;q=0.8 + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36 + +endpoints: + login: https://api.channel4.com/online/v2/auth/token + title: https://api.channel4.com/online/v1/views/content-hubs/{title}.json + license: https://c4.eme.lp.aws.redbeemedia.com/wvlicenceproxy-service/widevine/acquire + search: https://all4nav.channel4.com/v1/api/search + +android: + key: QVlESUQ4U0RGQlA0TThESA==" + iv: MURDRDAzODNES0RGU0w4Mg==" + auth: MzZVVUN0OThWTVF2QkFnUTI3QXU4ekdIbDMxTjlMUTE6Sllzd3lIdkdlNjJWbGlrVw== + vod: https://api.channel4.com/online/v1/vod/stream/{video_id}?client=android-mod + +web: + key: bjljTGllWWtxd3pOQ3F2aQ== + iv: b2R6Y1UzV2RVaVhMdWNWZA== + vod: https://www.channel4.com/vod/stream/{programmeId} + +device: + platform_name: android + device_type: mobile + device_name: "Sony C6903 (C6903)" + app_version: "android_app:9.4.2" + optimizely_datafile: "2908" diff --git a/AMZN/__init__.py b/AMZN/__init__.py new file mode 100644 index 0000000..83050b4 --- /dev/null +++ b/AMZN/__init__.py @@ -0,0 +1,1174 @@ +import base64 +import hashlib +import json +from logging import Logger +import os +from pathlib import Path +import re +import sys +from collections import defaultdict +from http.cookiejar import CookieJar +import time +from typing import Any, Optional, Literal, Union +from urllib.parse import quote, urlencode, urlparse, urlunparse +from uuid import uuid4 + +import requests + +import click +import jsonpickle +from langcodes import Language +from click.core import ParameterSource + +from unshackle.core.cacher import Cacher +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH, ISM +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T +from unshackle.core.tracks import Chapter, Chapters, Subtitle, Tracks, Track, Video +from unshackle.core.tracks.audio import Audio +from unshackle.core.utilities import is_close_match +from unshackle.core.utils.collections import as_list + + +class AMZN(Service): + + """ + Service code for Amazon VOD (https://amazon.com) and Amazon Prime Video (https://primevideo.com). + + \b + + Authorization: Cookies + + Security: + + UHD@L1/SL3000 + FHD@L3(ChromeCDM)/SL2000 + SD@L3 + Certain SL2000 can do UHD + + \b + + Maintains their own license server like Netflix, be cautious. + + Region is chosen automatically based on domain extension found in cookies. + Prime Video specific code will be run if the ASIN is detected to be a prime video variant. + Use 'Amazon Video ASIN Display' for Tampermonkey addon for ASIN + https://greasyfork.org/en/scripts/496577-amazon-video-asin-display + + vt dl --list -z uk -q 1080 Amazon B09SLGYLK8 + """ + # GEOFENCE = ("",) + ALIASES = ("Amazon", "prime", 'amazon') + TITLE_RE = r"^(?:https?://(?:www\.)?(?Pamazon\.(?Pcom|co\.uk|de|co\.jp)|primevideo\.com)(?:/.+)?/)?(?P[A-Z0-9]{10,}|amzn1\.dv\.gti\.[a-f0-9-]+)" # noqa: E501 + + REGION_TLD_MAP = { + "au": "com.au", + "br": "com.br", + "jp": "co.jp", + "mx": "com.mx", + "tr": "com.tr", + "gb": "co.uk", + "us": "com", + } + VIDEO_RANGE_MAP = { + "SDR": "None", + "HDR10": "Hdr10", + "DV": "DolbyVision", + } + VIDEO_CODEC_MAP = { + "H264": ["avc1"], + "H265": ["hvc1", "dvh1"] + } + @staticmethod + @click.command(name="AMZN", short_help="https://amazon.com, https://primevideo.com", help=__doc__) + @click.argument("title", type=str, required=False) + @click.option("-b", "--bitrate", default="CBR", + type=click.Choice(["CVBR", "CBR", "CVBR+CBR"], case_sensitive=False), + help="Video Bitrate Mode to download in. CVBR=Constrained Variable Bitrate, CBR=Constant Bitrate.") + @click.option("-c", "--cdn", default=None, type=str, + help="CDN to download from, defaults to the CDN with the highest weight set by Amazon.") + # UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content + @click.option("-vq", "--vquality", default="HD", + type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False), + help="Manifest quality to request.") + @click.option("-s", "--single", is_flag=True, default=False, + help="Force single episode/season instead of getting series ASIN.") + @click.option("-am", "--amanifest", default="H265", + type=click.Choice(["CVBR", "CBR", "H265"], case_sensitive=False), + help="Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio.") + @click.option("-aq", "--aquality", default="SD", + type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False), + help="Manifest quality to request for audio. Defaults to the same as --quality.") + # @click.option("-ism", "--ism", is_flag=True, default=False, + # help="Set manifest override to SmoothStreaming. Defaults to DASH w/o this flag.") ## DPRECATED + @click.option("-aa", "--atmos", is_flag=True, default=False, + help="Prefer Atmos audio if available, otherwise defaults to 640k audio.") + @click.option("-drm", "--drm-system", type=click.Choice(["widevine", "playready"], case_sensitive=False), + default="playready", + help="which drm system to use") + + @click.pass_context + def cli(ctx, **kwargs): + return AMZN(ctx, **kwargs) + + def __init__(self, ctx, title, bitrate: str, cdn: str, vquality: str, single: bool, amanifest: str, aquality: str, drm_system: Literal["widevine", "playready"], atmos: bool) -> None: + m = self.parse_title(ctx, title) + self.domain = m.get("domain") + self.domain_region = m.get("region") + self.drm_system = drm_system + self.bitrate = bitrate + self.bitrate_source = ctx.get_parameter_source("bitrate") + self.vquality = vquality + self.vquality_source = ctx.get_parameter_source("vquality") + self.cdn = cdn + self.single = single + self.amanifest = amanifest + self.aquality = aquality + self.atmos = atmos + super().__init__(ctx) + + assert ctx.parent is not None + + + self.chapters_only = ctx.parent.params.get("chapters_only") + self.quality = ctx.parent.params.get("quality") + + self.cdm = ctx.obj.cdm + self.profile = ctx.obj.profile + self.region: dict[str, str] = {} + self.endpoints: dict[str, str] = {} + self.device: dict[str, str] = {} + + self.pv = self.domain == "primevideo.com" + self.device_token = None + self.device_id = None + self.customer_id = None + self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd" # browser client id + + vcodec = ctx.parent.params.get("vcodec") + range = ctx.parent.params.get("range_") + + self.range = range[0].name if range else "SDR" + self.vcodec = "H265" if vcodec and vcodec == Video.Codec.HEVC else "H264" + + if self.vquality_source != ParameterSource.COMMANDLINE: + if any(q <= 576 for q in self.quality) and "SDR" == self.range: + self.log.info(" + Setting manifest quality to SD") + self.vquality = "SD" + + if any(q > 1080 for q in self.quality): + self.log.info(" + Setting manifest quality to UHD and vcodec to H265 to be able to get 2160p video track") + self.vquality = "UHD" + self.vcodec = "H265" + + self.vquality = self.vquality or "HD" + + if self.bitrate_source != ParameterSource.COMMANDLINE: + if self.vcodec == "H265" and self.range == "SDR" and self.bitrate != "CVBR+CBR": + self.bitrate = "CVBR+CBR" + self.log.info(" + Changed bitrate mode to CVBR+CBR to be able to get H.265 SDR video track") + + if self.vquality == "UHD" and self.range != "SDR" and self.bitrate != "CBR": + self.bitrate = "CBR" + self.log.info(f" + Changed bitrate mode to CBR to be able to get highest quality UHD {self.range} video track") + + self.orig_bitrate = self.bitrate + + + self.manifestTypeTry = "DASH" + self.log.info("Getting tracks from MPD manifest") + + def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + if not cookies: + raise EnvironmentError("Service requires Cookies for Authentication.") + + self.session.cookies.update(cookies) + self.configure() + + # Abstracted functions + + def get_titles(self) -> Titles_T: + res = self.session.get( + url=self.endpoints["details"], + params={"titleID": self.title, "isElcano": "1", "sections": "Atf"}, + headers={"Accept": "application/json"}, + ).json()["widgets"] + + entity = res["header"]["detail"].get("entityType") + if not entity: + self.log.error(" - Failed to get entity type") + sys.exit(1) + + if entity == "Movie": + metadata = res["header"]["detail"] + return Movies( + [ + Movie( + id_=metadata.get("catalogId"), + year=metadata.get("releaseYear"), + name=metadata.get("title"), + service=self.__class__, + data=metadata, + ) + ] + ) + elif entity == "TV Show": + seasons = [x.get("titleID") for x in res["seasonSelector"]] + + episodes = [] + for season in seasons: + res = self.session.get( + url=self.endpoints["detail"], + params={"titleID": season, "isElcano": "1", "sections": "Btf"}, + headers={"Accept": "application/json"}, + ).json()["widgets"] + + # cards = [x["detail"] for x in as_list(res["titleContent"][0]["cards"])] + cards = [ + {**x["detail"], "sequenceNumber": x["self"]["sequenceNumber"]} + for x in res["episodeList"]["episodes"] + ] + + product_details = res["productDetails"]["detail"] + + episodes.extend( + Episode( + id_=title.get("titleId") or title["catalogId"], + title=product_details.get("parentTitle") or product_details["title"], + year=title.get("releaseYear") or product_details.get("releaseYear"), + season=product_details.get("seasonNumber"), + number=title.get("sequenceNumber"), + name=title.get("title"), + service=self.__class__, + data=title, + ) + for title in cards + if title["entityType"] == "TV Show" + ) + + return Series(episodes) + + def get_tracks(self, title: Title_T) -> Tracks: + tracks = Tracks() + if self.chapters_only: + return [] + + #manifest, chosen_manifest, tracks = self.get_best_quality(title) + + manifest = self.get_manifest( + title, + video_codec=self.vcodec, + bitrate_mode=self.bitrate, + quality=self.vquality, + hdr=self.range, + ignore_errors=False + + ) + + # Move rightsException termination here so that script can attempt continuing + if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: + self.log.error(" - The profile used does not have the rights to this title.") + return + + self.customer_id = manifest["returnedTitleRendition"]["selectedEntitlement"]["grantedByCustomerId"] + + default_url_set = manifest["playbackUrls"]["urlSets"][manifest["playbackUrls"]["defaultUrlSetId"]] + encoding_version = default_url_set["urls"]["manifest"]["encodingVersion"] + self.log.info(f" + Detected encodingVersion={encoding_version}") + + #print(manifest) + chosen_manifest = self.choose_manifest(manifest, self.cdn) + + if not chosen_manifest: + raise self.log.exit(f"No manifests available") + + manifest_url = self.clean_mpd_url(chosen_manifest["avUrlInfoList"][0]["url"], False) + self.log.debug(manifest_url) + # if self.event: + # devicetype = self.device["device_type"] + # manifest_url = chosen_manifest["avUrlInfoList"][0]["url"] + # manifest_url = f"{manifest_url}?amznDtid={devicetype}&encoding=segmentBase" + self.log.info(" + Downloading Manifest") + + if chosen_manifest["streamingTechnology"] == "DASH": + tracks = Tracks([ + x for x in iter(DASH.from_url(url=manifest_url, session=self.session).to_tracks(language="es")) + ]) + elif chosen_manifest["streamingTechnology"] == "SmoothStreaming": + tracks = Tracks([ + x for x in iter(ISM.from_url(url=manifest_url, session=self.session).to_tracks(language="es")) + ]) + else: + raise self.log.exit(f"Unsupported manifest type: {chosen_manifest['streamingTechnology']}") + + need_separate_audio = ((self.aquality or self.vquality) != self.vquality + or self.amanifest == "CVBR" and (self.vcodec, self.bitrate) != ("H264", "CVBR") + or self.amanifest == "CBR" and (self.vcodec, self.bitrate) != ("H264", "CBR") + or self.amanifest == "H265" and self.vcodec != "H265" + or self.amanifest != "H265" and self.vcodec == "H265") + + if not need_separate_audio: + audios = defaultdict(list) + for audio in tracks.audio: + audios[audio.language].append(audio) + + for lang in audios: + if not any((x.bitrate or 0) >= 640000 for x in audios[lang]): + need_separate_audio = True + break + + if need_separate_audio: # and not self.atmos: + tracks.audio.clear() + manifest_type = self.amanifest or "H265" + self.log.info(f"Getting audio from {manifest_type} manifest for potential higher bitrate or better codec") + audio_manifest = self.get_manifest( + title=title, + video_codec="H265" if manifest_type == "H265" else "H264", + bitrate_mode="CVBR" if manifest_type != "CBR" else "CBR", + quality=self.aquality or self.vquality, + hdr=None, + ignore_errors=True + ) + if not audio_manifest: + self.log.warning(f" - Unable to get {manifest_type} audio manifests, skipping") + elif not (chosen_audio_manifest := self.choose_manifest(audio_manifest, self.cdn)): + self.log.warning(f" - No {manifest_type} audio manifests available, skipping") + else: + audio_mpd_url = self.clean_mpd_url(chosen_audio_manifest["avUrlInfoList"][0]["url"], optimise=False) + self.log.debug(audio_mpd_url) + # if self.event: + # devicetype = self.device["device_type"] + # audio_mpd_url = chosen_audio_manifest["avUrlInfoList"][0]["url"] + # audio_mpd_url = f"{audio_mpd_url}?amznDtid={devicetype}&encoding=segmentBase" + self.log.info(" + Downloading HEVC manifest") + + try: + audio_mpd = Tracks([ + x for x in iter(DASH.from_url(url=audio_mpd_url, session=self.session).to_tracks(language="en")) + ]) + except KeyError: + self.log.warning(f" - Title has no {self.amanifest} stream, cannot get higher quality audio") + else: + tracks.audio = audio_mpd.audio # expecting possible dupes, ignore + + need_uhd_audio = self.atmos + + if not self.amanifest and ((self.aquality == "UHD" and self.vquality != "UHD") or not self.aquality): + audios = defaultdict(list) + for audio in tracks.audio: + audios[audio.language].append(audio) + for lang in audios: + if not any((x.bitrate or 0) >= 640000 for x in audios[lang]): + need_uhd_audio = True + break + + if need_uhd_audio and (self.config.get("device") or {}).get(self.profile, None): + self.log.info("Getting audio from UHD manifest for potential higher bitrate or better codec") + temp_device = self.device + temp_device_token = self.device_token + temp_device_id = self.device_id + uhd_audio_manifest = None + + try: + if self.cdm.device.type in ["CHROME", "PLAYREADY"] and self.quality < 2160: + self.log.info(f" + Switching to device to get UHD manifest") + self.register_device() + + uhd_audio_manifest = self.get_manifest( + title=title, + video_codec="H265", + bitrate_mode="CVBR+CBR", + quality="UHD", + hdr="DV", # Needed for 576kbps Atmos sometimes + ignore_errors=True + ) + except: + pass + + self.device = temp_device + self.device_token = temp_device_token + self.device_id = temp_device_id + + if not uhd_audio_manifest: + self.log.warning(f" - Unable to get UHD manifests, skipping") + elif not (chosen_uhd_audio_manifest := self.choose_manifest(uhd_audio_manifest, self.cdn)): + self.log.warning(f" - No UHD manifests available, skipping") + else: + tracks.audio.clear() + uhd_audio_mpd_url = self.clean_mpd_url(chosen_uhd_audio_manifest["avUrlInfoList"][0]["url"], optimise=False) + self.log.debug(uhd_audio_mpd_url) + # if self.event: + # devicetype = self.device["device_type"] + # uhd_audio_mpd_url = chosen_uhd_audio_manifest["avUrlInfoList"][0]["url"] + # uhd_audio_mpd_url = f"{uhd_audio_mpd_url}?amznDtid={devicetype}&encoding=segmentBase" + self.log.info(" + Downloading UHD manifest") + + try: + uhd_audio_mpd = Tracks([ + x for x in iter(DASH.from_url(url=uhd_audio_mpd_url, session=self.session).to_tracks(language="en")) + ]) + except KeyError: + self.log.warning(f" - Title has no UHD stream, cannot get higher quality audio") + else: + # replace the audio tracks with DV manifest version if atmos is present + if any(x for x in uhd_audio_mpd.audio if x.atmos): + tracks.audio = uhd_audio_mpd.audio + + for video in tracks.videos: + video.hdr10 = chosen_manifest["hdrFormat"] == "Hdr10" + video.dv = chosen_manifest["hdrFormat"] == "DolbyVision" + + for audio in tracks.audio: + audio.descriptive = audio.data["dash"]["adaptation_set"].get("audioTrackSubtype") == "descriptive" + # Amazon @lang is just the lang code, no dialect, @audioTrackId has it. + audio_track_id = audio.data["dash"]["adaptation_set"].get("audioTrackId") + if audio_track_id: + audio.language = Language.get(audio_track_id.split("_")[0]) # e.g. es-419_ec3_blabla + # Remove any audio tracks with dialog boost! + if audio.data["dash"]["adaptation_set"] is not None and "boosteddialog" in audio.data["dash"]["adaptation_set"].get("audioTrackSubtype", ""): + audio.bitrate = 1 + + for sub in manifest.get("subtitleUrls", []) + manifest.get("forcedNarratives", []): + try: + tracks.add(Subtitle( + id_=sub.get( + "timedTextTrackId", + f"{sub['languageCode']}_{sub['type']}_{sub['subtype']}_{sub.get('index', 'default')}" + ), + url=os.path.splitext(sub["url"])[0] + ".srt", # DFXP -> SRT forcefully seems to work fine + # metadata + codec=Subtitle.Codec.from_codecs("srt"), # sub["format"].lower(), + language=sub["languageCode"], + #is_original_lang=title.original_lang and is_close_match(sub["languageCode"], [title.original_lang]), + forced="forced" in sub["displayName"], + sdh=sub["type"].lower() == "sdh" # TODO: what other sub types? cc? forced? + ), warn_only=True) # expecting possible dupes, ignore + except KeyError: + # Log the KeyError Exception but continue (as only the subtitles will be missing) + self.log.error("Unexpected subtitle track id data format, subtitles will be missing", exc_info=True) + + for track in tracks: + track.needs_proxy = False + + tracks.audio = self._dedupe(tracks.audio) + + return tracks + + @staticmethod + def _dedupe(items: list) -> list: + if not items: + return items + if isinstance(items[0].url, list): + return items + + # Create a more specific key for deduplication that includes resolution/bitrate + seen = {} + for item in items: + # For video tracks, use codec + resolution + bitrate as key + if hasattr(item, 'width') and hasattr(item, 'height'): + key = f"{item.codec}_{item.width}x{item.height}_{item.bitrate}" + # For audio tracks, use codec + language + bitrate + channels as key + elif hasattr(item, 'channels'): + key = f"{item.codec}_{item.language}_{item.bitrate}_{item.channels}" + # Fallback to URL for other track types + else: + key = item.url + + # Keep the item if we haven't seen this exact combination + if key not in seen: + seen[key] = item + + return list(seen.values()) + + def get_chapters(self, title: Title_T) -> Chapters: + """Get chapters from Amazon's XRay Scenes API.""" + manifest = self.get_manifest( + title, + video_codec=self.vcodec, + bitrate_mode=self.bitrate, + quality="UHD", + hdr=self.range + ) + + if "xrayMetadata" in manifest: + xray_params = manifest["xrayMetadata"]["parameters"] + elif self.chapters_only: + xray_params = { + "pageId": "fullScreen", + "pageType": "xray", + "serviceToken": json.dumps({ + "consumptionType": "Streaming", + "deviceClass": "normal", + "playbackMode": "playback", + "vcid": manifest["returnedTitleRendition"]["contentId"], + }) + } + else: + return [] + + xray_params.update({ + "deviceID": self.device_id, + "deviceTypeID": self.config["device_types"]["browser"], # must be browser device type + "marketplaceID": self.region["marketplace_id"], + "gascEnabled": str(self.pv).lower(), + "decorationScheme": "none", + "version": "inception-v2", + "uxLocale": "en-US", + "featureScheme": "XRAY_WEB_2020_V1" + }) + + xray = self.session.get( + url=self.endpoints["xray"], + params=xray_params + ).json().get("page") + + if not xray: + return [] + + widgets = xray["sections"]["center"]["widgets"]["widgetList"] + + scenes = next((x for x in widgets if x["tabType"] == "scenesTab"), None) + if not scenes: + return [] + scenes = scenes["widgets"]["widgetList"][0]["items"]["itemList"] + + chapters = [] + + for scene in scenes: + chapter_title = scene["textMap"]["PRIMARY"] + match = re.search(r"(\d+\. |)(.+)", chapter_title) + if match: + chapter_title = match.group(2) + chapters.append(Chapter( + name=chapter_title, + timestamp=scene["textMap"]["TERTIARY"].replace("Starts at ", "") + )) + + return chapters + + def get_widevine_service_certificate(self, **_: Any) -> str: + return self.config["certificate"] + + def get_widevine_license(self, *, challenge: bytes, title: Title_T, track) -> None: + response = self.session.post( + url=self.endpoints["license"], + params={ + "asin": title.id, + "consumptionType": "Streaming", + "desiredResources": "Widevine2License", + "deviceTypeID": self.device["device_type"], + "deviceID": self.device_id, + "firmware": 1, + "gascEnabled": str(self.pv).lower(), + "marketplaceID": self.region["marketplace_id"], + "resourceUsage": "ImmediateConsumption", + "videoMaterialType": "Feature", + "operatingSystemName": "Linux" if any(q <= 576 for q in self.quality) else "Windows", + "operatingSystemVersion": "unknown" if any(q <= 576 for q in self.quality) else "10.0", + "customerID": self.customer_id, + "deviceDrmOverride": "CENC", + "deviceStreamingTechnologyOverride": "DASH", + "deviceVideoQualityOverride": "HD", + "deviceHdrFormatsOverride": "None", + }, + headers={ + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": f"Bearer {self.device_token}", + }, + data={ + "widevine2Challenge": base64.b64encode(challenge).decode(), + "includeHdcpTestKeyInLicense": "false", + }, + ).json() + if "errorsByResource" in response: + error_code = response["errorsByResource"]["Widevine2License"] + if "errorCode" in error_code: + error_code = error_code["errorCode"] + elif "type" in error_code: + error_code = error_code["type"] + + if error_code in ["PRS.NoRights.AnonymizerIP", "PRS.NoRights.NotOwned"]: + self.log.error("Proxy detected, Unable to License") + elif error_code == "PRS.Dependency.DRM.Widevine.UnsupportedCdmVersion": + self.log.error("Cdm version not supported") + else: + self.log.error(f" x Error from Amazon's License Server: [{error_code}]") + sys.exit(1) + + return response["widevine2License"]["license"] + + def get_playready_license(self, challenge: Union[bytes, str], title: Title_T, **_): + lic_list = [] + lic_challenge = base64.b64encode(challenge).decode("utf-8") if isinstance(challenge, bytes) else base64.b64encode(challenge.encode("utf-8")).decode("utf-8") + self.log.debug(f"Challenge - {lic_challenge}") + params = { + "asin": title.id, + "consumptionType": "Streaming", # Streaming or Download both work + "desiredResources": "PlayReadyLicense", + "deviceTypeID": self.device["device_type"], + "deviceID": self.device_id, + "firmware": 1, + "gascEnabled": str(self.pv).lower(), + "marketplaceID": self.region["marketplace_id"], + "resourceUsage": "ImmediateConsumption", + "videoMaterialType": "Feature", + "operatingSystemName": "Windows", + "operatingSystemVersion": "10.0", + "customerID": self.customer_id, + "deviceDrmOverride": "CENC", #CENC or Playready both work + "deviceStreamingTechnologyOverride": "DASH", # or SmoothStreaming + "deviceVideoQualityOverride": self.vquality, + "deviceHdrFormatsOverride": self.VIDEO_RANGE_MAP.get(self.range, "None"), + } + headers = { + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": f"Bearer {self.device_token}" + } + data = { + "playReadyChallenge": lic_challenge, # expects base64 + "includeHdcpTestKeyInLicense": "true" + } + lic = self.session.post( + url=self.endpoints["licence"], + params=params, + headers=headers, + data=data + ).json() + lic_list.append(lic) + # params["deviceStreamingTechnologyOverride"] = "SmoothStreaming" + params["deviceDrmOverride"] = "Playready" + lic = self.session.post( + url=self.endpoints["licence"], + params=params, + headers=headers, + data=data + ).json() + lic_list.append(lic) + + for lic in lic_list: + if "errorsByResource" in lic: + error_code = lic["errorsByResource"]["PlayReadyLicense"] + self.log.debug(error_code) + if "errorCode" in error_code: + error_code = error_code["errorCode"] + elif "type" in error_code: + error_code = error_code["type"] + if error_code == "PRS.NoRights.AnonymizerIP": + self.log.error(" - Amazon detected a Proxy/VPN and refused to return a license!") + continue + message = lic["errorsByResource"]["PlayReadyLicense"]["message"] + self.log.error(f" - Amazon reported an error during the License request: {message} [{error_code}]") + continue + elif "error" in lic: + error_code = lic["error"] + if "errorCode" in error_code: + error_code = error_code["errorCode"] + elif "type" in error_code: + error_code = error_code["type"] + if error_code == "PRS.NoRights.AnonymizerIP": + self.log.error(" - Amazon detected a Proxy/VPN and refused to return a license!") + continue + message = lic["error"]["message"] + self.log.error(f" - Amazon reported an error during the License request: {message} [{error_code}]") + continue + else: + xmrlic = base64.b64decode(lic["playReadyLicense"]["encodedLicenseResponse"].encode("utf-8")).decode("utf-8") + self.log.debug(xmrlic) + return xmrlic # Return Xml licence + + # Service specific functions + + def configure(self): + if len(self.title) > 10 and not (self.domain or "").startswith("amazon."): + self.pv = True + + self.log.info("Getting account region") + self.region = self.get_region() + if not self.region: + self.log.error(" - Failed to get Amazon account region") + sys.exit(1) + # self.GEOFENCE.append(self.region["code"]) + self.log.info(f" + Region: {self.region['code'].upper()}") + + # endpoints must be prepared AFTER region data is retrieved + self.endpoints = self.prepare_endpoints(self.config["endpoints"], self.region) + + self.session.headers.update({"Origin": f"https://{self.region['base']}"}) + + self.device = (self.config.get("device") or {}).get(self.profile, "default") + + if (int(self.quality[0]) > 1080 or self.range != "SDR" or self.atmos): + self.log.info(f"Using device to get UHD manifests") + self.register_device() + + elif not self.device or self.vquality != "UHD" or self.drm_system == "widevine": + # falling back to browser-based device ID + if not self.device: + self.log.warning( + "No Device information was provided for %s, using browser device...", + self.profile + ) + self.device_id = hashlib.sha224( + ("CustomerID" + self.session.headers["User-Agent"]).encode("utf-8") + ).hexdigest() + self.device = {"device_type": self.config["device_types"]["browser"]} + else: + self.register_device() + + def register_device(self) -> None: + self.device = (self.config.get("device") or {}).get(self.profile, "default") + device_cache_path = f"device_tokens_{self.profile}_{hashlib.md5(json.dumps(self.device).encode()).hexdigest()[0:6]}" + self.device_token = self.DeviceRegistration( + device=self.device, + endpoints=self.endpoints, + log=self.log, + cache_path=device_cache_path, + session=self.session + ).bearer + self.device_id = self.device.get("device_serial") + if not self.device_id: + raise self.log.error(f" - A device serial is required in the config, perhaps use: {os.urandom(8).hex()}") + + def get_region(self): + domain_region = self.get_domain_region() + if not domain_region: + return {} + + region = self.config["regions"].get(domain_region) + if not region: + raise self.log.error(f" - There's no region configuration data for the region: {domain_region}") + + region["code"] = domain_region + + if self.pv: + res = self.session.get("https://www.primevideo.com").text + match = re.search(r'ue_furl *= *([\'"])fls-(na|eu|fe)\.amazon\.[a-z.]+\1', res) + if match: + pv_region = match.group(2).lower() + else: + raise self.log.error(" - Failed to get PrimeVideo region") + pv_region = {"na": "atv-ps"}.get(pv_region, f"atv-ps-{pv_region}") + region["base_manifest"] = f"{pv_region}.primevideo.com" + region["base"] = "www.primevideo.com" + + return region + + def get_domain_region(self): + """Get the region of the cookies from the domain.""" + tld = (self.domain_region or "").split(".")[-1] + if not tld: + domains = [x.domain for x in self.session.cookies if x.domain_specified] + tld = next((x.split(".")[-1] for x in domains if x.startswith((".amazon.", ".primevideo."))), None) + return {"com": "us", "uk": "gb"}.get(tld, tld) + + def prepare_endpoint(self, name: str, uri: str, region: dict) -> str: + if name in ("browse", "playback", "licence", "xray"): + return f"https://{(region['base_manifest'])}{uri}" + if name in ("ontv", "ontvold", "mytv", "devicelink", "details", "getDetailWidgets"): + if self.pv: + host = "www.primevideo.com" + else: + host = region["base"] + return f"https://{host}{uri}" + if name in ("codepair", "register", "token"): + return f"https://{self.config['regions']['us']['base_api']}{uri}" + raise ValueError(f"Unknown endpoint: {name}") + + def prepare_endpoints(self, endpoints: dict, region: dict) -> dict: + return {k: self.prepare_endpoint(k, v, region) for k, v in endpoints.items()} + + def choose_manifest(self, manifest: dict, cdn=None): + """Get manifest URL for the title based on CDN weight (or specified CDN).""" + if cdn: + cdn = cdn.lower() + manifest = next((x for x in manifest["audioVideoUrls"]["avCdnUrlSets"] if x["cdn"].lower() == cdn), {}) + if not manifest: + raise self.log.exit(f" - There isn't any DASH manifests available on the CDN \"{cdn}\" for this title") + else: + manifest = next((x for x in sorted([x for x in manifest["audioVideoUrls"]["avCdnUrlSets"]], key=lambda x: int(x["cdnWeightsRank"]))), {}) + + return manifest + + def get_manifest( + self, title, video_codec: str, bitrate_mode: str, quality: str, hdr=None, + ignore_errors: bool = False + ) -> dict: + res = self.session.get( + url=self.endpoints["playback"], + params={ + "asin": title.id, + "consumptionType": "Streaming", + "desiredResources": ",".join([ + "PlaybackUrls", + "AudioVideoUrls", + "CatalogMetadata", + "ForcedNarratives", + "SubtitlePresets", + "SubtitleUrls", + "TransitionTimecodes", + "TrickplayUrls", + "CuepointPlaylist", + "XRayMetadata", + "PlaybackSettings", + ]), + "deviceID": self.device_id, + "deviceTypeID": self.device["device_type"], + "firmware": 1, + "gascEnabled": str(self.pv).lower(), + "marketplaceID": self.region["marketplace_id"], + "resourceUsage": "CacheResources", + "videoMaterialType": "Feature", + "playerType": "html5", + "clientId": self.client_id, + **({ + "operatingSystemName": "Linux" if quality == "SD" else "Windows", + "operatingSystemVersion": "unknown" if quality == "SD" else "10.0", + } if not self.device_token else {}), + "deviceDrmOverride": "CENC", + "deviceStreamingTechnologyOverride": "DASH", + "deviceProtocolOverride": "Https", + "deviceVideoCodecOverride": video_codec, + "deviceBitrateAdaptationsOverride": bitrate_mode.replace("+", ","), + "deviceVideoQualityOverride": quality, + "deviceHdrFormatsOverride": self.VIDEO_RANGE_MAP.get(hdr, "None"), + "supportedDRMKeyScheme": "DUAL_KEY", # ? + "liveManifestType": "live,accumulating", # ? + "titleDecorationScheme": "primary-content", + "subtitleFormat": "TTMLv2", + "languageFeature": "MLFv2", # ? + "uxLocale": "en_US", + "xrayDeviceClass": "normal", + "xrayPlaybackMode": "playback", + "xrayToken": "XRAY_WEB_2020_V1", + "playbackSettingsFormatVersion": "1.0.0", + "playerAttributes": json.dumps({"frameRate": "HFR"}), + # possibly old/unused/does nothing: + "audioTrackId": "all", + }, + headers={ + "Authorization": f"Bearer {self.device_token}" if self.device_token else None, + }, + ) + try: + manifest = res.json() + except json.JSONDecodeError: + if ignore_errors: + return {} + + raise self.log.exit(" - Amazon didn't return JSON data when obtaining the Playback Manifest.") + + if "error" in manifest: + if ignore_errors: + return {} + raise self.log.exit(" - Amazon reported an error when obtaining the Playback Manifest.") + + # Commented out as we move the rights exception check elsewhere + # if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: + # if ignore_errors: + # return {} + # raise self.log.exit(" - The profile used does not have the rights to this title.") + + # Below checks ignore NoRights errors + + if ( + manifest.get("errorsByResource", {}).get("PlaybackUrls") and + manifest["errorsByResource"]["PlaybackUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["PlaybackUrls"] + raise self.log.exit(f" - Amazon had an error with the Playback Urls: {error['message']} [{error['errorCode']}]") + + if ( + manifest.get("errorsByResource", {}).get("AudioVideoUrls") and + manifest["errorsByResource"]["AudioVideoUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["AudioVideoUrls"] + raise self.log.exit(f" - Amazon had an error with the A/V Urls: {error['message']} [{error['errorCode']}]") + + return manifest + + @staticmethod + def get_original_language(manifest): + """Get a title's original language from manifest data.""" + try: + return next( + x["language"].replace("_", "-") + for x in manifest["catalogMetadata"]["playback"]["audioTracks"] + if x["isOriginalLanguage"] + ) + except (KeyError, StopIteration): + pass + + if "defaultAudioTrackId" in manifest.get("playbackUrls", {}): + try: + return manifest["playbackUrls"]["defaultAudioTrackId"].split("_")[0] + except IndexError: + pass + + try: + return sorted( + manifest["audioVideoUrls"]["audioTrackMetadata"], + key=lambda x: x["index"] + )[0]["languageCode"] + except (KeyError, IndexError): + pass + + return None + + @staticmethod + def clean_mpd_url(mpd_url, optimise=True): + print(f"MPD URL: {mpd_url}, optimise: {optimise}") + """Clean up an Amazon MPD manifest url.""" + if 'akamaihd.net' in mpd_url: + match = re.search(r'[^/]*\$[^/]*/', mpd_url) + if match: + dollar_sign_part = match.group(0) + mpd_url = mpd_url.replace(dollar_sign_part, '', 1) + return mpd_url + + if optimise: + return mpd_url.replace("~", "") + "?encoding=segmentBase" + else: + if match := re.match(r"(https?://.*/)d.?/.*~/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + elif match := re.match(r"(https?://.*/)d.?/.*\$.*?/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + elif match := re.match(r"(https?://.*/).*\$.*?/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + raise ValueError("Unable to parse MPD URL") + + def parse_title(self, ctx, title): + title = title or ctx.parent.params.get("title") + if not title: + self.log.error(" - No title ID specified") + if not getattr(self, "TITLE_RE"): + self.title = title + return {} + for regex in as_list(self.TITLE_RE): + m = re.search(regex, title) + if m: + self.title = m.group("id") + return m.groupdict() + self.log.warning(f" - Unable to parse title ID {title!r}, using as-is") + self.title = title + + # Service specific classes + + class DeviceRegistration: + def __init__(self, device: dict, endpoints: dict, cache_path: str, session: requests.Session, log: Logger): + self.session = session + self.device = device + self.endpoints = endpoints + self.cache_path = cache_path + self.log = log + self.cache = Cacher('AMZN') + # self.device = {k: str(v) if not isinstance(v, str) else v for k, v in self.device.items()} + + self.bearer = None + + self._cache = self.cache.get(self.cache_path) + if self._cache: + if self._cache.data.get("expires_in", 0) > int(time.time()): + self.log.info(" + Using cached device bearer") + self.bearer = self._cache["access_token"] + else: + self.log.info("Refreshing cached device bearer...") + refreshed_tokens = self.refresh(self.device, self._cache.data["refresh_token"], self._cache.data["access_token"]) + refreshed_tokens["refresh_token"] = self._cache.data["refresh_token"] + # expires_in seems to be in minutes, create a unix timestamp and add the minutes in seconds + refreshed_tokens["expires_in"] = int(time.time()) + int(refreshed_tokens["expires_in"]) + self._cache.data = refreshed_tokens + self.bearer = refreshed_tokens["access_token"] + else: + self.log.info(" + Registering new device bearer") + self.bearer = self.register(self.device) + + def register(self, device: dict) -> dict: + """ + Register device to the account + :param device: Device data to register + :return: Device bearer tokens + """ + # OnTV csrf + csrf_token, referer = self.get_csrf_token() + + # Code pair + code_pair = self.get_code_pair(device) + + # Device link + response = self.session.post( + url=self.endpoints["devicelink"], + headers={ + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9,es-US;q=0.8,es;q=0.7", # needed? + "Content-Type": "application/x-www-form-urlencoded", + "Referer": referer + }, + params=urlencode({ + # any reason it urlencodes here? requests can take a param dict... + "ref_": "atv_set_rd_reg", + "publicCode": code_pair["public_code"], # public code pair + "token": csrf_token # csrf token + }) + ) + if response.status_code != 200: + raise self.log.error(f"Unexpected response with the codeBasedLinking request: {response.text} [{response.status_code}]") + + # Register + response = self.session.post( + url=self.endpoints["register"], + headers={ + "Content-Type": "application/json", + "Accept-Language": "en-US", + }, + json={ + "auth_data": { + "code_pair": code_pair + }, + "registration_data": device, + "requested_token_type": ["bearer"], + "requested_extensions": ["device_info", "customer_info"] + }, + cookies=None # for some reason, may fail if cookies are present. Odd. + ) + if response.status_code != 200: + raise self.log.error(f"Unable to register: {response.text} [{response.status_code}]") + bearer = response.json()["response"]["success"]["tokens"]["bearer"] + bearer["expires_in"] = int(time.time()) + int(bearer["expires_in"]) + + # Cache bearer + self._cache.set(bearer) + # os.makedirs(os.path.dirname(self.cache_path), exist_ok=True) + # with open(self.cache_path, "w", encoding="utf-8") as fd: + # fd.write(jsonpickle.encode(bearer)) + + return bearer["access_token"] + + def refresh(self, device: dict, refresh_token: str, access_token: str) -> dict: + # using the refresh token get the cookies needed for making calls to *.amazon.com + response = requests.post( + url=self.endpoints["token"], + headers={ + 'User-Agent': 'AmazonWebView/Amazon Alexa/2.2.223830.0/iOS/11.4.1/iPhone', # https://gitlab.com/keatontaylor/alexapy/-/commit/540b6333d973177bbc98e6ef39b00134f80ef0bb + 'Accept-Language': 'en-US', + 'Accept-Charset': 'utf-8', + 'Connection': 'keep-alive', + 'Content-Type': 'application/x-www-form-urlencoded', + 'Accept': '*/*' + }, + cookies={ + 'at-main': access_token, + }, + data={ + **device, + 'domain': '.' + self.endpoints["token"].split("/")[-3], + 'source_token': str(refresh_token), + 'requested_token_type': 'auth_cookies', + 'source_token_type': 'refresh_token', + } + ) + response_json = response.json() + cookies = {} + self.log.debug(response_json) + if response.status_code == 200: + # Extract the cookies from the response + raw_cookies = response_json['response']['tokens']['cookies']['.amazon.com'] + for cookie in raw_cookies: + cookies[cookie['Name']] = cookie['Value'] + else: + error = response_json['response']["error"] + self.cache_path.unlink(missing_ok=True) + raise self.log.error(f"Error when refreshing cookies: {error['message']} [{error['code']}]") + + response = requests.post( + url=self.endpoints["token"], + headers={ + 'Content-Type': 'application/json; charset=utf-8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Connection': 'keep-alive', + 'Accept': 'application/json; charset=utf-8', + 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36", + 'Accept-Language': 'en-US,en-US;q=1.0', + 'x-amzn-identity-auth-domain': self.endpoints["token"].split("/")[-3], + 'x-amzn-requestid': str(uuid4()).replace('-', '') + }, + json={ + **device, + 'requested_token_type': 'access_token', + 'source_token_type': 'refresh_token', + 'source_token': str(refresh_token), + }, # https://github.com/Sandmann79/xbmc/blob/dab17d913ee877d96115e6f799623bca158f3f24/plugin.video.amazon-test/resources/lib/login.py#L593 + cookies=cookies + ) + response_json = response.json() + + if response.status_code != 200 or "error" in response_json: + self.cache_path.unlink(missing_ok=True) # Remove the cached device as its tokens have expired + raise self.log.error(f"Failed to refresh device token -> {response_json['error_description']} [{response_json['error']}]") + self.log.debug(response_json) + if response_json["token_type"] != "bearer": + raise self.log.error("Unexpected returned refreshed token type") + + return response_json + + def get_csrf_token(self) -> str: + """ + On the amazon website, you need a token that is in the html page, + this token is used to register the device + :return: OnTV Page's CSRF Token + """ + try: + res = self.session.get(self.endpoints["ontv"]) + response = res.text + if 'input type="hidden" name="appAction" value="SIGNIN"' in response: + raise self.log.error( + "Cookies are signed out, cannot get ontv CSRF token. " + f"Expecting profile to have cookies for: {self.endpoints['ontv']}" + ) + for match in re.finditer(r"", response): + prop = json.loads(match.group(1)) + prop = prop.get("props", {}).get("codeEntry", {}).get("token") + if prop: + return prop, self.endpoints["ontv"] + raise self.log.error(f"Unable to get ontv CSRF token - Navigate to {self.endpoints['mytv']}, login and save cookies from code pair page to default.txt") + except: + res = self.session.get(self.endpoints["ontvold"]) + response = res.text + if 'input type="hidden" name="appAction" value="SIGNIN"' in response: + raise self.log.error( + "Cookies are signed out, cannot get ontv CSRF token. " + f"Expecting profile to have cookies for: {self.endpoints['ontvold']}" + ) + for match in re.finditer(r"", response): + prop = json.loads(match.group(1)) + prop = prop.get("props", {}).get("codeEntry", {}).get("token") + if prop: + return prop, self.endpoints["ontvold"] + raise self.log.error(f"Unable to get ontv CSRF token - Navigate to {self.endpoints['mytv']}, login and save cookies from code pair page to default.txt") + + def get_code_pair(self, device: dict) -> dict: + """ + Getting code pairs based on the device that you are using + :return: public and private code pairs + """ + res = self.session.post( + url=self.endpoints["codepair"], + headers={ + "Content-Type": "application/json", + "Accept-Language": "en-US", + }, + json={"code_data": device} + ).json() + if "error" in res: + raise self.log.error(f"Unable to get code pair: {res['error_description']} [{res['error']}]") + return res diff --git a/AMZN/__pycache__/__init__.cpython-310.pyc b/AMZN/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..7b84f6b Binary files /dev/null and b/AMZN/__pycache__/__init__.cpython-310.pyc differ diff --git a/AMZN/__pycache__/__init__.cpython-311.pyc b/AMZN/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..a2729bc Binary files /dev/null and b/AMZN/__pycache__/__init__.cpython-311.pyc differ diff --git a/AMZN/__pycache__/__init__.cpython-312.pyc b/AMZN/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..9592925 Binary files /dev/null and b/AMZN/__pycache__/__init__.cpython-312.pyc differ diff --git a/AMZN/config.yaml b/AMZN/config.yaml new file mode 100644 index 0000000..37bf920 --- /dev/null +++ b/AMZN/config.yaml @@ -0,0 +1,162 @@ +certificate: | + CAUSwgUKvAIIAxIQCuQRtZRasVgFt7DIvVtVHBi17OSpBSKOAjCCAQoCggEBAKU2UrYVOSDlcXajWhpEgGhqGraJtFdUPgu6plJGy9ViaRn5mhyXON5PXm + w1krQdi0SLxf00FfIgnYFLpDfvNeItGn9rcx0RNPwP39PW7aW0Fbqi6VCaKWlR24kRpd7NQ4woyMXr7xlBWPwPNxK4xmR/6UuvKyYWEkroyeIjWHAqgCjC + mpfIpVcPsyrnMuPFGl82MMVnAhTweTKnEPOqJpxQ1bdQvVNCvkba5gjOTbEnJ7aXegwhmCdRQzXjTeEV2dO8oo5YfxW6pRBovzF6wYBMQYpSCJIA24ptAP + /2TkneyJuqm4hJNFvtF8fsBgTQQ4TIhnX4bZ9imuhivYLa6HsCAwEAAToPYW1hem9uLmNvbS1wcm9kEoADETQD6R0H/h9fyg0Hw7mj0M7T4s0bcBf4fMhA + Rpwk2X4HpvB49bJ5Yvc4t41mAnXGe/wiXbzsddKMiMffkSE1QWK1CFPBgziU23y1PjQToGiIv/sJIFRKRJ4qMBxIl95xlvSEzKdt68n7wqGa442+uAgk7C + XU3uTfVofYY76CrPBnEKQfad/CVqTh48geNTb4qRH1TX30NzCsB9NWlcdvg10pCnWSm8cSHu1d9yH+2yQgsGe52QoHHCqHNzG/wAxMYWTevXQW7EPTBeFy + SPY0xUN+2F2FhCf5/A7uFUHywd0zNTswh0QJc93LBTh46clRLO+d4RKBiBSj3rah6Y5iXMw9N9o58tCRc9gFHrjfMNubopWHjDOO3ATUgqXrTp+fKVCmsG + uGl1ComHxXV9i1AqHwzzY2JY2vFqo73jR3IElr6oChPIwcNokmNc0D4TXtjE0BoYkbWKJfHvJJihzMOvDicWUsemVHvua9/FBtpbHgpbgwijFPjtQF9Ldb + 8Swf + +device: + + # old: # !<< take note that this is done per-profile + # domain: Device + # app_name: AIV + # app_version: '3.12.0' + # device_model: 'SHIELD Android TV' + # os_version: '28' + # device_type: A1KAXIG6VXSG8Y + # device_serial: '13f5b56b4a17de5d136f0e4c28236109' # os.urandom(16).hex() + # device_name: "Build/LMY47D Shield TV" + # software_version: '248' + + # old2: + # 'domain': 'DeviceLegacy' + # 'device_type': A1KAXIG6VXSG8Y, + # 'device_serial': '870f53d1b509594c2f8cd5e340a7d374' + # 'app_name': 'com.amazon.avod.thirdpartyclient' + # 'app_version': '296016847' + # 'device_model': 'mdarcy/nvidia/SHIELD Android TV' + # 'os_version': 'NVIDIA/mdarcy/mdarcy:11/RQ1A.210105.003/7094531_2971.7725:user/release-keys' + + # old3: + # domain: Device + # app_name: com.amazon.amazonvideo.livingroom + # app_version: '1.4' + # device_model: PadFone + # os_version: '6.2.5' + # device_type: 'A2SNKIF736WF4T' + # device_name: 'T008 Build/JSS15Q PadFone' # "%FIRST_NAME%'s%DUPE_STRATEGY_1ST% PadFone" + # device_serial: 'c1ebbb433da4afdf' + + # old4: + # domain: Device + # app_name: com.amazon.amazonvideo.livingroom + # app_version: '1.4' + # device_model: 'Hisense TV' + # os_version: '3.9.5' + # device_type: 'A3T3XXY42KZQNP' # A2B5DGIWVDH8J3, A3GTP8TAF8V3YG, AFTHA001 # https://developer.amazon.com/docs/fire-tv/identify-amazon-fire-tv-devices.html, https://github.com/giofrida/Hisense-Amazon-Enabler + # device_name: "%FIRST_NAME%'s%DUPE_STRATEGY_1ST% Hisense" # KS964, Build/RP1A.201005.001 + # device_serial: '8e3ddf49ee384247' + + # old5: + # domain: Device + # app_name: com.amazon.amazonvideo.livingroom + # app_version: '1.1' + # device_model: Hisense + # os_version: '6.0.1' #6.10.19 + # device_type: 'A3REWRVYBYPKUM' + # device_name: '%FIRST_NAME%''s%DUPE_STRATEGY_1ST% Hisense' + # device_serial: 'cd24294bffb75a46' # os.urandom(8).hex() + + default: + domain: Device + app_name: com.amazon.amazonvideo.livingroom + app_version: '1.4' + device_model: 'MTC' + os_version: '6.0.1' #6.10.19 + device_type: 'A2HYAJ0FEWP6N3' + device_name: '%FIRST_NAME%''s%DUPE_STRATEGY_1ST% MTC' + device_serial: 'e6eb1ecdc8e34320' + +#Hisense_HU32E5600FHWV: A2RGJ95OVLR12U +#Hisense_HU50A6100UW: AAJ692ZPT1X85 +#Hisense_HE55A700EUWTS: A3REWRVYBYPKUM +#MTC_ATV: A2HYAJ0FEWP6N3 + +device_types: + browser: 'AOAGZA014O5RE' # all browsers? all platforms? + tv_generic: 'A2SNKIF736WF4T' # type is shared among various random smart tvs + pc_app: 'A1RTAM01W29CUP' + mobile_app: 'A43PXU4ZN2AL1' + echo: 'A7WXQPH584YP' # echo Gen2 + echo_dot: 'A32DOYMUN6DTXA' # echo dot Gen3 + echo_studio: 'A3RBAYBE7VM004' # for audio stuff, this is probably the one to use + fire_7: 'A2M4YX06LWP8WI' + fire_7_again: 'A1Q7QCGNMXAKYW' # not sure the difference + fire_hd_8: 'A1C66CX2XD756O' + fire_hd_8_again: 'A38EHHIB10L47V' # not sure the difference + fire_hd_8_plus_2020: 'AVU7CPPF2ZRAS' + fire_hd_10: 'A1ZB65LA390I4K' + fire_tv: 'A2E0SNTXJVT7WK' # this is not the stick, this is the older stick-like diamond shaped one + fire_tv_gen2: 'A12GXV8XMS007S' + fire_tv_cube: 'A2JKHJ0PX4J3L3' # this is the STB-style big bulky cube + fire_tv_stick_gen1: 'ADVBD696BHNV5' # non-4k fire tv stick + fire_tv_stick_gen2: 'A2LWARUGJLBYEW' + fire_tv_stick_with_alexa: 'A265XOI9586NML' + fire_tv_stick_4k: 'A2GFL5ZMWNE0PX' # 4k fire tv stick + fire_tv_stick_4k_gen3: 'AKPGW064GI9HE' + nvidia_shield: 'A1KAXIG6VXSG8Y' # nvidia shield, unknown which one or if all + +endpoints: + browse: '/cdp/catalog/Browse' + details: '/gp/video/api/getDetailPage' + getDetailWidgets: '/gp/video/api/getDetailWidgets' + playback: '/cdp/catalog/GetPlaybackResources' + licence: '/cdp/catalog/GetPlaybackResources' + # chapters/scenes + xray: '/swift/page/xray' + # device registration + ontv: '/region/eu/ontv/code?ref_=atv_auth_red_aft' #/gp/video/ontv/code + ontvold: '/gp/video/ontv/code/ref=atv_device_code' + mytv: '/mytv' + devicelink: '/gp/video/api/codeBasedLinking' + codepair: '/auth/create/codepair' + register: '/auth/register' + token: '/auth/token' + #cookies: '/ap/exchangetoken/cookies' + +regions: + us: + base: 'www.amazon.com' + base_api: 'api.amazon.com' + base_manifest: 'atv-ps.amazon.com' + marketplace_id: 'ATVPDKIKX0DER' + + gb: + base: 'www.amazon.co.uk' + base_api: 'api.amazon.co.uk' + base_manifest: 'atv-ps-eu.amazon.co.uk' + marketplace_id: 'A2IR4J4NTCP2M5' # A1F83G8C2ARO7P is also another marketplace_id + + it: + base: 'www.amazon.it' + base_api: 'api.amazon.it' + base_manifest: 'atv-ps-eu.primevideo.com' + marketplace_id: 'A3K6Y4MI8GDYMT' + + de: + base: 'www.amazon.de' + base_api: 'api.amazon.de' + base_manifest: 'atv-ps-eu.amazon.de' + marketplace_id: 'A1PA6795UKMFR9' + + au: + base: 'www.amazon.com.au' + base_api: 'api.amazon.com.au' + base_manifest: 'atv-ps-fe.amazon.com.au' + marketplace_id: 'A3K6Y4MI8GDYMT' + + jp: + base: 'www.amazon.co.jp' + base_api: 'api.amazon.co.jp' + base_manifest: 'atv-ps-fe.amazon.co.jp' + marketplace_id: 'A1VC38T7YXB528' + + pl: + base: 'www.amazon.com' + base_api: 'api.amazon.com' + base_manifest: 'atv-ps-eu.primevideo.com' + marketplace_id: 'A3K6Y4MI8GDYMT' diff --git a/ATVP/__init__.py b/ATVP/__init__.py new file mode 100644 index 0000000..8acf2f0 --- /dev/null +++ b/ATVP/__init__.py @@ -0,0 +1,354 @@ +import base64 +import json +import re +from datetime import datetime + +import click +import m3u8 +import requests + +from unshackle.core.downloaders import n_m3u8dl_re +from unshackle.core.manifests import m3u8 as m3u8_parser +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series +from unshackle.core.tracks import Audio, Subtitle, Tracks, Video +from unshackle.core.utils.collections import as_list +from pyplayready.cdm import Cdm as PlayReadyCdm + + +class ATVP(Service): + """ + Service code for Apple's TV Plus streaming service (https://tv.apple.com). + + \b + WIP: decrypt and removal of bumper/dub cards + + \b + Authorization: Cookies + Security: UHD@L1 FHD@L1 HD@L3 + """ + + ALIASES = ["ATVP", "appletvplus", "appletv+"] + TITLE_RE = ( + r"^(?:https?://tv\.apple\.com(?:/[a-z]{2})?/(?:movie|show|episode)/[a-z0-9-]+/)?(?Pumc\.cmc\.[a-z0-9]+)" # noqa: E501 + ) + + VIDEO_CODEC_MAP = {"H264": ["avc"], "H265": ["hvc", "hev", "dvh"]} + AUDIO_CODEC_MAP = {"AAC": ["HE", "stereo"], "AC3": ["ac3"], "EC3": ["ec3", "atmos"]} + + @staticmethod + @click.command(name="ATVP", short_help="https://tv.apple.com") + @click.argument("title", type=str, required=False) + @click.pass_context + def cli(ctx, **kwargs): + return ATVP(ctx, **kwargs) + + def __init__(self, ctx, title): + super().__init__(ctx) + self.title = title + self.cdm = ctx.obj.cdm + if not isinstance(self.cdm, PlayReadyCdm): + self.log.warning("PlayReady CDM not provided, exiting") + raise SystemExit(1) + self.vcodec = ctx.parent.params["vcodec"] + self.acodec = ctx.parent.params["acodec"] + self.alang = ctx.parent.params["lang"] + self.subs_only = ctx.parent.params["subs_only"] + self.quality = ctx.parent.params["quality"] + + self.extra_server_parameters = None + # initialize storefront with a default value. + self.storefront = 'us' # or any default value + + def get_titles(self): + self.configure() + r = None + for i in range(2): + try: + self.params = { + "utsk": "6e3013c6d6fae3c2::::::9318c17fb39d6b9c", + "caller": "web", + "sf": self.storefront, + "v": "46", + "pfm": "appletv", + "mfr": "Apple", + "locale": "en-US", + "l": "en", + "ctx_brand": "tvs.sbd.4000", + "count": "100", + "skip": "0", + } + r = self.session.get( + url=self.config["endpoints"]["title"].format(type={0: "shows", 1: "movies"}[i], id=self.title), + params=self.params, + ) + except requests.HTTPError as e: + if e.response.status_code != 404: + raise + else: + if r.ok: + break + if not r: + raise self.log.exit(f" - Title ID {self.title!r} could not be found.") + try: + title_information = r.json()["data"]["content"] + except json.JSONDecodeError: + raise ValueError(f"Failed to load title manifest: {r.text}") + + if title_information["type"] == "Movie": + movie = Movie( + id_=self.title, + service=self.__class__, + name=title_information["title"], + year=datetime.fromtimestamp(title_information["releaseDate"] / 1000).year, + language=title_information["originalSpokenLanguages"][0]["locale"], + data=title_information, + ) + return Movies([movie]) + else: + r = self.session.get( + url=self.config["endpoints"]["tv_episodes"].format(id=self.title), + params=self.params, + ) + try: + episodes = r.json()["data"]["episodes"] + except json.JSONDecodeError: + raise ValueError(f"Failed to load episodes list: {r.text}") + + episodes_list = [ + Episode( + id_=episode["id"], + service=self.__class__, + title=episode["showTitle"], + season=episode["seasonNumber"], + number=episode["episodeNumber"], + name=episode.get("title"), + year=datetime.fromtimestamp(title_information["releaseDate"] / 1000).year, + language=title_information["originalSpokenLanguages"][0]["locale"], + data={**episode, "originalSpokenLanguages": title_information["originalSpokenLanguages"]}, + ) + for episode in episodes + ] + return Series(episodes_list) + + def get_tracks(self, title): + # call configure() before using self.storefront + self.configure() + + self.params = { + "utsk": "6e3013c6d6fae3c2::::::9318c17fb39d6b9c", + "caller": "web", + "sf": self.storefront, + "v": "46", + "pfm": "appletv", + "mfr": "Apple", + "locale": "en-US", + "l": "en", + "ctx_brand": "tvs.sbd.4000", + "count": "100", + "skip": "0", + } + r = self.session.get( + url=self.config["endpoints"]["manifest"].format(id=title.data["id"]), + params=self.params, + ) + try: + stream_data = r.json() + except json.JSONDecodeError: + raise ValueError(f"Failed to load stream data: {r.text}") + stream_data = stream_data["data"]["content"]["playables"][0] + + if not stream_data["isEntitledToPlay"]: + raise self.log.exit(" - User is not entitled to play this title") + + self.extra_server_parameters = stream_data["assets"]["fpsKeyServerQueryParameters"] + r = requests.get( + url=stream_data["assets"]["hlsUrl"], + headers={"User-Agent": "AppleTV6,2/11.1"}, + ) + res = r.text + + master = m3u8.loads(res, r.url) + tracks = m3u8_parser.parse( + master=master, + language=title.data["originalSpokenLanguages"][0]["locale"] or "en", + session=self.session, + ) + + # Set track properties based on type + for track in tracks: + if isinstance(track, Video): + # Convert codec string to proper Video.Codec enum if needed + if isinstance(track.codec, str): + codec_str = track.codec.lower() + if codec_str in ["avc", "h264", "h.264"]: + track.codec = Video.Codec.AVC + elif codec_str in ["hvc", "hev", "hevc", "h265", "h.265", "dvh"]: + track.codec = Video.Codec.HEVC + else: + print(f"Unknown video codec '{track.codec}', keeping as string") + + # Set pr_pssh for PlayReady license requests + if track.drm: + for drm in track.drm: + if hasattr(drm, 'data') and 'pssh_b64' in drm.data: + track.pr_pssh = drm.data['pssh_b64'] + elif isinstance(track, Audio): + # Extract bitrate from URL + bitrate = re.search(r"&g=(\d+?)&", track.url) + if not bitrate: + bitrate = re.search(r"_gr(\d+)_", track.url) # alternative pattern + if bitrate: + track.bitrate = int(bitrate.group(1)[-3::]) * 1000 # e.g. 128->128,000, 2448->448,000 + else: + raise ValueError(f"Unable to get a bitrate value for Track {track.id}") + codec_str = track.codec.replace("_vod", "") if track.codec else "" + if codec_str == "DD+": + track.codec = Audio.Codec.EC3 + elif codec_str == "DD": + track.codec = Audio.Codec.AC3 + elif codec_str in ["HE", "stereo", "AAC"]: + track.codec = Audio.Codec.AAC + elif codec_str == "atmos": + track.codec = Audio.Codec.EC3 + else: + if not hasattr(track.codec, "value"): + print(f"Unknown audio codec '{codec_str}', defaulting to AAC") + track.codec = Audio.Codec.AAC + + # Set pr_pssh for PlayReady license requests + if track.drm: + for drm in track.drm: + if hasattr(drm, 'data') and 'pssh_b64' in drm.data: + track.pr_pssh = drm.data['pssh_b64'] + elif isinstance(track, Subtitle): + codec_str = track.codec if track.codec else "" + if codec_str.lower() in ["vtt", "webvtt"]: + track.codec = Subtitle.Codec.WebVTT + elif codec_str.lower() in ["srt", "subrip"]: + track.codec = Subtitle.Codec.SubRip + elif codec_str.lower() in ["ttml", "dfxp"]: + track.codec = Subtitle.Codec.TimedTextMarkupLang + elif codec_str.lower() in ["ass", "ssa"]: + track.codec = Subtitle.Codec.SubStationAlphav4 + else: + if not hasattr(track.codec, "value"): + print(f"Unknown subtitle codec '{codec_str}', defaulting to WebVTT") + track.codec = Subtitle.Codec.WebVTT + + # Set pr_pssh for PlayReady license requests + if track.drm: + for drm in track.drm: + if hasattr(drm, 'data') and 'pssh_b64' in drm.data: + track.pr_pssh = drm.data['pssh_b64'] + + # Try to filter by CDN, but fallback to all tracks if filtering fails + try: + filtered_tracks = [ + x + for x in tracks + if any( + param.startswith("cdn=vod-ap") or param == "cdn=ap" + for param in as_list(x.url)[0].split("?")[1].split("&") + ) + ] + + for track in tracks: + if track not in tracks.attachments: + track.downloader = n_m3u8dl_re + if isinstance(track, (Video, Audio)): + track.needs_repack = True + + if filtered_tracks: + return Tracks(filtered_tracks) + else: + return Tracks(tracks) + + except Exception: + return Tracks(tracks) + + def get_chapters(self, title): + return [] + + def certificate(self, **_): + return None # will use common privacy cert + + def get_pssh(self, track) -> None: + res = self.session.get(as_list(track.url)[0]) + playlist = m3u8.loads(res.text, uri=res.url) + keys = list(filter(None, (playlist.session_keys or []) + (playlist.keys or []))) + for key in keys: + if key.keyformat and "playready" in key.keyformat.lower(): + track.pr_pssh = key.uri.split(",")[-1] + return + + def get_playready_license(self, *, challenge: bytes, title, track) -> str: + if isinstance(challenge, str): + challenge = challenge.encode() + + self.get_pssh(track) + + res = self.session.post( + url=self.config["endpoints"]["license"], + json={ + "streaming-request": { + "version": 1, + "streaming-keys": [ + { + "challenge": base64.b64encode(challenge).decode("utf-8"), + "key-system": "com.microsoft.playready", + "uri": f"data:text/plain;charset=UTF-16;base64,{track.pr_pssh}", + "id": 0, + "lease-action": "start", + "adamId": self.extra_server_parameters["adamId"], + "isExternal": True, + "svcId": self.extra_server_parameters["svcId"], + }, + ], + }, + }, + ).json() + return res["streaming-response"]["streaming-keys"][0]["license"] + + # Service specific functions + + def configure(self): + cc = self.session.cookies.get_dict()["itua"] + r = self.session.get( + "https://gist.githubusercontent.com/BrychanOdlum/2208578ba151d1d7c4edeeda15b4e9b1/raw/8f01e4a4cb02cf97a48aba4665286b0e8de14b8e/storefrontmappings.json" + ).json() + for g in r: + if g["code"] == cc: + self.storefront = g["storefrontId"] + + environment = self.get_environment_config() + if not environment: + raise ValueError("Failed to get AppleTV+ WEB TV App Environment Configuration...") + self.session.headers.update( + { + "User-Agent": self.config["user_agent"], + "Authorization": f"Bearer {environment['developerToken']}", + "media-user-token": self.session.cookies.get_dict()["media-user-token"], + "x-apple-music-user-token": self.session.cookies.get_dict()["media-user-token"], + } + ) + + def get_environment_config(self): + """Loads environment config data from WEB App's serialized server data.""" + res = self.session.get("https://tv.apple.com").text + + script_match = re.search( + r']*id=["\']serialized-server-data["\'][^>]*>(.*?)', + res, + re.DOTALL, + ) + if script_match: + try: + script_content = script_match.group(1).strip() + data = json.loads(script_content) + if data and len(data) > 0 and "data" in data[0] and "configureParams" in data[0]["data"]: + return data[0]["data"]["configureParams"] + except (json.JSONDecodeError, KeyError, IndexError) as e: + print(f"Failed to parse serialized server data: {e}") + + return None diff --git a/ATVP/__pycache__/__init__.cpython-310.pyc b/ATVP/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..d99dee8 Binary files /dev/null and b/ATVP/__pycache__/__init__.cpython-310.pyc differ diff --git a/ATVP/__pycache__/__init__.cpython-312.pyc b/ATVP/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..7010a7a Binary files /dev/null and b/ATVP/__pycache__/__init__.cpython-312.pyc differ diff --git a/ATVP/config.yaml b/ATVP/config.yaml new file mode 100644 index 0000000..cc4bdd9 --- /dev/null +++ b/ATVP/config.yaml @@ -0,0 +1,38 @@ +user_agent: 'ATVE/6.2.0 Android/10 build/6A226 maker/Google model/Chromecast FW/QTS2.200918.0337115981' +storefront_mapping_url: 'https://gist.githubusercontent.com/BrychanOdlum/2208578ba151d1d7c4edeeda15b4e9b1/raw/8f01e4a4cb02cf97a48aba4665286b0e8de14b8e/storefrontmappings.json' + +endpoints: + title: 'https://tv.apple.com/api/uts/v3/{type}/{id}' + tv_episodes: 'https://tv.apple.com/api/uts/v2/view/show/{id}/episodes' + manifest: 'https://tv.apple.com/api/uts/v2/view/product/{id}/personalized' + license: 'https://play.itunes.apple.com/WebObjects/MZPlay.woa/wa/fpsRequest' + environment: 'https://tv.apple.com' + +params: + utsk: '6e3013c6d6fae3c2::::::9318c17fb39d6b9c' + caller: 'web' + v: '46' + pfm: 'appletv' + mfr: 'Apple' + locale: 'en-US' + l: 'en' + ctx_brand: 'tvs.sbd.4000' + count: '100' + skip: '0' + +headers: + Accept: 'application/json' + Accept-Language: 'en-US,en;q=0.9' + Connection: 'keep-alive' + DNT: '1' + Origin: 'https://tv.apple.com' + Referer: 'https://tv.apple.com/' + Sec-Fetch-Dest: 'empty' + Sec-Fetch-Mode: 'cors' + Sec-Fetch-Site: 'same-origin' + +quality_map: + SD: 480 + HD720: 720 + HD: 1080 + UHD: 2160 \ No newline at end of file diff --git a/AUBC/__init__.py b/AUBC/__init__.py new file mode 100644 index 0000000..da73806 --- /dev/null +++ b/AUBC/__init__.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import hashlib +import json +import re +from collections.abc import Generator +from typing import Any, Optional, Union +from urllib.parse import urljoin + +import click +from click import Context +from requests import Request +from unshackle.core.constants import AnyTrack +from unshackle.core.manifests.dash import DASH +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series +from unshackle.core.tracks import Chapter, Chapters, Subtitle, Tracks + + +class AUBC(Service): + """ + \b + Service code for ABC iView streaming service (https://iview.abc.net.au/). + + \b + Version: 1.0.3 + Author: stabbedbybrick + Authorization: None + Robustness: + L3: 1080p, AAC2.0 + + \b + Tips: + - Input should be complete URL: + SHOW: https://iview.abc.net.au/show/return-to-paradise + EPISODE: https://iview.abc.net.au/video/DR2314H001S00 + MOVIE: https://iview.abc.net.au/show/way-back / https://iview.abc.net.au/show/way-back/video/ZW3981A001S00 + + """ + + GEOFENCE = ("au",) + ALIASES = ("iview", "abciview", "iv",) + + @staticmethod + @click.command(name="AUBC", short_help="https://iview.abc.net.au/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> AUBC: + return AUBC(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + self.session.headers.update(self.config["headers"]) + + def search(self) -> Generator[SearchResult, None, None]: + url = ( + "https://y63q32nvdl-1.algolianet.com/1/indexes/*/queries?x-algolia-agent=Algolia" + "%20for%20JavaScript%20(4.9.1)%3B%20Browser%20(lite)%3B%20react%20(17.0.2)%3B%20" + "react-instantsearch%20(6.30.2)%3B%20JS%20Helper%20(3.10.0)&x-" + "algolia-api-key=bcdf11ba901b780dc3c0a3ca677fbefc&x-algolia-application-id=Y63Q32NVDL" + ) + payload = { + "requests": [ + { + "indexName": "ABC_production_iview_web", + "params": f"query={self.title}&tagFilters=&userToken=anonymous-74be3cf1-1dc7-4fa1-9cff-19592162db1c", + } + ], + } + + results = self._request("POST", url, payload=payload)["results"] + hits = [x for x in results[0]["hits"] if x["docType"] == "Program"] + + for result in hits: + yield SearchResult( + id_="https://iview.abc.net.au/show/{}".format(result.get("slug")), + title=result.get("title"), + description=result.get("synopsis"), + label=result.get("subType"), + url="https://iview.abc.net.au/show/{}".format(result.get("slug")), + ) + + def get_titles(self) -> Union[Movies, Series]: + title_re = r"^(?:https?://(?:www.)?iview.abc.net.au/(?Pshow|video)/)?(?P[a-zA-Z0-9_-]+)" + try: + kind, title_id = (re.match(title_re, self.title).group(i) for i in ("type", "id")) + except Exception: + raise ValueError("- Could not parse ID from title") + + if kind == "show": + data = self._request("GET", "/v3/show/{}".format(title_id)) + label = data.get("type") + + if label.lower() in ("series", "program"): + episodes = self._series(title_id) + return Series(episodes) + + elif label.lower() in ("feature", "movie"): + movie = self._movie(data) + return Movies(movie) + + elif kind == "video": + episode = self._episode(title_id) + return Series([episode]) + + def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: + video = self._request("GET", "/v3/video/{}".format(title.id)) + if not video.get("playable"): + raise ConnectionError(video.get("unavailableMessage")) + + playlist = video.get("_embedded", {}).get("playlist", {}) + if not playlist: + raise ConnectionError("Could not find a playlist for this title") + + streams = next(x["streams"]["mpegdash"] for x in playlist if x["type"] == "program") + captions = next((x.get("captions") for x in playlist if x["type"] == "program"), None) + title.data["protected"] = streams.get("protected", False) + + if "720" in streams: + streams["1080"] = streams["720"].replace("720", "1080") + + manifest = next( + (url for key in ["1080", "720", "sd", "sd-low"] if key in streams + for url in [streams[key]] + if self.session.head(url).status_code == 200), + None + ) + if not manifest: + raise ValueError("Could not find a manifest for this title") + + tracks = DASH.from_url(manifest, self.session).to_tracks(title.language) + + for track in tracks.audio: + role = track.data["dash"]["adaptation_set"].find("Role") + if role is not None and role.get("value") in ["description", "alternative", "alternate"]: + track.descriptive = True + + if captions: + subtitles = captions.get("src-vtt") + tracks.add( + Subtitle( + id_=hashlib.md5(subtitles.encode()).hexdigest()[0:6], + url=subtitles, + codec=Subtitle.Codec.from_mime(subtitles[-3:]), + language=title.language, + forced=False, + ) + ) + + return tracks + + def get_chapters(self, title: Union[Movie, Episode]) -> Chapters: + if not title.data.get("cuePoints"): + return Chapters() + + credits = next((x.get("start") for x in title.data["cuePoints"] if x["type"] == "end-credits"), None) + if credits: + return Chapters([Chapter(name="Credits", timestamp=credits * 1000)]) + + return Chapters() + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack) -> Optional[Union[bytes, str]]: + if not title.data.get("protected"): + return None + + customdata = self._license(title.id) + headers = {"customdata": customdata} + + r = self.session.post(self.config["endpoints"]["license"], headers=headers, data=challenge) + r.raise_for_status() + return r.content + + # Service specific + + def _series(self, title: str) -> Episode: + data = self._request("GET", "/v3/series/{}".format(title)) + + seasons = data if isinstance(data, list) else [data] + + episodes = [ + self.create_episode(episode) + for season in seasons + for episode in season.get("_embedded", {}).get("videoEpisodes", {}).get("items", []) + ] + + return Series(episodes) + + def _movie(self, data: dict) -> Movie: + return [ + Movie( + id_=data["_embedded"]["highlightVideo"]["id"], + service=self.__class__, + name=data.get("title"), + year=data.get("productionYear"), + data=data, + language=data.get("analytics", {}).get("dataLayer", {}).get("d_language", "en"), + ) + ] + + def _episode(self, video_id: str) -> Episode: + data = self._request("GET", "/v3/video/{}".format(video_id)) + return self.create_episode(data) + + def _license(self, video_id: str): + token = self._request("POST", "/v3/token/jwt", data={"clientId": self.config["client"]})["token"] + response = self._request("GET", "/v3/token/drm/{}".format(video_id), headers={"bearer": token}) + + return response["license"] + + def create_episode(self, episode: dict) -> Episode: + title = episode["showTitle"] + episode_id = episode.get("id", "") + series_id = episode.get("analytics", {}).get("dataLayer", {}).get("d_series_id", "") + episode_name = episode.get("analytics", {}).get("dataLayer", {}).get("d_episode_name", "") + episode_number = re.search(r"Episode (\d+)", episode.get("displaySubtitle", "")) + name = re.search(r"S\d+\sEpisode\s\d+\s(.*)", episode_name) + language = episode.get("analytics", {}).get("dataLayer", {}).get("d_language", "en") + + season = int(series_id.split("-")[-1]) if series_id else 0 + number = int(episode_number.group(1)) if episode_number else 0 + + if not number: + if match := re.search(r"[A-Z](\d{3})(?=S\d{2})", episode_id): + number = int(match.group(1)) + + return Episode( + id_=episode["id"], + service=self.__class__, + title=title, + season=season, + number=number, + name=name.group(1) if name else episode_name, + data=episode, + language=language, + ) + + def _request(self, method: str, api: str, **kwargs: Any) -> Any[dict | str]: + url = urljoin(self.config["endpoints"]["base_url"], api) + + prep = self.session.prepare_request(Request(method, url, **kwargs)) + + response = self.session.send(prep) + if response.status_code != 200: + raise ConnectionError(f"{response.text}") + + try: + return json.loads(response.content) + + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse JSON: {response.text}") from e + diff --git a/AUBC/config.yaml b/AUBC/config.yaml new file mode 100644 index 0000000..604b739 --- /dev/null +++ b/AUBC/config.yaml @@ -0,0 +1,9 @@ +headers: + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0 + accept-language: en-US,en;q=0.8 + +endpoints: + base_url: https://api.iview.abc.net.au + license: https://wv-keyos.licensekeyserver.com/ + +client: "1d4b5cba-42d2-403e-80e7-34565cdf772d" \ No newline at end of file diff --git a/CBC/__init__.py b/CBC/__init__.py new file mode 100644 index 0000000..c3ab426 --- /dev/null +++ b/CBC/__init__.py @@ -0,0 +1,319 @@ +from __future__ import annotations + +import json +import re +import sys +from collections.abc import Generator +from http.cookiejar import CookieJar +from typing import Any, Optional, Union +from urllib.parse import urljoin + +import click +from click import Context +from requests import Request +from unshackle.core.constants import AnyTrack +from unshackle.core.credential import Credential +from unshackle.core.manifests import DASH, HLS +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Movie, Movies, Series +from unshackle.core.tracks import Chapter, Chapters, Tracks + + +class CBC(Service): + """ + \b + Service code for CBC Gem streaming service (https://gem.cbc.ca/). + + \b + Version: 1.0.1 + Author: stabbedbybrick + Authorization: Credentials + Robustness: + AES-128: 1080p, DDP5.1 + Widevine L3: 720p, DDP5.1 + + \b + Tips: + - Input can be complete title URL or just the slug: + SHOW: https://gem.cbc.ca/murdoch-mysteries OR murdoch-mysteries + MOVIE: https://gem.cbc.ca/the-babadook OR the-babadook + + \b + Notes: + - DRM encrypted titles max out at 720p. + - CCExtrator v0.94 will likely fail to extract subtitles. It's recommended to downgrade to v0.93. + - Some audio tracks contain invalid data, causing warning messages from mkvmerge during muxing + These can be ignored. + + """ + + GEOFENCE = ("ca",) + ALIASES = ("gem", "cbcgem",) + + @staticmethod + @click.command(name="CBC", short_help="https://gem.cbc.ca/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> CBC: + return CBC(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title: str = title + super().__init__(ctx) + + self.base_url: str = self.config["endpoints"]["base_url"] + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "device": "web", + "pageNumber": "1", + "pageSize": "20", + "term": self.title, + } + response: dict = self._request("GET", "/ott/catalog/v1/gem/search", params=params) + + for result in response.get("result", []): + yield SearchResult( + id_="https://gem.cbc.ca/{}".format(result.get("url")), + title=result.get("title"), + description=result.get("synopsis"), + label=result.get("type"), + url="https://gem.cbc.ca/{}".format(result.get("url")), + ) + + def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + if not credential: + raise EnvironmentError("Service requires Credentials for Authentication.") + + tokens: Optional[Any] = self.cache.get(f"tokens_{credential.sha1}") + + """ + All grant types for future reference: + PASSWORD("password"), + ACCESS_TOKEN("access_token"), + REFRESH_TOKEN("refresh_token"), + CLIENT_CREDENTIALS("client_credentials"), + AUTHORIZATION_CODE("authorization_code"), + CODE("code"); + """ + + if tokens and not tokens.expired: + # cached + self.log.info(" + Using cached tokens") + auth_token: str = tokens.data["access_token"] + + elif tokens and tokens.expired: + # expired, refresh + self.log.info("Refreshing cached tokens...") + auth_url, scopes = self.settings() + params = { + "client_id": self.config["client"]["id"], + "grant_type": "refresh_token", + "refresh_token": tokens.data["refresh_token"], + "scope": scopes, + } + + access: dict = self._request("POST", auth_url, params=params) + + # Shorten expiration by one hour to account for clock skew + tokens.set(access, expiration=int(access["expires_in"]) - 3600) + auth_token: str = access["access_token"] + + else: + # new + self.log.info("Requesting new tokens...") + auth_url, scopes = self.settings() + params = { + "client_id": self.config["client"]["id"], + "grant_type": "password", + "username": credential.username, + "password": credential.password, + "scope": scopes, + } + + access: dict = self._request("POST", auth_url, params=params) + + # Shorten expiration by one hour to account for clock skew + tokens.set(access, expiration=int(access["expires_in"]) - 3600) + auth_token: str = access["access_token"] + + claims_token: str = self.claims_token(auth_token) + self.session.headers.update({"x-claims-token": claims_token}) + + def get_titles(self) -> Union[Movies, Series]: + title_re: str = r"^(?:https?://(?:www.)?gem.cbc.ca/)?(?P[a-zA-Z0-9_-]+)" + try: + title_id: str = re.match(title_re, self.title).group("id") + except Exception: + raise ValueError("- Could not parse ID from title") + + params = {"device": "web"} + data: dict = self._request("GET", "/ott/catalog/v2/gem/show/{}".format(title_id), params=params) + label: str = data.get("contentType", "").lower() + + if label in ("film", "movie", "standalone"): + movies: list[Movie] = self._movie(data) + return Movies(movies) + + else: + episodes: list[Episode] = self._show(data) + return Series(episodes) + + def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: + index: dict = self._request( + "GET", "/media/meta/v1/index.ashx", params={"appCode": "gem", "idMedia": title.id, "output": "jsonObject"} + ) + + title.data["extra"] = { + "chapters": index["Metas"].get("Chapitres"), + "credits": index["Metas"].get("CreditStartTime"), + } + + self.drm: bool = index["Metas"].get("isDrmActive") == "true" + if self.drm: + tech: str = next(tech["name"] for tech in index["availableTechs"] if "widevine" in tech["drm"]) + else: + tech: str = next(tech["name"] for tech in index["availableTechs"] if not tech["drm"]) + + response: dict = self._request( + "GET", self.config["endpoints"]["validation"].format("android", title.id, "smart-tv", tech) + ) + + manifest = response.get("url") + self.license = next((x["value"] for x in response["params"] if "widevineLicenseUrl" in x["name"]), None) + self.token = next((x["value"] for x in response["params"] if "widevineAuthToken" in x["name"]), None) + + stream_type: Union[HLS, DASH] = HLS if tech == "hls" else DASH + tracks: Tracks = stream_type.from_url(manifest, self.session).to_tracks(language=title.language) + + if stream_type == DASH: + for track in tracks.audio: + label = track.data["dash"]["adaptation_set"].find("Label") + if label is not None and "descriptive" in label.text.lower(): + track.descriptive = True + + for track in tracks: + track.language = title.language + + return tracks + + def get_chapters(self, title: Union[Movie, Episode]) -> Chapters: + extra: dict = title.data["extra"] + + chapters = [] + if extra.get("chapters"): + chapters = [Chapter(timestamp=x) for x in set(extra["chapters"].split(","))] + + if extra.get("credits"): + chapters.append(Chapter(name="Credits", timestamp=float(extra["credits"]))) + + return Chapters(chapters) + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license( + self, *, challenge: bytes, title: Union[Movies, Series], track: AnyTrack + ) -> Optional[Union[bytes, str]]: + if not self.license or not self.token: + return None + + headers = {"x-dt-auth-token": self.token} + r = self.session.post(self.license, headers=headers, data=challenge) + r.raise_for_status() + return r.content + + # Service specific + + def _show(self, data: dict) -> list[Episode]: + lineups: list = next((x["lineups"] for x in data["content"] if x.get("title", "").lower() == "episodes"), None) + if not lineups: + self.log.warning("No episodes found for: {}".format(data.get("title"))) + return + + titles = [] + for season in lineups: + for episode in season["items"]: + if episode.get("mediaType", "").lower() == "episode": + parts = episode.get("title", "").split(".", 1) + episode_name = parts[1].strip() if len(parts) > 1 else parts[0].strip() + titles.append( + Episode( + id_=episode["idMedia"], + service=self.__class__, + title=data.get("title"), + season=int(season.get("seasonNumber", 0)), + number=int(episode.get("episodeNumber", 0)), + name=episode_name, + year=episode.get("metadata", {}).get("productionYear"), + language=data["structuredMetadata"].get("inLanguage", "en-CA"), + data=episode, + ) + ) + + return titles + + def _movie(self, data: dict) -> list[Movie]: + unwanted: tuple = ("episodes", "trailers", "extras") + lineups: list = next((x["lineups"] for x in data["content"] if x.get("title", "").lower() not in unwanted), None) + if not lineups: + self.log.warning("No movies found for: {}".format(data.get("title"))) + return + + titles = [] + for season in lineups: + for movie in season["items"]: + if movie.get("mediaType", "").lower() == "episode": + parts = movie.get("title", "").split(".", 1) + movie_name = parts[1].strip() if len(parts) > 1 else parts[0].strip() + titles.append( + Movie( + id_=movie.get("idMedia"), + service=self.__class__, + name=movie_name, + year=movie.get("metadata", {}).get("productionYear"), + language=data["structuredMetadata"].get("inLanguage", "en-CA"), + data=movie, + ) + ) + + return titles + + def settings(self) -> tuple: + settings = self._request("GET", "/ott/catalog/v1/gem/settings", params={"device": "web"}) + auth_url: str = settings["identityManagement"]["ropc"]["url"] + scopes: str = settings["identityManagement"]["ropc"]["scopes"] + return auth_url, scopes + + def claims_token(self, token: str) -> str: + headers = { + "Authorization": "Bearer " + token, + } + params = {"device": "web"} + response: dict = self._request( + "GET", "/ott/subscription/v2/gem/Subscriber/profile", headers=headers, params=params + ) + return response["claimsToken"] + + def _request(self, method: str, api: str, **kwargs: Any) -> Any[dict | str]: + url: str = urljoin(self.base_url, api) + + prep: Request = self.session.prepare_request(Request(method, url, **kwargs)) + response = self.session.send(prep) + if response.status_code not in (200, 426): + raise ConnectionError(f"{response.status_code} - {response.text}") + + try: + data = json.loads(response.content) + error_keys = ["errorMessage", "ErrorMessage", "ErrorCode", "errorCode", "error"] + error_message = next((data.get(key) for key in error_keys if key in data), None) + if error_message: + self.log.error(f"\n - Error: {error_message}\n") + sys.exit(1) + + return data + + except json.JSONDecodeError: + raise ConnectionError("Request for {} failed: {}".format(response.url, response.text)) diff --git a/CBC/config.yaml b/CBC/config.yaml new file mode 100644 index 0000000..31bd19a --- /dev/null +++ b/CBC/config.yaml @@ -0,0 +1,7 @@ +endpoints: + base_url: "https://services.radio-canada.ca" + validation: "/media/validation/v2?appCode=gem&&deviceType={}&idMedia={}&manifestType={}&output=json&tech={}" + api_key: "3f4beddd-2061-49b0-ae80-6f1f2ed65b37" + +client: + id: "fc05b0ee-3865-4400-a3cc-3da82c330c23" \ No newline at end of file diff --git a/CBS/__init__.py b/CBS/__init__.py new file mode 100644 index 0000000..a28c287 --- /dev/null +++ b/CBS/__init__.py @@ -0,0 +1,242 @@ +from __future__ import annotations + +import json +import re +import sys +from collections.abc import Generator +from typing import Any, Optional, Union +from urllib.parse import urljoin + +import click +from requests import Request +from unshackle.core.constants import AnyTrack +from unshackle.core.manifests import DASH +from unshackle.core.search_result import SearchResult +from unshackle.core.service import Service +from unshackle.core.titles import Episode, Series, Title_T, Titles_T +from unshackle.core.tracks import Chapter, Chapters, Tracks +from unshackle.core.utils.sslciphers import SSLCiphers +from unshackle.core.utils.xml import load_xml + + +class CBS(Service): + """ + \b + Service code for CBS.com streaming service (https://cbs.com). + Credit to @srpen6 for the tip on anonymous session + + \b + Version: 1.0.1 + Author: stabbedbybrick + Authorization: None + Robustness: + Widevine: + L3: 2160p, DDP5.1 + + \b + Tips: + - Input should be complete URLs: + SERIES: https://www.cbs.com/shows/tracker/ + EPISODE: https://www.cbs.com/shows/video/E0wG_ovVMkLlHOzv7KDpUV9bjeKFFG2v/ + + \b + Common VPN/proxy errors: + - SSLError(SSLEOFError(8, '[SSL: UNEXPECTED_EOF_WHILE_READING]')) + - ConnectionError: 406 Not Acceptable, 403 Forbidden + + """ + + GEOFENCE = ("us",) + + @staticmethod + @click.command(name="CBS", short_help="https://cbs.com", help=__doc__) + @click.argument("title", type=str, required=False) + @click.pass_context + def cli(ctx, **kwargs) -> CBS: + return CBS(ctx, **kwargs) + + def __init__(self, ctx, title): + self.title = title + super().__init__(ctx) + + def search(self) -> Generator[SearchResult, None, None]: + params = { + "term": self.title, + "termCount": 50, + "showCanVids": "true", + } + results = self._request("GET", "/apps-api/v3.1/androidphone/contentsearch/search.json", params=params)["terms"] + + for result in results: + yield SearchResult( + id_=result.get("path"), + title=result.get("title"), + description=None, + label=result.get("term_type"), + url=result.get("path"), + ) + + def get_titles(self) -> Titles_T: + title_re = r"https://www\.cbs\.com/shows/(?P