549 lines
22 KiB
Python
549 lines
22 KiB
Python
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from http.cookiejar import CookieJar
|
|
from typing import Any, Optional
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import click
|
|
from langcodes import Language
|
|
from pywidevine.device import DeviceTypes
|
|
|
|
from devine.core.credential import Credential
|
|
from devine.core.manifests import DASH
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
|
|
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks, Track, Video
|
|
from devine.core.utilities import is_close_match
|
|
from devine.core.utils.collections import as_list
|
|
|
|
|
|
class AMZN(Service):
    """
    \b
    Service code for Amazon Prime Video (https://primevideo.com).
    Based on original code for VT, credit to original author.

    \b
    Authorization: Cookies
    Robustness:
      Widevine:
        L1: 2160p
        L3 Chrome: 720p, 1080p
        L3 Android: 540p
      PlayReady:
        SL3: 2160p
        SL2: 1080p

    \b
    Tips:
        - Input should be asin only grabbed from URL or other means, e.g.:
          B0B8KZPQBX OR amzn1.dv.gti.7aa9f19e-9c00-40e3-98e7-b365678492dd
        - Use the --lang LANG_RANGE option to request non-english tracks
        - Use --bitrate CBR to request Constant Bitrate:
          devine dl -w s01e01 AMZN -b CBR B0B8KZPQBX
        - Use --quality SD to request SD tracks (default is HD):
          devine dl -w s01e01 AMZN -q SD B0B8KZPQBX

    \b
    Notes:
        - Written specifically for use with ChromeCDM, with Android L3 as fallback.
        - Region is chosen automatically based on domain extension found in cookies.
        - Loading tracks could take a few seconds if the title has many audio tracks.
    """

    # GEOFENCE = ("",)
    # Alternative names for this service.
    ALIASES = ("amazon", "prime")
    # Matches a bare title ID or a full URL. Named groups:
    #   domain - "amazon.<region>" or "primevideo.com" (only when a URL is given)
    #   region - amazon domain suffix: com, co.uk, de or co.jp
    #   id     - 10+ uppercase alphanumerics, or an "amzn1.dv.gti.<hex/dash>" ID
    TITLE_RE = r"^(?:https?://(?:www\.)?(?P<domain>amazon\.(?P<region>com|co\.uk|de|co\.jp)|primevideo\.com)(?:/.+)?/)?(?P<id>[A-Z0-9]{10,}|amzn1\.dv\.gti\.[a-f0-9-]+)"  # noqa: E501

    # Maps Amazon audio codec names to codec identifiers.
    # NOTE(review): not referenced by any method visible in this file - confirm it is still needed.
    AUDIO_CODEC_MAP = {"AAC": "mp4a", "EC3": "ec-3"}
