diff --git a/Services/amazon-ISM.py b/Services/amazon-ISM.py new file mode 100644 index 0000000..0cb2bcb --- /dev/null +++ b/Services/amazon-ISM.py @@ -0,0 +1,1147 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import os +import re +import time +from collections import defaultdict +from pathlib import Path +from typing import Optional +from urllib.parse import urlencode + +import click +import jsonpickle +import requests +from langcodes import Language +from tldextract import tldextract +from click.core import ParameterSource + +from vinetrimmer.objects import TextTrack, Title, Tracks +from vinetrimmer.objects.tracks import AudioTrack, MenuTrack, Track +from vinetrimmer.services.BaseService import BaseService +from vinetrimmer.utils.Logger import Logger +from vinetrimmer.utils.widevine.device import LocalDevice + + +class Amazon(BaseService): + """ + Service code for Amazon VOD (https://amazon.com) and Amazon Prime Video (https://primevideo.com). + + \b + Authorization: Cookies + Security: UHD@L1 FHD@L3(ChromeCDM) SD@L3, Maintains their own license server like Netflix, be cautious. + + \b + Region is chosen automatically based on domain extension found in cookies. + Prime Video specific code will be run if the ASIN is detected to be a prime video variant. + Use 'Amazon Video ASIN Display' for Tampermonkey addon for ASIN + https://greasyfork.org/en/scripts/381997-amazon-video-asin-display + + vt dl --list -z uk -q 1080 Amazon B09SLGYLK8 + """ + + ALIASES = ["AMZN", "amazon"] + TITLE_RE = r"^(?:https?://(?:www\.)?(?Pamazon\.(?Pcom|co\.uk|de|co\.jp)|primevideo\.com)(?:/.+)?/)?(?P[A-Z0-9]{10,}|amzn1\.dv\.gti\.[a-f0-9-]+)" # noqa: E501 + + REGION_TLD_MAP = { + "au": "com.au", + "br": "com.br", + "jp": "co.jp", + "mx": "com.mx", + "tr": "com.tr", + "gb": "co.uk", + "us": "com", + } + VIDEO_RANGE_MAP = { + "SDR": "None", + "HDR10": "Hdr10", + "DV": "DolbyVision", + } + + @staticmethod + @click.command(name="Amazon", short_help="https://amazon.com, https://primevideo.com", help=__doc__) + @click.argument("title", type=str, required=False) + @click.option("-b", "--bitrate", default="CBR", + type=click.Choice(["CVBR", "CBR", "CVBR+CBR"], case_sensitive=False), + help="Video Bitrate Mode to download in. CVBR=Constrained Variable Bitrate, CBR=Constant Bitrate.") + @click.option("-c", "--cdn", default=None, type=str, + help="CDN to download from, defaults to the CDN with the highest weight set by Amazon.") + # UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content + @click.option("-vq", "--vquality", default="HD", + type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False), + help="Manifest quality to request.") + @click.option("-s", "--single", is_flag=True, default=False, + help="Force single episode/season instead of getting series ASIN.") + @click.option("-am", "--amanifest", default="H265", + type=click.Choice(["CVBR", "CBR", "H265"], case_sensitive=False), + help="Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio.") + @click.option("-aq", "--aquality", default="HD", + type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False), + help="Manifest quality to request for audio. Defaults to the same as --quality.") + @click.option("-mt", "--manifest-type", default="DASH", + type=click.Choice(["DASH", "ISM"], case_sensitive=False), help="Select Manifest Type") + @click.pass_context + def cli(ctx, **kwargs): + return Amazon(ctx, **kwargs) + + def __init__(self, ctx, title, bitrate: str, cdn: str, vquality: str, single: bool, + amanifest: str, aquality: str, manifest_type: str): + m = self.parse_title(ctx, title) + self.bitrate = bitrate + self.bitrate_source = ctx.get_parameter_source("bitrate") + self.cdn = cdn + self.vquality = vquality + self.vquality_source = ctx.get_parameter_source("vquality") + self.single = single + self.amanifest = amanifest + self.aquality = aquality + + # Streaming technology + self.manifest_type = manifest_type + if self.manifest_type == 'ISM': + self.manifest_type = "SmoothStreaming" + + super().__init__(ctx) + + assert ctx.parent is not None + + self.vcodec = ctx.parent.params["vcodec"] or "H264" + self.range = ctx.parent.params["range_"] or "SDR" + self.chapters_only = ctx.parent.params["chapters_only"] + self.atmos = ctx.parent.params["atmos"] + self.quality = ctx.parent.params.get("quality") or 1080 + + self.cdm = ctx.obj.cdm + self.profile = ctx.obj.profile + + self.region: dict[str, str] = {} + self.endpoints: dict[str, str] = {} + self.device: dict[str, str] = {} + + self.pv = False + self.device_token = None + self.device_id: None + self.customer_id = None + self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd" # browser client id + + if self.vquality_source != ParameterSource.COMMANDLINE: + if 0 < self.quality <= 576 and self.range == "SDR": + self.log.info(" + Setting manifest quality to SD") + self.vquality = "SD" + + if self.quality > 1080: + self.log.info(" + Setting manifest quality to UHD to be able to get 2160p video track") + self.vquality = "UHD" + + self.vquality = self.vquality or "HD" + + # if self.bitrate_source != ParameterSource.COMMANDLINE: + # if self.vcodec == "H265" and self.range == "SDR" and self.bitrate != "CVBR+CBR": + # self.bitrate = "CVBR+CBR" + # self.log.info(" + Changed bitrate mode to CVBR+CBR to be able to get H.265 SDR video track") + + # if self.vquality == "UHD" and self.range != "SDR" and self.bitrate != "CBR": + # self.bitrate = "CBR" + # self.log.info(f" + Changed bitrate mode to CBR to be able to get highest quality UHD {self.range} video track") + + self.orig_bitrate = self.bitrate + + if self.range == "DV" and self.manifest_type == "SmoothStreaming": + self.log.exit("ISM DolbyVision is not supported, Exiting...") + + self.log.info(f"Bitrate Mode: {self.bitrate}") + self.log.info(f"Video Quality: {self.vquality}") + self.log.info(f"Audio Quality: {self.aquality}") + + self.configure() + + # Abstracted functions + + def get_titles(self): + res = self.session.get( + url=self.endpoints["details"], + params={ + "titleID": self.title, + "isElcano": "1", + "sections": ["Atf", "Btf"] + }, + headers={ + "Accept": "application/json" + } + ) + + if not res.ok: + raise self.log.exit(f"Unable to get title: {res.text} [{res.status_code}]") + + data = res.json()["widgets"] + product_details = data.get("productDetails", {}).get("detail") + + if not product_details: + error = res.json()["degradations"][0] + raise self.log.exit(f"Unable to get title: {error['message']} [{error['code']}]") + + titles = [] + + if data["pageContext"]["subPageType"] == "Movie": + card = data["productDetails"]["detail"] + titles.append(Title( + id_=card["catalogId"], + type_=Title.Types.MOVIE, + name=product_details["title"], + #year=card["releaseYear"], + year=card.get("releaseYear", ""), + # language is obtained afterward + original_lang=None, + source=self.ALIASES[0], + service_data=card + )) + else: + cards = [ + x["detail"] + for x in data["titleContent"][0]["cards"] + if not self.single or + (self.single and self.title in data["self"]["asins"]) or + (self.single and self.title in x["self"]["asins"]) + ] + for card in cards: + episode_number = card.get("episodeNumber", 0) + if episode_number != 0: + titles.append(Title( + id_=card["catalogId"], + type_=Title.Types.TV, + name=product_details["parentTitle"], + season=product_details["seasonNumber"], + episode=episode_number, + episode_name=card["title"], + # language is obtained afterward + original_lang=None, + source=self.ALIASES[0], + service_data=card + )) + + if not self.single: + temp_title = self.title + temp_single = self.single + + self.single = True + for season in data["seasonSelector"]: + try: + if data["self"]["asins"][0] in season["self"]["asins"]: + continue + self.title = season["self"]["asins"][0] + except: + if data["self"]["asins"][0] in season["titleID"]: + continue + self.title = season["titleID"] + for title in self.get_titles(): + titles.append(title) + + self.title = temp_title + self.single = temp_single + + if titles: + # TODO: Needs playback permission on first title, title needs to be available + original_lang = self.get_original_language(self.get_manifest( + next((x for x in titles if x.type == Title.Types.MOVIE or x.episode > 0), titles[0]), + video_codec=self.vcodec, + bitrate_mode=self.bitrate, + quality=self.vquality, + ignore_errors=True, + manifest_type=self.manifest_type + )) + if original_lang: + for title in titles: + title.original_lang = Language.get(original_lang) + else: + #self.log.warning(" - Unable to obtain the title's original language, setting 'en' default...") + for title in titles: + title.original_lang = Language.get("en") + + return titles + + def get_tracks(self, title: Title) -> Tracks: + tracks = Tracks() + if self.chapters_only: + return [] + + # manifest, chosen_manifest, tracks = self.get_best_quality(title) + + manifest = self.get_manifest( + title, + video_codec=self.vcodec, + bitrate_mode=self.bitrate, + quality=self.vquality, + hdr=self.range, + ignore_errors=False, + manifest_type=self.manifest_type + ) + + # Move rightsException termination here so that script can attempt continuing + if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: + self.log.error(" - The profile used does not have the rights to this title.") + return + + self.customer_id = manifest["returnedTitleRendition"]["selectedEntitlement"]["grantedByCustomerId"] + + default_url_set = manifest["playbackUrls"]["urlSets"][manifest["playbackUrls"]["defaultUrlSetId"]] + encoding_version = default_url_set["urls"]["manifest"]["encodingVersion"] + self.log.info(f" + Detected encodingVersion={encoding_version}") + + chosen_manifest = self.choose_manifest(manifest, self.cdn) + + if not chosen_manifest: + raise self.log.exit("No manifests available") + + manifest_url = self.clean_mpd_url(chosen_manifest["avUrlInfoList"][0]["url"], False) + self.log.debug(manifest_url) + self.log.info(" + Downloading Manifest") + + if chosen_manifest["streamingTechnology"] == "DASH": + self.log.warning(f"Getting tracks from '{chosen_manifest['streamingTechnology']}' manifest...") + tracks = Tracks([ + x for x in iter(Tracks.from_mpd( + url=manifest_url, + # session=self.session, + source=self.ALIASES[0], + # lang=title.original_lang, + # silent=True + )) + ]) + elif chosen_manifest["streamingTechnology"] == "SmoothStreaming": + self.log.warning(f"Getting tracks from '{chosen_manifest['streamingTechnology']}' manifest...") + tracks = Tracks([ + x for x in iter(Tracks.from_ism( + url=manifest_url, + # session=self.session, + source=self.ALIASES[0], + # lang=title.original_lang, + # silent=True + )) + ]) + else: + raise self.log.exit(f"Unsupported manifest type: {chosen_manifest['streamingTechnology']}") + + need_separate_audio = ((self.aquality or self.vquality) != self.vquality + or self.amanifest == "CVBR" and (self.vcodec, self.bitrate) != ("H264", "CVBR") + or self.amanifest == "CBR" and (self.vcodec, self.bitrate) != ("H264", "CBR") + or self.amanifest == "H265" and self.vcodec != "H265" + or self.amanifest != "H265" and self.vcodec == "H265") + + if not need_separate_audio: + audios = defaultdict(list) + for audio in tracks.audios: + audios[audio.language].append(audio) + + for lang in audios: + if not any((x.bitrate or 0) >= 640000 for x in audios[lang]): + need_separate_audio = True + break + + if need_separate_audio: + manifest_type = self.amanifest or "H265" + self.log.info(f"Getting audio from {manifest_type} manifest for potential higher bitrate or better codec") + audio_manifest = self.get_manifest( + title=title, + video_codec="H265" if manifest_type == "H265" else "H264", + bitrate_mode="CVBR" if manifest_type != "CBR" else "CBR", + quality=self.aquality or self.vquality, + hdr=None, + ignore_errors=True + ) + if not audio_manifest: + self.log.warning(f" - Unable to get {manifest_type} audio manifests, skipping") + elif not (chosen_audio_manifest := self.choose_manifest(audio_manifest, self.cdn)): + self.log.warning(f" - No {manifest_type} audio manifests available, skipping") + else: + audio_mpd_url = self.clean_mpd_url(chosen_audio_manifest["avUrlInfoList"][0]["url"], optimise=False) + self.log.debug(audio_mpd_url) + self.log.info(" + Downloading HEVC manifest") + + try: + audio_mpd = Tracks([ + x for x in iter(Tracks.from_mpd( + url=audio_mpd_url, + # session=self.session, + source=self.ALIASES[0], + #lang=title.original_lang +# silent=True + )) + ]) + except KeyError: + self.log.warning(f" - Title has no {self.amanifest} stream, cannot get higher quality audio") + else: + tracks.add(audio_mpd.audios, warn_only=True) # expecting possible dupes, ignore + + need_uhd_audio = self.atmos + + if not self.amanifest and ((self.aquality == "UHD" and self.vquality != "UHD") or not self.aquality): + audios = defaultdict(list) + for audio in tracks.audios: + audios[audio.language].append(audio) + for lang in audios: + if not any((x.bitrate or 0) >= 640000 for x in audios[lang]): + need_uhd_audio = True + break + + if need_uhd_audio and (self.config.get("device") or {}).get("default", None): + self.log.info("Getting audio from UHD manifest for potential higher bitrate or better codec") + # temp_device = self.device + # temp_device_token = self.device_token + # temp_device_id = self.device_id + + # self.device, self.device_id, self.device_token = self.register_device_fallback("DV") + + uhd_audio_manifest = None + + try: + if self.cdm.device.type == LocalDevice.Types.CHROME and self.quality < 2160: + self.log.info(" + Switching to device to get UHD manifest") + self.register_device() + + uhd_audio_manifest = self.get_manifest( + title=title, + video_codec="H265", + bitrate_mode="CVBR+CBR", + quality="UHD", + hdr="DV", # Needed for 576kbps Atmos sometimes + ignore_errors=True + ) + except: + pass + + # self.device = temp_device + # self.device_token = temp_device_token + # self.device_id = temp_device_id + + if not uhd_audio_manifest: + self.log.warning(" - Unable to get UHD manifests, skipping") + elif not (chosen_uhd_audio_manifest := self.choose_manifest(uhd_audio_manifest, self.cdn)): + self.log.warning(" - No UHD manifests available, skipping") + else: + uhd_audio_mpd_url = self.clean_mpd_url(chosen_uhd_audio_manifest["avUrlInfoList"][0]["url"], optimise=False) + self.log.debug(uhd_audio_mpd_url) + self.log.info(" + Downloading UHD manifest") + + try: + uhd_audio_mpd = Tracks([ + x for x in iter(Tracks.from_mpd( + url=uhd_audio_mpd_url, + # session=self.session, + source=self.ALIASES[0], + # lang=title.original_lang, + # silent=True + )) + ]) + except KeyError: + self.log.warning(" - Title has no UHD stream, cannot get higher quality audio") + else: + # replace the audio tracks with DV manifest version if atmos is present + atmos_audios = [x for x in uhd_audio_mpd.audios if x.atmos] + tracks.add(atmos_audios, warn_only=True) + + for video in tracks.videos: + video.hdr10 = chosen_manifest["hdrFormat"] == "Hdr10" + video.dv = chosen_manifest["hdrFormat"] == "DolbyVision" + + for audio in tracks.audios: + audio.descriptive = audio.extra[1].get("audioTrackSubtype") == "descriptive" + # Amazon @lang is just the lang code, no dialect, @audioTrackId has it. + audio_track_id = audio.extra[1].get("audioTrackId") + if audio_track_id: + audio.language = Language.get(audio_track_id.split("_")[0]) # e.g. es-419_ec3_blabla + + for sub in manifest.get("subtitleUrls", []) + manifest.get("forcedNarratives", []): + tracks.add(TextTrack( + id_=sub.get( + "timedTextTrackId", + f"{sub['languageCode']}_{sub['type']}_{sub['subtype']}_{sub['index']}" + ), + source=self.ALIASES[0], + url=os.path.splitext(sub["url"])[0] + ".ttml2", # DFXP -> SRT forcefully seems to work fine TTML2 FTW + # metadata + codec=sub["format"].lower(), + language=sub["languageCode"], + #is_original_lang=title.original_lang and is_close_match(sub["languageCode"], [title.original_lang]), + forced="forced" in sub["displayName"], + sdh=sub["type"].lower() == "sdh" # TODO: what other sub types? cc? forced? + ), warn_only=True) # expecting possible dupes, ignore + + tracks.audios = self.dedupe_audio(tracks.audios) + + return tracks + + + def get_chapters(self, title: Title) -> list[MenuTrack]: + """Get chapters from Amazon's XRay Scenes API.""" + manifest = self.get_manifest( + title, + video_codec=self.vcodec, + bitrate_mode=self.bitrate, + quality=self.vquality, + hdr=self.range + ) + + if "xrayMetadata" in manifest: + xray_params = manifest["xrayMetadata"]["parameters"] + elif self.chapters_only: + xray_params = { + "pageId": "fullScreen", + "pageType": "xray", + "serviceToken": json.dumps({ + "consumptionType": "Streaming", + "deviceClass": "normal", + "playbackMode": "playback", + "vcid": manifest["returnedTitleRendition"]["contentId"], + }) + } + else: + return [] + + xray_params.update({ + "deviceID": self.device_id, + "deviceTypeID": self.config["device_types"]["browser"], # must be browser device type + "marketplaceID": self.region["marketplace_id"], + "gascEnabled": str(self.pv).lower(), + "decorationScheme": "none", + "version": "inception-v2", + "uxLocale": "en-US", + "featureScheme": "XRAY_WEB_2020_V1" + }) + + xray = self.session.get( + url=self.endpoints["xray"], + params=xray_params + ).json().get("page") + + if not xray: + return [] + + widgets = xray["sections"]["center"]["widgets"]["widgetList"] + + scenes = next((x for x in widgets if x["tabType"] == "scenesTab"), None) + if not scenes: + return [] + scenes = scenes["widgets"]["widgetList"][0]["items"]["itemList"] + + chapters = [] + + for scene in scenes: + chapter_title = scene["textMap"]["PRIMARY"] + match = re.search(r"(\d+\. |)(.+)", chapter_title) + if match: + chapter_title = match.group(2) + chapters.append(MenuTrack( + number=int(scene["id"].replace("/xray/scene/", "")), + title=chapter_title, + timecode=scene["textMap"]["TERTIARY"].replace("Starts at ", "") + )) + + return chapters + + def certificate(self, **_): + return self.config["certificate"] + + def license(self, challenge: bytes, title: Title, track, **_): + manifest_type = "SmoothStreaming" if track.descriptor == Track.Descriptor.ISM else "DASH" + + # device = self.device_license if self.profile == "DV" else self.device + # device_id = self.device_id_license if self.profile == "DV" else self.device_id + # device_token = self.device_token_license if self.profile == "DV" else self.device_token + + lic = self.session.post( + url=self.endpoints["licence"], + params={ + "asin": title.id, + "consumptionType": "Streaming", + "desiredResources": "PlayReadyLicense", + "deviceTypeID": self.device_license["device_type"], + "deviceID": self.device_id_license, + "firmware": 1, + "gascEnabled": str(self.pv).lower(), #Prime Video or not + "marketplaceID": self.region["marketplace_id"], + "resourceUsage": "ImmediateConsumption", + "videoMaterialType": "Feature", + "operatingSystemName": "Windows", + "operatingSystemVersion": "10.0", + "customerID": self.customer_id, + "deviceDrmOverride": "CENC", + "deviceStreamingTechnologyOverride": manifest_type, + "deviceVideoQualityOverride": self.vquality, + "deviceHdrFormatsOverride": self.VIDEO_RANGE_MAP.get(self.range, "None"), + }, + headers={ + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": f"Bearer {self.device_token_license}" + }, + data={ + "playReadyChallenge": base64.b64encode(challenge.encode('utf-8')).decode('utf-8') # expects base64 + } + ).json() + if "errorsByResource" in lic: + print(lic["errorsByResource"]) + error_code = lic["errorsByResource"]["playReadyLicense"] + if "errorCode" in error_code: + error_code = error_code["errorCode"] + elif "type" in error_code: + error_code = error_code["type"] + if error_code == "PRS.NoRights.AnonymizerIP": + raise self.log.exit(" - Amazon detected a Proxy/VPN and refused to return a license!") + message = lic["errorsByResource"]["playReadyLicense"]["message"] + raise self.log.exit(f" - Amazon reported an error during the License request: {message} [{error_code}]") + if "error" in lic: + error_code = lic["error"] + if "errorCode" in error_code: + error_code = error_code["errorCode"] + elif "type" in error_code: + error_code = error_code["type"] + if error_code == "PRS.NoRights.AnonymizerIP": + raise self.log.exit(" - Amazon detected a Proxy/VPN and refused to return a license!") + message = lic["error"]["message"] + raise self.log.exit(f" - Amazon reported an error during the License request: {message} [{error_code}]") + return lic["playReadyLicense"]["encodedLicenseResponse"] + + # Service specific functions + + def configure(self) -> None: + if len(self.title) > 10: + self.pv = True + + self.log.info("Getting Account Region") + self.region = self.get_region() + if not self.region: + raise self.log.exit(" - Failed to get Amazon Account region") + self.GEOFENCE.append(self.region["code"]) + self.log.info(f" + Region: {self.region['code']}") + + # endpoints must be prepared AFTER region data is retrieved + self.endpoints = self.prepare_endpoints(self.config["endpoints"], self.region) + + self.session.headers.update({ + "Origin": f"https://{self.region['base']}" + }) + + self.device = (self.config.get("device") or {}).get(self.profile, {}) + self.device_license = (self.config.get("device") or {}).get("license", {}) + + if (self.quality > 1080 or self.range != "SDR") and self.vcodec == "H265" and self.cdm.device.type == LocalDevice.Types.CHROME: + self.log.info("Using device to get UHD manifests") + self.register_device() + elif not self.device or self.cdm.device.type == LocalDevice.Types.CHROME or self.vquality == "SD": + # falling back to browser-based device ID + if not self.device: + self.log.warning( + "No Device information was provided for %s, using browser device...", + self.profile + ) + self.device_id = hashlib.sha224( + ("CustomerID" + self.session.headers["User-Agent"]).encode("utf-8") + ).hexdigest() + self.device = {"device_type": self.config["device_types"]["browser"]} + else: + self.register_device() + + def register_device(self) -> None: + self.device_license = (self.config.get("device") or {}).get("license", {}) + device_cache_path_license = self.get_cache("device_tokens_{profile}_{hash}.json".format( + profile=self.profile, + hash=hashlib.md5(json.dumps(self.device_license).encode()).hexdigest()[0:6] + )) + self.device_token_license = self.DeviceRegistration( + device=self.device_license, + endpoints=self.endpoints, + log=self.log, + cache_path=device_cache_path_license, + session=self.session + ).bearer + self.device_id_license = self.device_license.get("device_serial") + if not self.device_id_license: + raise self.log.exit(f" - A device serial is required in the config, perhaps use: {os.urandom(8).hex()}") + + self.device = (self.config.get("device") or {}).get(self.profile, {}) + device_cache_path = self.get_cache("device_tokens_{profile}_{hash}.json".format( + profile=self.profile, + hash=hashlib.md5(json.dumps(self.device).encode()).hexdigest()[0:6] + )) + self.device_token = self.DeviceRegistration( + device=self.device, + endpoints=self.endpoints, + log=self.log, + cache_path=device_cache_path, + session=self.session + ).bearer + self.device_id = self.device.get("device_serial") + if not self.device_id: + raise self.log.exit(f" - A device serial is required in the config, perhaps use: {os.urandom(8).hex()}") + + def get_region(self) -> dict: + domain_region = self.get_domain_region() + if not domain_region: + return {} + + region = self.config["regions"].get(domain_region) + if not region: + raise self.log.exit(f" - There's no region configuration data for the region: {domain_region}") + + region["code"] = domain_region + + if self.pv: + res = self.session.get("https://www.primevideo.com").text + match = re.search(r'ue_furl *= *([\'"])fls-(na|eu|fe)\.amazon\.[a-z.]+\1', res) + if match: + pv_region = match.group(2).lower() + else: + raise self.log.exit(" - Failed to get PrimeVideo region") + pv_region = {"na": "atv-ps"}.get(pv_region, f"atv-ps-{pv_region}") + region["base_manifest"] = f"{pv_region}.primevideo.com" + region["base"] = "www.primevideo.com" + + return region + + def get_domain_region(self): + """Get the region of the cookies from the domain.""" + tlds = [tldextract.extract(x.domain) for x in self.cookies if x.domain_specified] + tld = next((x.suffix for x in tlds if x.domain.lower() in ("amazon", "primevideo")), None) + if tld: + tld = tld.split(".")[-1] + return {"com": "us", "uk": "gb"}.get(tld, tld) + + def prepare_endpoint(self, name: str, uri: str, region: dict) -> str: + if name in ("browse", "playback", "licence", "xray"): + return f"https://{(region['base_manifest'])}{uri}" + if name in ("ontv", "devicelink", "details"): + if self.pv: + host = "www.primevideo.com" + else: + host = region["base"] + return f"https://{host}{uri}" + if name in ("codepair", "register", "token"): + return f"https://{self.config['regions']['us']['base_api']}{uri}" + raise ValueError(f"Unknown endpoint: {name}") + + def prepare_endpoints(self, endpoints: dict, region: dict) -> dict: + return {k: self.prepare_endpoint(k, v, region) for k, v in endpoints.items()} + + def choose_manifest(self, manifest: dict, cdn=None): + """Get manifest URL for the title based on CDN weight (or specified CDN).""" + if cdn: + cdn = cdn.lower() + manifest = next((x for x in manifest["audioVideoUrls"]["avCdnUrlSets"] if x["cdn"].lower() == cdn), {}) + if not manifest: + raise self.log.exit(f" - There isn't any DASH manifests available on the CDN \"{cdn}\" for this title") + else: + manifest = next((x for x in sorted([x for x in manifest["audioVideoUrls"]["avCdnUrlSets"]], key=lambda x: int(x["cdnWeightsRank"]))), {}) + + return manifest + + def get_manifest( + self, title: Title, video_codec: str, bitrate_mode: str, quality: str, hdr=None, + ignore_errors: bool = False, manifest_type: str = "DASH" + ) -> dict: + res = self.session.get( + url=self.endpoints["playback"], + params={ + "asin": title.id, + "consumptionType": "Streaming", + "desiredResources": ",".join([ + "PlaybackUrls", + "AudioVideoUrls", + "CatalogMetadata", + "ForcedNarratives", + "SubtitlePresets", + "SubtitleUrls", + "TransitionTimecodes", + "TrickplayUrls", + "CuepointPlaylist", + "XRayMetadata", + "PlaybackSettings", + ]), + "deviceID": self.device_id, + "deviceTypeID": self.device["device_type"], + "firmware": 1, + "gascEnabled": str(self.pv).lower(), + "marketplaceID": self.region["marketplace_id"], + "resourceUsage": "CacheResources", + "videoMaterialType": "Feature", + "playerType": "html5", + "clientId": self.client_id, + **({ + "operatingSystemName": "Linux" if quality == "SD" else "Windows", + "operatingSystemVersion": "unknown" if quality == "SD" else "10.0", + } if not self.device_token else {}), + "deviceDrmOverride": "CENC", + "deviceStreamingTechnologyOverride": manifest_type, + "deviceProtocolOverride": "Https", + "deviceVideoCodecOverride": video_codec, + "deviceBitrateAdaptationsOverride": bitrate_mode.replace("+", ","), + "deviceVideoQualityOverride": quality, + "deviceHdrFormatsOverride": self.VIDEO_RANGE_MAP.get(hdr, "None"), + "supportedDRMKeyScheme": "DUAL_KEY", # ? + "liveManifestType": "live,accumulating", # ? + "titleDecorationScheme": "primary-content", + "subtitleFormat": "TTMLv2", + "languageFeature": "MLFv2", # ? + "uxLocale": "en_US", + "xrayDeviceClass": "normal", + "xrayPlaybackMode": "playback", + "xrayToken": "XRAY_WEB_2020_V1", + "playbackSettingsFormatVersion": "1.0.0", + "playerAttributes": json.dumps({"frameRate": "HFR"}), + # possibly old/unused/does nothing: + "audioTrackId": "all", + }, + headers={ + "Authorization": f"Bearer {self.device_token}" if self.device_token else None, + }, + ) + try: + manifest = res.json() + except json.JSONDecodeError: + if ignore_errors: + return {} + + raise self.log.exit(" - Amazon didn't return JSON data when obtaining the Playback Manifest.") + + if "error" in manifest: + if ignore_errors: + return {} + raise self.log.exit(" - Amazon reported an error when obtaining the Playback Manifest.") + + # Commented out as we move the rights exception check elsewhere + # if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: + # if ignore_errors: + # return {} + # raise self.log.exit(" - The profile used does not have the rights to this title.") + + # Below checks ignore NoRights errors + + if ( + manifest.get("errorsByResource", {}).get("PlaybackUrls") and + manifest["errorsByResource"]["PlaybackUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["PlaybackUrls"] + raise self.log.exit(f" - Amazon had an error with the Playback Urls: {error['message']} [{error['errorCode']}]") + + if ( + manifest.get("errorsByResource", {}).get("AudioVideoUrls") and + manifest["errorsByResource"]["AudioVideoUrls"].get("errorCode") != "PRS.NoRights.NotOwned" + ): + if ignore_errors: + return {} + error = manifest["errorsByResource"]["AudioVideoUrls"] + raise self.log.exit(f" - Amazon had an error with the A/V Urls: {error['message']} [{error['errorCode']}]") + + return manifest + + @staticmethod + def get_original_language(manifest): + """Get a title's original language from manifest data.""" + try: + return next( + x["language"].replace("_", "-") + for x in manifest["catalogMetadata"]["playback"]["audioTracks"] + if x["isOriginalLanguage"] + ) + except (KeyError, StopIteration): + pass + + if "defaultAudioTrackId" in manifest.get("playbackUrls", {}): + try: + return manifest["playbackUrls"]["defaultAudioTrackId"].split("_")[0] + except IndexError: + pass + + try: + return sorted( + manifest["audioVideoUrls"]["audioTrackMetadata"], + key=lambda x: x["index"] + )[0]["languageCode"] + except (KeyError, IndexError): + pass + + return None + + @staticmethod + def clean_mpd_url(mpd_url, optimise=True): + print(f"MPD URL: {mpd_url}, optimise: {optimise}") + """Clean up an Amazon MPD manifest url.""" + if 'akamaihd.net' in mpd_url: + match = re.search(r'[^/]*\$[^/]*/', mpd_url) + if match: + dollar_sign_part = match.group(0) + mpd_url = mpd_url.replace(dollar_sign_part, '', 1) + return mpd_url + + if optimise: + return mpd_url.replace("~", "") + "?encoding=segmentBase" + else: + if match := re.match(r"(https?://.*/)d.?/.*~/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + elif match := re.match(r"(https?://.*/)d.?/.*\$.*?/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + elif match := re.match(r"(https?://.*/).*\$.*?/(.*)", mpd_url): + print(f"returned: {''.join(match.groups())}") + return "".join(match.groups()) + raise ValueError("Unable to parse MPD URL") + +# def get_best_quality(self, title): +# """ +# Choose the best quality manifest from CBR / CVBR +# """ + +# track_list = [] +# bitrates = [self.orig_bitrate] + +# if self.vcodec != "H265": +# bitrates = self.orig_bitrate.split('+') + +# for bitrate in bitrates: +# manifest = self.get_manifest( +# title, +# video_codec=self.vcodec, +# bitrate_mode=bitrate, +# quality=self.vquality, +# hdr=self.range, +# ignore_errors=False +# ) + +# if not manifest: +# self.log.warning(f"Skipping {bitrate} manifest due to error") +# continue + +# # return three empty objects if a rightsException error exists to correlate to manifest, chosen_manifest, tracks +# if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]: +# return None, None, None + +# self.customer_id = manifest["returnedTitleRendition"]["selectedEntitlement"]["grantedByCustomerId"] + +# default_url_set = manifest["playbackUrls"]["urlSets"][manifest["playbackUrls"]["defaultUrlSetId"]] +# encoding_version = default_url_set["urls"]["manifest"]["encodingVersion"] +# self.log.info(f" + Detected encodingVersion={encoding_version}") + +# chosen_manifest = self.choose_manifest(manifest, self.cdn) + +# if not chosen_manifest: +# self.log.warning(f"No {bitrate} DASH manifests available") +# continue + +# mpd_url = self.clean_mpd_url(chosen_manifest["avUrlInfoList"][0]["url"], optimise=False) +# self.log.debug(mpd_url) +# self.log.info(f" + Downloading {bitrate} MPD") + +# tracks = Tracks([ +# x for x in iter(Tracks.from_mpd( +# url=mpd_url, +# # session=self.session, +# source=self.ALIASES[0], +# #lang=title.original_lang, +# # ignore_errors=True +# ) if ".ism" not in mpd_url else Tracks.from_ism( +# url=mpd_url, +# # session=self.session, +# source=self.ALIASES[0], +# #lang=title.original_lang, +# # ignore_errors=True +# )) +# ]) + +# for video in tracks.videos: +# video.note = bitrate + +# max_size = max(tracks.videos, key=lambda x: int(x.size or 0)).size + +# track_list.append({ +# 'bitrate': bitrate, +# 'max_size': max_size, +# 'manifest': manifest, +# 'chosen_manifest': chosen_manifest, +# 'tracks': tracks +# }) + +# best_quality = max(track_list, key=lambda x: x['max_size']) + +# if len(self.bitrate.split('+')) > 1: +# self.bitrate = best_quality['bitrate'] +# self.log.info("Selected video manifest bitrate: %s", best_quality['bitrate']) + +# return best_quality['manifest'], best_quality['chosen_manifest'], best_quality['tracks'] + + @staticmethod + def dedupe_audio(audios: list[AudioTrack]) -> list[AudioTrack]: + tracks_ids = set() + new_audios = [] + for audio in audios: + track_id = hashlib.md5( + "{codec}-{lang}-{bitrate}-{channels}-{extra}".format( + codec=audio.codec, + lang=audio.language, + bitrate=audio.bitrate, + channels=audio.channels, + extra=(audio.extra[1].get("audioTrackId") or ""), + ).encode() + ).hexdigest() + if track_id in tracks_ids: + continue + tracks_ids.add(track_id) + new_audios.append(audio) + + return new_audios + + # Service specific classes + + class DeviceRegistration: + + def __init__(self, device: dict, endpoints: dict, cache_path: Path, session: requests.Session, log: Logger): + self.session = session + self.device = device + self.endpoints = endpoints + self.cache_path = cache_path + self.log = log + + self.device = {k: str(v) if not isinstance(v, str) else v for k, v in self.device.items()} + + self.bearer = None + if os.path.isfile(self.cache_path): + with open(self.cache_path, encoding="utf-8") as fd: + cache = jsonpickle.decode(fd.read()) + #self.device["device_serial"] = cache["device_serial"] + if cache.get("expires_in", 0) > int(time.time()): + # not expired, lets use + self.log.info(" + Using cached device bearer") + self.bearer = cache["access_token"] + else: + # expired, refresh + self.log.info("Cached device bearer expired, refreshing...") + refreshed_tokens = self.refresh(self.device, cache["refresh_token"]) + refreshed_tokens["refresh_token"] = cache["refresh_token"] + # expires_in seems to be in minutes, create a unix timestamp and add the minutes in seconds + refreshed_tokens["expires_in"] = int(time.time()) + int(refreshed_tokens["expires_in"]) + with open(self.cache_path, "w", encoding="utf-8") as fd: + fd.write(jsonpickle.encode(refreshed_tokens)) + self.bearer = refreshed_tokens["access_token"] + else: + self.log.info(" + Registering new device bearer") + self.bearer = self.register(self.device) + + def register(self, device: dict) -> dict: + """ + Register device to the account + :param device: Device data to register + :return: Device bearer tokens + """ + # OnTV csrf + csrf_token = self.get_csrf_token() + + # Code pair + code_pair = self.get_code_pair(device) + + # Device link + response = self.session.post( + url=self.endpoints["devicelink"], + headers={ + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.9,es-US;q=0.8,es;q=0.7", # needed? + "Content-Type": "application/x-www-form-urlencoded", + "Referer": self.endpoints["ontv"] + }, + params=urlencode({ + # any reason it urlencodes here? requests can take a param dict... + "ref_": "atv_set_rd_reg", + "publicCode": code_pair["public_code"], # public code pair + "token": csrf_token # csrf token + }) + ) + if response.status_code != 200: + raise self.log.exit(f"Unexpected response with the codeBasedLinking request: {response.text} [{response.status_code}]") + + # Register + response = self.session.post( + url=self.endpoints["register"], + headers={ + "Content-Type": "application/json", + "Accept-Language": "en-US" + }, + json={ + "auth_data": { + "code_pair": code_pair + }, + "registration_data": device, + "requested_token_type": ["bearer"], + "requested_extensions": ["device_info", "customer_info"] + }, + cookies=None # for some reason, may fail if cookies are present. Odd. + ) + if response.status_code != 200: + raise self.log.exit(f"Unable to register: {response.text} [{response.status_code}]") + bearer = response.json()["response"]["success"]["tokens"]["bearer"] + bearer["expires_in"] = int(time.time()) + int(bearer["expires_in"]) + + # Cache bearer + os.makedirs(os.path.dirname(self.cache_path), exist_ok=True) + with open(self.cache_path, "w", encoding="utf-8") as fd: + fd.write(jsonpickle.encode(bearer)) + + return bearer["access_token"] + + def refresh(self, device: dict, refresh_token: str) -> dict: + response = self.session.post( + url=self.endpoints["token"], + json={ + "app_name": device["app_name"], + "app_version": device["app_version"], + "source_token_type": "refresh_token", + "source_token": refresh_token, + "requested_token_type": "access_token" + } + ).json() + if "error" in response: + self.cache_path.unlink(missing_ok=True) # Remove the cached device as its tokens have expired + raise self.log.exit( + f"Failed to refresh device token: {response['error_description']} [{response['error']}]" + ) + if response["token_type"] != "bearer": + raise self.log.exit("Unexpected returned refreshed token type") + return response + + def get_csrf_token(self) -> str: + """ + On the amazon website, you need a token that is in the html page, + this token is used to register the device + :return: OnTV Page's CSRF Token + """ + res = self.session.get(self.endpoints["ontv"]) + response = res.text + if 'input type="hidden" name="appAction" value="SIGNIN"' in response: + raise self.log.exit( + "Cookies are signed out, cannot get ontv CSRF token. " + f"Expecting profile to have cookies for: {self.endpoints['ontv']}" + ) + for match in re.finditer(r"", response): + prop = json.loads(match.group(1)) + prop = prop.get("props", {}).get("codeEntry", {}).get("token") + if prop: + return prop + raise self.log.exit("Unable to get ontv CSRF token") + + def get_code_pair(self, device: dict) -> dict: + """ + Getting code pairs based on the device that you are using + :return: public and private code pairs + """ + res = self.session.post( + url=self.endpoints["codepair"], + headers={ + "Content-Type": "application/json", + "Accept-Language": "en-US" + }, + json={"code_data": device} + ).json() + if "error" in res: + raise self.log.exit(f"Unable to get code pair: {res['error_description']} [{res['error']}]") + return res diff --git a/Services/amazon-ISM.yml b/Services/amazon-ISM.yml new file mode 100644 index 0000000..c01e4bd --- /dev/null +++ b/Services/amazon-ISM.yml @@ -0,0 +1,111 @@ +certificate: | + CAUSwgUKvAIIAxIQCuQRtZRasVgFt7DIvVtVHBi17OSpBSKOAjCCAQoCggEBAKU2UrYVOSDlcXajWhpEgGhqGraJtFdUPgu6plJGy9ViaRn5mhyXON5PXm + w1krQdi0SLxf00FfIgnYFLpDfvNeItGn9rcx0RNPwP39PW7aW0Fbqi6VCaKWlR24kRpd7NQ4woyMXr7xlBWPwPNxK4xmR/6UuvKyYWEkroyeIjWHAqgCjC + mpfIpVcPsyrnMuPFGl82MMVnAhTweTKnEPOqJpxQ1bdQvVNCvkba5gjOTbEnJ7aXegwhmCdRQzXjTeEV2dO8oo5YfxW6pRBovzF6wYBMQYpSCJIA24ptAP + /2TkneyJuqm4hJNFvtF8fsBgTQQ4TIhnX4bZ9imuhivYLa6HsCAwEAAToPYW1hem9uLmNvbS1wcm9kEoADETQD6R0H/h9fyg0Hw7mj0M7T4s0bcBf4fMhA + Rpwk2X4HpvB49bJ5Yvc4t41mAnXGe/wiXbzsddKMiMffkSE1QWK1CFPBgziU23y1PjQToGiIv/sJIFRKRJ4qMBxIl95xlvSEzKdt68n7wqGa442+uAgk7C + XU3uTfVofYY76CrPBnEKQfad/CVqTh48geNTb4qRH1TX30NzCsB9NWlcdvg10pCnWSm8cSHu1d9yH+2yQgsGe52QoHHCqHNzG/wAxMYWTevXQW7EPTBeFy + SPY0xUN+2F2FhCf5/A7uFUHywd0zNTswh0QJc93LBTh46clRLO+d4RKBiBSj3rah6Y5iXMw9N9o58tCRc9gFHrjfMNubopWHjDOO3ATUgqXrTp+fKVCmsG + uGl1ComHxXV9i1AqHwzzY2JY2vFqo73jR3IElr6oChPIwcNokmNc0D4TXtjE0BoYkbWKJfHvJJihzMOvDicWUsemVHvua9/FBtpbHgpbgwijFPjtQF9Ldb + 8Swf + +device: + default: + domain: Device + app_name: com.amazon.amazonvideo.livingroom + app_version: '1.1' + device_model: PadFone + os_version: '6.0.1' + device_type: 'A1KAXIG6VXSG8Y' + device_name: '%FIRST_NAME%''s%DUPE_STRATEGY_1ST% PadFone' + device_serial: '3cc610286459e273' + license: + domain: Device + app_name: com.amazon.amazonvideo.livingroom + app_version: '1.1' + device_model: PadFone + os_version: '6.0.1' + device_type: 'A3REWRVYBYPKUM' + device_name: '%FIRST_NAME%''s%DUPE_STRATEGY_1ST% PadFone' + device_serial: '3cc610286459e273' + +device_types: + browser: 'AOAGZA014O5RE' # all browsers? all platforms? + tv_generic: 'A2SNKIF736WF4T' # type is shared among various random smart tvs + pc_app: 'A1RTAM01W29CUP' + mobile_app: 'A43PXU4ZN2AL1' + echo: 'A7WXQPH584YP' # echo Gen2 + echo_dot: 'A32DOYMUN6DTXA' # echo dot Gen3 + echo_studio: 'A3RBAYBE7VM004' # for audio stuff, this is probably the one to use + fire_7: 'A2M4YX06LWP8WI' + fire_7_again: 'A1Q7QCGNMXAKYW' # not sure the difference + fire_hd_8: 'A1C66CX2XD756O' + fire_hd_8_again: 'A38EHHIB10L47V' # not sure the difference + fire_hd_8_plus_2020: 'AVU7CPPF2ZRAS' + fire_hd_10: 'A1ZB65LA390I4K' + fire_tv: 'A2E0SNTXJVT7WK' # this is not the stick, this is the older stick-like diamond shaped one + fire_tv_gen2: 'A12GXV8XMS007S' + fire_tv_cube: 'A2JKHJ0PX4J3L3' # this is the STB-style big bulky cube + fire_tv_stick_gen1: 'ADVBD696BHNV5' # non-4k fire tv stick + fire_tv_stick_gen2: 'A2LWARUGJLBYEW' + fire_tv_stick_with_alexa: 'A265XOI9586NML' + fire_tv_stick_4k: 'A2GFL5ZMWNE0PX' # 4k fire tv stick + fire_tv_stick_4k_gen3: 'AKPGW064GI9HE' + nvidia_shield: 'A1KAXIG6VXSG8Y' # nvidia shield, unknown which one or if all + +endpoints: + browse: '/cdp/catalog/Browse' + details: '/gp/video/api/getDetailPage' + playback: '/cdp/catalog/GetPlaybackResources' + licence: '/cdp/catalog/GetPlaybackResources' + # chapters/scenes + xray: '/swift/page/xray' + # device registration + ontv: '/gp/video/ontv/code' + devicelink: '/gp/video/api/codeBasedLinking' + codepair: '/auth/create/codepair' + register: '/auth/register' + token: '/auth/token' + +regions: + us: + base: 'www.amazon.com' + base_api: 'api.amazon.com' + base_manifest: 'atv-ps.amazon.com' + marketplace_id: 'ATVPDKIKX0DER' + + gb: + base: 'www.amazon.co.uk' + base_api: 'api.amazon.co.uk' + base_manifest: 'atv-ps-eu.amazon.co.uk' + marketplace_id: 'A2IR4J4NTCP2M5' # A1F83G8C2ARO7P is also another marketplace_id + + it: + base: 'www.amazon.it' + base_api: 'api.amazon.it' + base_manifest: 'atv-ps-eu.primevideo.com' + marketplace_id: 'A3K6Y4MI8GDYMT' + + de: + base: 'www.amazon.de' + base_api: 'api.amazon.de' + base_manifest: 'atv-ps-eu.amazon.de' + marketplace_id: 'A1PA6795UKMFR9' + + au: + base: 'www.amazon.com.au' + base_api: 'api.amazon.com.au' + base_manifest: 'atv-ps-fe.amazon.com.au' + marketplace_id: 'A3K6Y4MI8GDYMT' + + jp: + base: 'www.amazon.co.jp' + base_api: 'api.amazon.co.jp' + base_manifest: 'atv-ps-fe.amazon.co.jp' + marketplace_id: 'A1VC38T7YXB528' + + pl: + base: 'www.amazon.com' + base_api: 'api.amazon.com' + base_manifest: 'atv-ps-eu.primevideo.com' + marketplace_id: 'A3K6Y4MI8GDYMT'