|
|
|
|
@staticmethod
@click.command(name="AMZN", short_help="https://primevideo.com")
@click.argument("title", type=str, required=False)
@click.option(
    "-b",
    "--bitrate",
    default="VBR+CBR",
    type=click.Choice(["VBR", "CBR", "VBR+CBR"], case_sensitive=False),
    help="Video Bitrate Mode to download in. VBR=Variable Bitrate, CBR=Constant Bitrate.",
)
# UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content
@click.option(
    "-q",
    "--quality",
    default="HD",
    type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
    help="Manifest quality to request.",
)
@click.option(
    "-am",
    "--audio-manifest",
    default=None,
    type=click.Choice(["VBR", "CBR", "H265"], case_sensitive=False),
    help="Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio.",
)
@click.option(
    "-aq",
    "--audio-quality",
    default="SD",
    type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
    # The help text used to claim "Defaults to the same as --quality", which
    # contradicted the declared default above; it now states the real default.
    help="Manifest quality to request for audio. Defaults to SD.",
)
@click.pass_context
def cli(ctx, **kwargs):
    # Click entry point: forwards the click context and every parsed
    # argument/option to the service constructor. Deliberately no docstring
    # here, because click would surface it as the command's help text.
    return AMZN(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx, title, bitrate, quality, audio_manifest, audio_quality):
    """
    Initialise the service from the parsed command-line values.

    Parameters:
        ctx: click context of the AMZN command; the parent context's params
            supply the selected video/audio codecs and ctx.obj supplies the CDM.
        title: Title ID or URL as given on the command line.
        bitrate: Video bitrate mode ("VBR", "CBR" or "VBR+CBR").
        quality: Manifest quality to request ("SD", "HD" or "UHD").
        audio_manifest: Manifest type to use for audio, or None.
        audio_quality: Manifest quality to request for audio.
    """
    # parse_title() stores the bare ID on self.title. Fall back to an empty
    # mapping so an unparsable title cannot crash the .get() lookups below.
    m = self.parse_title(ctx, title) or {}
    self.domain = m.get("domain")
    self.domain_region = m.get("region")
    super().__init__(ctx)
    self.bitrate = bitrate
    self.quality = quality
    self.audio_manifest = audio_manifest
    self.audio_quality = audio_quality

    self.vcodec = "H265" if ctx.parent.params.get("vcodec") == Video.Codec.HEVC else "H264"
    self.acodec = ctx.parent.params.get("acodec")

    self.cdm = ctx.obj.cdm
    # Populated later by configure(), once cookies are available.
    self.region = {}
    self.endpoints = {}
    self.device = {}

    self.pv = self.domain == "primevideo.com"
    self.device_token = None
    self.device_id = None
    self.customer_id = None
    self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd"

    if self.cdm.device_type != DeviceTypes.CHROME:
        # The log message promises that "-q HD" overrides the downgrade, so only
        # force SD when the quality was not explicitly supplied by the user.
        # An unknown source (None) is treated like the default, which keeps the
        # previous always-downgrade behaviour in that case.
        quality_source = ctx.get_parameter_source("quality")
        if quality_source in (None, click.core.ParameterSource.DEFAULT):
            self.log.info("Setting manifest quality to SD for Android L3 (use -q HD to override)")
            self.quality = "SD"
|
|
|
|
# Abstracted functions
|
|
|
|
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate the session with cookies and then configure the service.

    Parameters:
        cookies: Cookie jar to load into the session; required.
        credential: Passed through to the parent implementation only.

    Raises:
        EnvironmentError: If no (or an empty) cookie jar is supplied.
    """
    super().authenticate(cookies, credential)

    # This service only supports cookie-based authentication.
    if not cookies:
        raise EnvironmentError("Service requires Cookies for Authentication.")

    session_cookies = self.session.cookies
    session_cookies.update(cookies)

    # Region and endpoints are derived from the cookies, so configure afterwards.
    self.configure()
|
|
|
|
def get_titles(self) -> Titles_T:
    """
    Fetch title metadata from the "detail" endpoint and build the title list.

    Returns:
        Movies with a single Movie when the entity type is "Movie", or a Series
        containing every episode of every listed season for "TV Show".

    Exits the process (after logging) when the entity type is missing or is
    not one of the two supported values.
    """
    json_headers = {"Accept": "application/json"}

    widgets = self.session.get(
        url=self.endpoints["detail"],
        params={"titleID": self.title, "isElcano": "1", "sections": "Atf"},
        headers=json_headers,
    ).json()["widgets"]

    metadata = widgets["header"]["detail"]
    entity = metadata.get("entityType")
    if not entity:
        self.log.error(" - Failed to get entity type")
        sys.exit(1)

    if entity == "Movie":
        return Movies(
            [
                Movie(
                    id_=metadata.get("catalogId"),
                    year=metadata.get("releaseYear"),
                    name=metadata.get("title"),
                    service=self.__class__,
                    data=metadata,
                )
            ]
        )

    if entity == "TV Show":
        season_ids = [x.get("titleID") for x in widgets["seasonSelector"]]

        episodes = []
        for season_id in season_ids:
            # Each season has its own detail page holding the episode list.
            season = self.session.get(
                url=self.endpoints["detail"],
                params={"titleID": season_id, "isElcano": "1", "sections": "Btf"},
                headers=json_headers,
            ).json()["widgets"]

            # Merge each episode's detail with its sequence number, which is
            # stored next to (not inside) the detail object.
            cards = [
                {**x["detail"], "sequenceNumber": x["self"]["sequenceNumber"]}
                for x in season["episodeList"]["episodes"]
            ]

            product_details = season["productDetails"]["detail"]

            episodes.extend(
                Episode(
                    id_=card.get("titleId") or card["catalogId"],
                    title=product_details.get("parentTitle") or product_details["title"],
                    year=card.get("releaseYear") or product_details.get("releaseYear"),
                    season=product_details.get("seasonNumber"),
                    number=card.get("sequenceNumber"),
                    name=card.get("title"),
                    service=self.__class__,
                    data=card,
                )
                for card in cards
                if card["entityType"] == "TV Show"
            )

        return Series(episodes)

    # Previously any other entity type fell through and returned None, which
    # only surfaced later as an unrelated error in the caller.
    self.log.error(f" - Unsupported entity type: {entity}")
    sys.exit(1)
|
|
|
|
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Build the track list (video, audio, subtitles) for a title.

    Requests the playback manifest, parses the chosen CDN's MPD and, when any
    audio language lacks a track of at least 640000 bps, replaces the audio
    tracks with those of a second manifest. Subtitles and forced narratives
    listed in the playback manifest are added as SRT subtitle tracks.

    Side effects: sets self.customer_id and stores the manifest's transition
    timecodes on the first video track's data for get_chapters().

    Exits the process when the manifest reports a rights exception.
    """
    manifest = self.get_manifest(title, video_codec=self.vcodec, bitrate_mode=self.bitrate, quality=self.quality)

    entitlement = manifest["returnedTitleRendition"]["selectedEntitlement"]
    if "rightsException" in entitlement:
        self.log.error(" - The profile used does not have the rights to this title.")
        sys.exit(1)

    self.customer_id = entitlement["grantedByCustomerId"]

    chosen_manifest = self.choose_manifest(manifest)
    mpd_url = self.clean_mpd_url(chosen_manifest["avUrlInfoList"][0]["url"])

    tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(
        language=re.sub(r"_dialog.*$", "", manifest["playbackUrls"]["defaultAudioTrackId"])
    )
    tracks.videos[0].data["timecodes"] = manifest.get("transitionTimecodes")

    # Group audio tracks by language to check each language's best bitrate.
    audios = defaultdict(list)
    for audio in tracks.audio:
        audios[audio.language].append(audio)

    need_separate_audio = any(
        not any((x.bitrate or 0) >= 640000 for x in group)
        for group in audios.values()
    )

    if need_separate_audio:
        manifest_type = self.audio_manifest or "H265"
        self.log.info(f"Getting audio from {manifest_type} manifest for potential higher bitrate or better codec")
        audio_manifest = self.get_manifest(
            title,
            "H265" if manifest_type == "H265" else "H264",
            "VBR" if manifest_type != "CBR" else "CBR",
            self.audio_quality or self.quality,
        )
        audio_mpd_url = self.clean_mpd_url(self.choose_manifest(audio_manifest)["avUrlInfoList"][0]["url"])
        self.log.debug(audio_mpd_url)

        try:
            audio_mpd = DASH.from_url(url=audio_mpd_url, session=self.session).to_tracks(language="en")
        except KeyError:
            # Report the manifest type actually requested; self.audio_manifest
            # is None whenever the H265 fallback was used.
            self.log.warning(f" - Title has no {manifest_type} stream, cannot get higher quality audio")
        else:
            tracks.audio = audio_mpd.audio

    for audio in tracks.audio:
        # Amazon @lang is just the lang code, no dialect, @audioTrackId has it.
        adaptation_set = audio.data["dash"]["adaptation_set"]
        audio_track_id = adaptation_set.get("audioTrackId")
        sub_type = adaptation_set.get("audioTrackSubtype")
        if audio_track_id is not None:
            audio.language = Language.get(audio_track_id.split("_")[0])  # e.g. es-419_ec3_blabla
        if sub_type is not None and "descriptive" in sub_type.lower():
            audio.descriptive = True

    for track in tracks:
        rep_base = track.data["dash"]["representation"].find("BaseURL")
        if rep_base is not None:
            # The representation names a single file relative to the MPD, so
            # switch the track to a plain URL and drop the DASH data.
            base_url = os.path.dirname(track.url)
            track_base = rep_base.text
            track.url = f"{base_url}/{track_base}"
            track.descriptor = Track.Descriptor.URL
            track.data["dash"].clear()

    # filter out boosted, descriptive, and lowest bitrate audio tracks
    # tracks.audio = [
    #     audio
    #     for audio in tracks.audio
    #     if audio.data["dash"]["adaptation_set"].get("audioTrackSubtype", "").lower() == "dialog"
    #     and int(audio.data["dash"]["adaptation_set"].get("maxBandwidth", 0)) >= 192000
    # ]

    # `or []` tolerates keys that are present but null in the manifest.
    subtitle_entries = (manifest.get("subtitleUrls") or []) + (manifest.get("forcedNarratives") or [])
    for sub in subtitle_entries:
        # Only build the synthetic ID when no track ID is provided; building it
        # eagerly raised KeyError for entries lacking 'subtype' or 'index'.
        if "timedTextTrackId" in sub:
            sub_id = sub["timedTextTrackId"]
        else:
            sub_id = f"{sub['languageCode']}_{sub['type']}_{sub['subtype']}_{sub['index']}"

        tracks.add(
            Subtitle(
                id_=sub_id,
                url=os.path.splitext(sub["url"])[0] + ".srt",  # DFXP -> SRT forcefully seems to work fine
                codec=Subtitle.Codec.from_codecs("srt"),  # sub["format"].lower(),
                language=sub["languageCode"],
                forced="forced" in sub["displayName"],
                sdh=sub["type"].lower() == "sdh",  # TODO: what other sub types? cc? forced?
            ),
            warn_only=True,
        )  # expecting possible dupes, ignore

    return tracks
|
|
|
|
def get_chapters(self, title: Title_T) -> Chapters:
    """
    Build chapters from the timecodes stored on the title's first video track.

    Returns an empty Chapters() when no timecodes are stored; otherwise a list
    with one Chapter per skip element, plus a "CREDITS" chapter when an
    end-credits start time is present.
    """
    timecodes = title.tracks.videos[0].data.get("timecodes")
    if not timecodes:
        return Chapters()

    chapters = []
    for element in timecodes.get("skipElements", []):
        chapters.append(
            Chapter(
                name=element.get("elementType", "Chapter"),
                timestamp=element.get("startTimecodeMs"),
            )
        )

    credits_start = timecodes.get("endCreditsStart")
    if credits_start:
        chapters.append(Chapter(name="CREDITS", timestamp=credits_start))

    return chapters
|
|
|
|
def get_widevine_service_certificate(self, **_: Any) -> str:
    """Return the service certificate stored under "certificate" in the service config."""
    certificate = self.config["certificate"]
    return certificate
|
|
|
|
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track) -> str:
    """
    Request a Widevine license from the "license" endpoint.

    Parameters:
        challenge: Raw license challenge; sent base64-encoded.
        title: Title being licensed; its id is sent as the ASIN.
        track: Unused by this service.

    Returns:
        The "license" value of the response's "widevine2License" object
        (presumably base64 text - confirm against the caller).

    Exits the process (after logging) when the response contains errors.
    """
    # SD requests present themselves as Linux, everything else as Windows 10.
    is_sd = self.quality == "SD"

    params = {
        "asin": title.id,
        "consumptionType": "Streaming",
        "desiredResources": "Widevine2License",
        "deviceTypeID": self.device["device_type"],
        "deviceID": self.device_id,
        "firmware": 1,
        "gascEnabled": str(self.pv).lower(),
        "marketplaceID": self.region["marketplace_id"],
        "resourceUsage": "ImmediateConsumption",
        "videoMaterialType": "Feature",
        "operatingSystemName": "Linux" if is_sd else "Windows",
        "operatingSystemVersion": "unknown" if is_sd else "10.0",
        "customerID": self.customer_id,
        "deviceDrmOverride": "CENC",
        "deviceStreamingTechnologyOverride": "DASH",
        "deviceVideoQualityOverride": "HD",
        "deviceHdrFormatsOverride": "None",
    }
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": f"Bearer {self.device_token}",
    }
    data = {
        "widevine2Challenge": base64.b64encode(challenge).decode(),
        "includeHdcpTestKeyInLicense": "false",
    }

    response = self.session.post(
        url=self.endpoints["license"],
        params=params,
        headers=headers,
        data=data,
    ).json()

    if "errorsByResource" in response:
        # The error object carries its code under either "errorCode" or "type";
        # if neither is present the whole object is logged instead.
        error_code = response["errorsByResource"]["Widevine2License"]
        if "errorCode" in error_code:
            error_code = error_code["errorCode"]
        elif "type" in error_code:
            error_code = error_code["type"]

        if error_code in ["PRS.NoRights.AnonymizerIP", "PRS.NoRights.NotOwned"]:
            self.log.error("Proxy detected, Unable to License")
        elif error_code == "PRS.Dependency.DRM.Widevine.UnsupportedCdmVersion":
            self.log.error("Cdm version not supported")
        else:
            self.log.error(f" x Error from Amazon's License Server: [{error_code}]")
        sys.exit(1)

    return response["widevine2License"]["license"]
|
|
|
|
# Service specific functions
|
|
|
|
def configure(self):
    """
    Resolve the account region and derive endpoints, headers and device info.

    Sets self.pv (in some cases), self.region, self.endpoints, the session's
    Origin header, self.device_id and self.device. Exits the process when no
    region can be determined.
    """
    # Title IDs longer than 10 characters that were not given via an amazon.*
    # URL are handled as Prime Video titles.
    on_amazon_domain = (self.domain or "").startswith("amazon.")
    if len(self.title) > 10 and not on_amazon_domain:
        self.pv = True

    self.log.info("Getting account region")
    self.region = self.get_region()
    if not self.region:
        self.log.error(" - Failed to get Amazon account region")
        sys.exit(1)
    # self.GEOFENCE.append(self.region["code"])
    region_code = self.region["code"].upper()
    self.log.info(f" + Region: {region_code}")

    # endpoints must be prepared AFTER region data is retrieved
    self.endpoints = self.prepare_endpoints(self.config["endpoints"], self.region)

    origin = f"https://{self.region['base']}"
    self.session.headers.update({"Origin": origin})

    # The device ID is a SHA-224 hex digest of a fixed prefix plus the User-Agent.
    user_agent = self.session.headers["User-Agent"]
    device_seed = ("CustomerID" + user_agent).encode("utf-8")
    self.device_id = hashlib.sha224(device_seed).hexdigest()

    browser_device_type = self.config["device_types"]["browser"]
    self.device = {"device_type": browser_device_type}
|
|
|
|
def get_region(self):
    """
    Return the region configuration for the account.

    Returns:
        An empty dict when no region code can be determined; otherwise a copy
        of the matching entry of self.config["regions"] with "code" added. For
        Prime Video (self.pv), "base" and "base_manifest" are replaced with
        primevideo.com hosts derived from the Prime Video home page.
    """
    domain_region = self.get_domain_region()
    if not domain_region:
        return {}

    region_config = self.config["regions"].get(domain_region)
    if not region_config:
        raise self.log.exit(f" - There's no region configuration data for the region: {domain_region}")

    # Work on a copy so the shared service configuration is not modified
    # in place by the additions and overrides below.
    region = dict(region_config)
    region["code"] = domain_region

    if self.pv:
        res = self.session.get("https://www.primevideo.com").text
        # The page embeds a "ue_furl" host such as fls-na.amazon.com; its
        # na/eu/fe part selects the Prime Video manifest host.
        match = re.search(r'ue_furl *= *([\'"])fls-(na|eu|fe)\.amazon\.[a-z.]+\1', res)
        if not match:
            raise self.log.exit(" - Failed to get PrimeVideo region")
        pv_region = match.group(2).lower()
        pv_region = {"na": "atv-ps"}.get(pv_region, f"atv-ps-{pv_region}")
        region["base_manifest"] = f"{pv_region}.primevideo.com"
        region["base"] = "www.primevideo.com"

    return region
|
|
|
|
def get_domain_region(self):
    """
    Work out the region code from the title URL or the session cookies.

    Uses the last dot-separated part of self.domain_region when set; otherwise
    the last part of the first cookie domain starting with ".amazon." or
    ".primevideo.". "com" maps to "us" and "uk" to "gb"; other values are
    returned unchanged, and None is returned when nothing matches.
    """
    region_hint = self.domain_region or ""
    tld = region_hint.rsplit(".", 1)[-1]

    if not tld:
        tld = None
        for cookie in self.session.cookies:
            if not cookie.domain_specified:
                continue
            if cookie.domain.startswith((".amazon.", ".primevideo.")):
                tld = cookie.domain.split(".")[-1]
                break

    tld_to_region = {"com": "us", "uk": "gb"}
    return tld_to_region.get(tld, tld)
|
|
|
|
def prepare_endpoint(self, name, uri, region):
    """
    Build the absolute URL for one named endpoint.

    The host depends on the endpoint name: the region's "base_manifest" host,
    the region's "base" host, or the US region's "base_api" host from the
    service config.

    Raises:
        ValueError: If the endpoint name belongs to none of the known groups.
    """
    if name in ("browse", "playback", "license", "xray"):
        host = region["base_manifest"]
    elif name in ("detail", "ontv", "devicelink"):
        host = region["base"]
    elif name in ("codepair", "register", "token"):
        # These always use the US API host, whatever the account region is.
        host = self.config["regions"]["us"]["base_api"]
    else:
        raise ValueError(f"Unknown endpoint: {name}")
    return f"https://{host}{uri}"
|
|
|
|
def prepare_endpoints(self, endpoints, region):
    """Return a dict mapping each endpoint name to its absolute URL for the given region."""
    prepared = {}
    for name, uri in endpoints.items():
        prepared[name] = self.prepare_endpoint(name, uri, region)
    return prepared
|
|
|
|
def choose_manifest(self, manifest):
    """Return the CDN URL set with the lowest integer "cdnWeightsRank" from the playback manifest."""
    cdn_url_sets = manifest["audioVideoUrls"]["avCdnUrlSets"]

    def rank_of(url_set):
        return int(url_set["cdnWeightsRank"])

    # Stable sort: the earliest entry wins when ranks are equal.
    ranked = sorted(cdn_url_sets, key=rank_of)
    return ranked[0]
|
|
|
|
def get_manifest(self, title, video_codec, bitrate_mode, quality, hdr=None):
    """
    Request the playback manifest for a title from the "playback" endpoint.

    Parameters:
        title: Title whose id is sent as the ASIN.
        video_codec: Value for the video codec override (e.g. "H264", "H265").
        bitrate_mode: "VBR", "CBR" or "VBR+CBR"; sent with "VBR" rewritten to
            "CVBR" and "+" rewritten to ",".
        quality: Only "SD" is treated specially: it makes the request identify
            as Linux instead of Windows 10.
        hdr: Accepted for interface compatibility; currently unused.

    Returns:
        The decoded JSON manifest.

    Exits the process (after logging) when the response is not JSON or the
    manifest contains an "error" entry.
    """
    is_sd = quality == "SD"
    desired_resources = [
        "PlaybackUrls",
        "AudioVideoUrls",
        "CatalogMetadata",
        "ForcedNarratives",
        "SubtitlePresets",
        "SubtitleUrls",
        "TransitionTimecodes",
        "TrickplayUrls",
        "CuepointPlaylist",
        "XRayMetadata",
        "PlaybackSettings",
    ]

    r = self.session.get(
        url=self.endpoints["playback"],
        params={
            "asin": title.id,
            "consumptionType": "Streaming",
            "desiredResources": ",".join(desired_resources),
            "deviceID": self.device_id,
            "deviceTypeID": self.device["device_type"],
            "firmware": 1,
            "gascEnabled": str(self.pv).lower(),
            "marketplaceID": self.region["marketplace_id"],
            "resourceUsage": "CacheResources",
            "videoMaterialType": "Feature",
            "playerType": "html5",
            "clientId": self.client_id,
            "operatingSystemName": "Linux" if is_sd else "Windows",
            "operatingSystemVersion": "unknown" if is_sd else "10.0",
            "deviceDrmOverride": "CENC",
            "deviceStreamingTechnologyOverride": "DASH",
            "deviceProtocolOverride": "Https",
            "deviceVideoCodecOverride": video_codec,
            "deviceBitrateAdaptationsOverride": bitrate_mode.replace("VBR", "CVBR").replace("+", ","),
            "deviceVideoQualityOverride": "HD",
            "deviceHdrFormatsOverride": "None",
            "supportedDRMKeyScheme": "DUAL_KEY",
            "liveManifestType": "live,accumulating",
            "titleDecorationScheme": "primary-content",
            "subtitleFormat": "TTMLv2",
            "languageFeature": "MLFv2",
            "uxLocale": "en_US",
            "xrayDeviceClass": "normal",
            "xrayPlaybackMode": "playback",
            "xrayToken": "XRAY_WEB_2020_V1",
            "playbackSettingsFormatVersion": "1.0.0",
            "playerAttributes": json.dumps({"frameRate": "HFR"}),
            "audioTrackId": "all",
        },
        headers={"Authorization": f"Bearer {self.device_token}"},
    )

    try:
        manifest = r.json()
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError and of the
        # decode error an HTTP client may raise when a different JSON backend
        # (e.g. simplejson) is in use, so this catches both.
        self.log.debug(r.text)
        self.log.error(" - Amazon didn't return JSON data when obtaining the playback manifest.")
        sys.exit(1)

    if "error" in manifest:
        self.log.error(" - Amazon reported an error when obtaining the playback manifest.")
        self.log.error(manifest["error"])
        sys.exit(1)

    return manifest
|
|
|
|
@staticmethod
|
|
def get_original_language(manifest):
|
|
"""Get a title's original language from manifest data."""
|
|
try:
|
|
return next(
|
|
x["language"].replace("_", "-")
|
|
for x in manifest["catalogMetadata"]["playback"]["audioTracks"]
|
|
if x["isOriginalLanguage"]
|
|
)
|
|
except (KeyError, StopIteration):
|
|
pass
|
|
|
|
if "defaultAudioTrackId" in manifest.get("playbackUrls", {}):
|
|
try:
|
|
return manifest["playbackUrls"]["defaultAudioTrackId"].split("_")[0]
|
|
except IndexError:
|
|
pass
|
|
|
|
return None
|
|
|
|
@staticmethod
|
|
def clean_mpd_url(mpd_url):
|
|
"""Clean up an Amazon MPD manifest url."""
|
|
try:
|
|
parsed_url = urlparse(mpd_url)
|
|
new_path = "/".join(
|
|
segment for segment in parsed_url.path.split("/") if not any(sub in segment for sub in ["$", "dm"])
|
|
)
|
|
return urlunparse(parsed_url._replace(path=new_path))
|
|
except Exception as e:
|
|
raise ValueError(f"Unable to parse MPD URL: {e}")
|
|
|
|
def parse_title(self, ctx, title):
    """
    Extract the title ID from the given title and store it on self.title.

    Parameters:
        ctx: click context; its parent's "title" param is used when `title`
            is empty.
        title: Title ID or URL.

    Returns:
        The named groups of the first matching TITLE_RE pattern, or an empty
        dict when there is no pattern or nothing matches (the title is then
        stored as-is).
    """
    title = title or ctx.parent.params.get("title")
    if not title:
        self.log.exit(" - No title ID specified")

    # A default keeps a missing TITLE_RE on the "no pattern" path instead of
    # raising AttributeError.
    if not getattr(self, "TITLE_RE", None):
        self.title = title
        return {}

    for regex in as_list(self.TITLE_RE):
        m = re.search(regex, title)
        if m:
            self.title = m.group("id")
            return m.groupdict()

    self.log.warning(f" - Unable to parse title ID {title!r}, using as-is")
    self.title = title
    # Always hand back a mapping: the caller reads the result with .get(), and
    # the implicit None previously returned here crashed it.
    return {}
|