# Source: sp4rky-devine-services/services/AMZN/__init__.py
# Snapshot: 2025-04-09 15:07:35 -06:00 (549 lines, 22 KiB, Python)

import base64
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from http.cookiejar import CookieJar
from typing import Any, Optional
from urllib.parse import urlparse, urlunparse
import click
from langcodes import Language
from pywidevine.device import DeviceTypes
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Chapters, Subtitle, Tracks, Track, Video
from devine.core.utilities import is_close_match
from devine.core.utils.collections import as_list
class AMZN(Service):
    """
    \b
    Service code for Amazon Prime Video (https://primevideo.com).
    Based on original code for VT, credit to original author.

    \b
    Authorization: Cookies
    Robustness:
      Widevine:
        L1: 2160p
        L3 Chrome: 720p, 1080p
        L3 Android: 540p
      PlayReady:
        SL3: 2160p
        SL2: 1080p

    \b
    Tips:
    - Input should be asin only grabbed from URL or other means, e.g.:
      B0B8KZPQBX OR amzn1.dv.gti.7aa9f19e-9c00-40e3-98e7-b365678492dd
    - Use the --lang LANG_RANGE option to request non-english tracks
    - Use --bitrate CBR to request Constant Bitrate:
      devine dl -w s01e01 AMZN -b CBR B0B8KZPQBX
    - Use --quality SD to request SD tracks (default is HD):
      devine dl -w s01e01 AMZN -q SD B0B8KZPQBX

    \b
    Notes:
    - Written specifically for use with ChromeCDM, with Android L3 as fallback.
    - Region is chosen automatically based on domain extension found in cookies.
    - Loading tracks could take a few seconds if the title has many audio tracks.
    """

    # GEOFENCE = ("",)

    # Alternative names for this service.
    ALIASES = ("amazon", "prime")

    # Accepts either a bare ID or an amazon.* / primevideo.com URL ending in one.
    # Named groups: "domain", "region" (amazon.* URLs only) and "id".
    TITLE_RE = (
        r"^(?:https?://(?:www\.)?"
        r"(?P<domain>amazon\.(?P<region>com|co\.uk|de|co\.jp)|primevideo\.com)"
        r"(?:/.+)?/)?"
        r"(?P<id>[A-Z0-9]{10,}|amzn1\.dv\.gti\.[a-f0-9-]+)"
    )

    # Maps an audio codec name to its codec string.
    AUDIO_CODEC_MAP = {
        "AAC": "mp4a",
        "EC3": "ec-3",
    }
@staticmethod
@click.command(name="AMZN", short_help="https://primevideo.com")
@click.argument("title", type=str, required=False)
@click.option(
    "-b",
    "--bitrate",
    default="VBR+CBR",
    type=click.Choice(["VBR", "CBR", "VBR+CBR"], case_sensitive=False),
    help="Video Bitrate Mode to download in. VBR=Variable Bitrate, CBR=Constant Bitrate.",
)
# UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content
@click.option(
    "-q",
    "--quality",
    default="HD",
    type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
    help="Manifest quality to request.",
)
@click.option(
    "-am",
    "--audio-manifest",
    default=None,
    type=click.Choice(["VBR", "CBR", "H265"], case_sensitive=False),
    help="Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio.",
)
# The declared default is "SD", so the help text states exactly that rather
# than claiming the value follows --quality.
@click.option(
    "-aq",
    "--audio-quality",
    default="SD",
    type=click.Choice(["SD", "HD", "UHD"], case_sensitive=False),
    help="Manifest quality to request for audio. Defaults to SD.",
)
@click.pass_context
def cli(ctx, **kwargs):
    # Entry point for the "AMZN" click command: forwards the click context and
    # every parsed option/argument to the service constructor.
    # NOTE(review): deliberately a comment, not a docstring - click may surface
    # a callback docstring as the command's help text; confirm before changing.
    return AMZN(ctx, **kwargs)
def __init__(self, ctx, title, bitrate, quality, audio_manifest, audio_quality):
    """
    Set up the service from the parsed command-line values.

    Parameters:
        ctx: Click context of the service command; the parent context supplies
            the "vcodec" and "acodec" parameters and ``ctx.obj.cdm`` the CDM.
        title: Title ID or URL as given on the command line (may be empty).
        bitrate: Video bitrate mode ("VBR", "CBR" or "VBR+CBR").
        quality: Manifest quality to request ("SD", "HD" or "UHD").
        audio_manifest: Manifest to use for audio, or None.
        audio_quality: Manifest quality to request for audio.
    """
    # Function-scope import: only the quality override check below needs it.
    from click.core import ParameterSource

    # parse_title() sets self.title; fall back to an empty mapping so a title
    # that could not be parsed does not break the lookups below.
    m = self.parse_title(ctx, title) or {}
    self.domain = m.get("domain")
    self.domain_region = m.get("region")

    super().__init__(ctx)

    self.bitrate = bitrate
    self.quality = quality
    self.audio_manifest = audio_manifest
    self.audio_quality = audio_quality

    self.vcodec = "H265" if ctx.parent.params.get("vcodec") == Video.Codec.HEVC else "H264"
    self.acodec = ctx.parent.params.get("acodec")
    self.cdm = ctx.obj.cdm

    # Filled in later by configure() and get_tracks().
    self.region = {}
    self.endpoints = {}
    self.device = {}
    self.pv = self.domain == "primevideo.com"
    self.device_token = None
    self.device_id = None
    self.customer_id = None
    self.client_id = "f22dbddb-ef2c-48c5-8876-bed0d47594fd"

    if self.cdm.device_type != DeviceTypes.CHROME:
        # The message tells the user that "-q" overrides this, so only fall
        # back to SD when the quality was left at its default.
        quality_source = ctx.get_parameter_source("quality")
        if quality_source in (None, ParameterSource.DEFAULT):
            self.log.info("Setting manifest quality to SD for Android L3 (use -q HD to override)")
            self.quality = "SD"
# Abstracted functions
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Authenticate the session with the supplied cookies and configure the service.

    Runs the base-class authentication first, copies the cookies into the
    session and then calls configure().

    Raises:
        EnvironmentError: If no (or an empty) cookie jar was supplied.
    """
    super().authenticate(cookies, credential)

    if cookies:
        self.session.cookies.update(cookies)
        self.configure()
        return

    raise EnvironmentError("Service requires Cookies for Authentication.")
def get_titles(self) -> Titles_T:
    """
    Fetch the title's detail page data and build the title list.

    Returns:
        Movies with a single Movie when the entity type is "Movie", or a
        Series with one Episode per episode card of every season listed in
        the season selector when the entity type is "TV Show".

    Exits the process (status 1) when the entity type is missing or is
    neither "Movie" nor "TV Show".
    """
    res = self.session.get(
        url=self.endpoints["detail"],
        params={"titleID": self.title, "isElcano": "1", "sections": "Atf"},
        headers={"Accept": "application/json"},
    ).json()["widgets"]

    entity = res["header"]["detail"].get("entityType")
    if not entity:
        self.log.error(" - Failed to get entity type")
        sys.exit(1)

    if entity == "Movie":
        metadata = res["header"]["detail"]
        return Movies(
            [
                Movie(
                    id_=metadata.get("catalogId"),
                    year=metadata.get("releaseYear"),
                    name=metadata.get("title"),
                    service=self.__class__,
                    data=metadata,
                )
            ]
        )

    if entity == "TV Show":
        seasons = [x.get("titleID") for x in res["seasonSelector"]]

        episodes = []
        for season in seasons:
            # One detail request per season; the episode list lives in the "Btf" section.
            res = self.session.get(
                url=self.endpoints["detail"],
                params={"titleID": season, "isElcano": "1", "sections": "Btf"},
                headers={"Accept": "application/json"},
            ).json()["widgets"]

            # cards = [x["detail"] for x in as_list(res["titleContent"][0]["cards"])]
            cards = [
                {**x["detail"], "sequenceNumber": x["self"]["sequenceNumber"]}
                for x in res["episodeList"]["episodes"]
            ]

            product_details = res["productDetails"]["detail"]

            episodes.extend(
                Episode(
                    id_=title.get("titleId") or title["catalogId"],
                    title=product_details.get("parentTitle") or product_details["title"],
                    year=title.get("releaseYear") or product_details.get("releaseYear"),
                    season=product_details.get("seasonNumber"),
                    number=title.get("sequenceNumber"),
                    name=title.get("title"),
                    service=self.__class__,
                    data=title,
                )
                for title in cards
                if title["entityType"] == "TV Show"
            )

        return Series(episodes)

    # Previously an unknown entity type fell through and returned None
    # implicitly; fail loudly instead, like the missing-entity case above.
    self.log.error(f" - Unsupported entity type: {entity}")
    sys.exit(1)
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Build the video, audio and subtitle tracks for a title.

    Requests the playback manifest, parses the MPD of the CDN entry picked by
    choose_manifest(), optionally replaces the audio tracks with those of a
    second manifest, and adds the subtitles listed in the playback manifest.

    Side effects:
        Sets self.customer_id from the manifest's selected entitlement.

    Exits the process (status 1) when the entitlement carries a
    "rightsException".
    """
    manifest = self.get_manifest(title, video_codec=self.vcodec, bitrate_mode=self.bitrate, quality=self.quality)

    if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]:
        self.log.error(" - The profile used does not have the rights to this title.")
        sys.exit(1)

    self.customer_id = manifest["returnedTitleRendition"]["selectedEntitlement"]["grantedByCustomerId"]

    chosen_manifest = self.choose_manifest(manifest)
    mpd_url = self.clean_mpd_url(chosen_manifest["avUrlInfoList"][0]["url"])

    tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(
        language=re.sub(r"_dialog.*$", "", manifest["playbackUrls"]["defaultAudioTrackId"])
    )

    # Stashed on the first video track so get_chapters() can read it later.
    tracks.videos[0].data["timecodes"] = manifest.get("transitionTimecodes")

    audios = defaultdict(list)
    for audio in tracks.audio:
        audios[audio.language].append(audio)

    # A second manifest is requested as soon as one language has no audio
    # track of at least 640000 bit/s.
    need_separate_audio = False
    for lang in audios:
        if not any((x.bitrate or 0) >= 640000 for x in audios[lang]):
            need_separate_audio = True
            break

    if need_separate_audio:
        manifest_type = self.audio_manifest or "H265"
        self.log.info(f"Getting audio from {manifest_type} manifest for potential higher bitrate or better codec")
        audio_manifest = self.get_manifest(
            title,
            "H265" if manifest_type == "H265" else "H264",
            "VBR" if manifest_type != "CBR" else "CBR",
            self.audio_quality or self.quality,
        )

        audio_mpd_url = self.clean_mpd_url(self.choose_manifest(audio_manifest)["avUrlInfoList"][0]["url"])
        self.log.debug(audio_mpd_url)

        try:
            audio_mpd = DASH.from_url(url=audio_mpd_url, session=self.session).to_tracks(language="en")
        except KeyError:
            # Report the manifest type actually requested; self.audio_manifest
            # is None whenever the H265 default was used.
            self.log.warning(f" - Title has no {manifest_type} stream, cannot get higher quality audio")
        else:
            tracks.audio = audio_mpd.audio

    for audio in tracks.audio:
        # Amazon @lang is just the lang code, no dialect, @audioTrackId has it.
        audio_track_id = audio.data["dash"]["adaptation_set"].get("audioTrackId")
        sub_type = audio.data["dash"]["adaptation_set"].get("audioTrackSubtype")
        if audio_track_id is not None:
            audio.language = Language.get(audio_track_id.split("_")[0])  # e.g. es-419_ec3_blabla
        if sub_type is not None and "descriptive" in sub_type.lower():
            audio.descriptive = True

    for track in tracks:
        rep_base = track.data["dash"]["representation"].find("BaseURL")
        if rep_base is not None:
            # The representation has its own BaseURL: point the track at that
            # file directly and switch it to a plain URL descriptor.
            base_url = os.path.dirname(track.url)
            track_base = rep_base.text
            track.url = f"{base_url}/{track_base}"
            track.descriptor = Track.Descriptor.URL
            # NOTE(review): nesting reconstructed from a whitespace-stripped
            # source; the DASH data is assumed to be cleared only for tracks
            # converted to a URL descriptor - confirm against upstream.
            track.data["dash"].clear()

    # filter out boosted, descriptive, and lowest bitrate audio tracks
    # tracks.audio = [
    #     audio
    #     for audio in tracks.audio
    #     if audio.data["dash"]["adaptation_set"].get("audioTrackSubtype", "").lower() == "dialog"
    #     and int(audio.data["dash"]["adaptation_set"].get("maxBandwidth", 0)) >= 192000
    # ]

    for sub in manifest.get("subtitleUrls", []) + manifest.get("forcedNarratives", []):
        tracks.add(
            Subtitle(
                id_=sub.get(
                    "timedTextTrackId", f"{sub['languageCode']}_{sub['type']}_{sub['subtype']}_{sub['index']}"
                ),
                url=os.path.splitext(sub["url"])[0] + ".srt",  # DFXP -> SRT forcefully seems to work fine
                codec=Subtitle.Codec.from_codecs("srt"),  # sub["format"].lower(),
                language=sub["languageCode"],
                forced="forced" in sub["displayName"],
                sdh=sub["type"].lower() == "sdh",  # TODO: what other sub types? cc? forced?
            ),
            warn_only=True,
        )  # expecting possible dupes, ignore

    return tracks
def get_chapters(self, title: Title_T) -> Chapters:
    """
    Build chapters from the transition timecodes stored by get_tracks().

    One chapter is created per entry of the timecodes' "skipElements" (named
    after its "elementType", or "Chapter" when absent), plus a "CREDITS"
    chapter when "endCreditsStart" is set.

    Returns:
        A Chapters collection; empty when the first video track carries no
        timecodes.
    """
    timecodes = title.tracks.videos[0].data.get("timecodes")
    if not timecodes:
        return Chapters()

    chapters = [
        Chapter(
            name=x.get("elementType", "Chapter"),
            timestamp=x.get("startTimecodeMs"),
        )
        # "or []" also covers a key that is present but null.
        for x in timecodes.get("skipElements") or []
    ]

    credits_start = timecodes.get("endCreditsStart")
    if credits_start:
        chapters.append(
            Chapter(
                name="CREDITS",
                timestamp=credits_start,
            )
        )

    # Wrap the list so both return paths yield the annotated Chapters type.
    return Chapters(chapters)
def get_widevine_service_certificate(self, **_: Any) -> str:
    """Return the "certificate" entry of the service config; keyword arguments are ignored."""
    certificate = self.config["certificate"]
    return certificate
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track) -> str:
    """
    Request a Widevine license for a title from the "license" endpoint.

    Parameters:
        challenge: Raw license challenge; sent base64-encoded.
        title: Title being licensed; its ``id`` is sent as the "asin".
        track: Unused by this implementation.

    Returns:
        The "license" value of the response's "widevine2License" object.

    Exits the process (status 1) when the response is not JSON or contains
    an "errorsByResource" entry.
    """
    res = self.session.post(
        url=self.endpoints["license"],
        params={
            "asin": title.id,
            "consumptionType": "Streaming",
            "desiredResources": "Widevine2License",
            "deviceTypeID": self.device["device_type"],
            "deviceID": self.device_id,
            "firmware": 1,
            "gascEnabled": str(self.pv).lower(),
            "marketplaceID": self.region["marketplace_id"],
            "resourceUsage": "ImmediateConsumption",
            "videoMaterialType": "Feature",
            "operatingSystemName": "Linux" if self.quality == "SD" else "Windows",
            "operatingSystemVersion": "unknown" if self.quality == "SD" else "10.0",
            "customerID": self.customer_id,
            "deviceDrmOverride": "CENC",
            "deviceStreamingTechnologyOverride": "DASH",
            "deviceVideoQualityOverride": "HD",
            "deviceHdrFormatsOverride": "None",
        },
        headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": f"Bearer {self.device_token}",
        },
        data={
            "widevine2Challenge": base64.b64encode(challenge).decode(),
            "includeHdcpTestKeyInLicense": "false",
        },
    )

    try:
        response = res.json()
    except ValueError:
        # ValueError is the common base of the JSON decode errors; handled the
        # same way get_manifest() handles a non-JSON reply.
        self.log.debug(res.text)
        self.log.error(" - Amazon didn't return JSON data when obtaining the license.")
        sys.exit(1)

    if "errorsByResource" in response:
        error_code = response["errorsByResource"]["Widevine2License"]
        # The error detail carries its code under either "errorCode" or "type".
        if "errorCode" in error_code:
            error_code = error_code["errorCode"]
        elif "type" in error_code:
            error_code = error_code["type"]

        if error_code in ["PRS.NoRights.AnonymizerIP", "PRS.NoRights.NotOwned"]:
            self.log.error("Proxy detected, Unable to License")
        elif error_code == "PRS.Dependency.DRM.Widevine.UnsupportedCdmVersion":
            self.log.error("Cdm version not supported")
        else:
            self.log.error(f" x Error from Amazon's License Server: [{error_code}]")
        sys.exit(1)

    return response["widevine2License"]["license"]
# Service specific functions
def configure(self):
    """
    Resolve the account region and derive everything that depends on it.

    Sets self.pv (for IDs longer than 10 characters not given via an amazon.*
    domain), self.region, self.endpoints, the session "Origin" header,
    self.device_id and self.device.

    Exits the process (status 1) when no region could be determined.
    """
    domain = self.domain or ""
    if len(self.title) > 10 and not domain.startswith("amazon."):
        self.pv = True

    self.log.info("Getting account region")
    region = self.get_region()
    self.region = region
    if not region:
        self.log.error(" - Failed to get Amazon account region")
        sys.exit(1)

    # self.GEOFENCE.append(self.region["code"])
    region_code = self.region["code"].upper()
    self.log.info(f" + Region: {region_code}")

    # endpoints must be prepared AFTER region data is retrieved
    self.endpoints = self.prepare_endpoints(self.config["endpoints"], self.region)

    origin = f"https://{self.region['base']}"
    self.session.headers.update({"Origin": origin})

    # The device ID is the SHA-224 hex digest of the literal text "CustomerID"
    # followed by the session's User-Agent header.
    seed = "CustomerID" + self.session.headers["User-Agent"]
    self.device_id = hashlib.sha224(seed.encode("utf-8")).hexdigest()

    browser_device_type = self.config["device_types"]["browser"]
    self.device = {"device_type": browser_device_type}
def get_region(self):
    """
    Resolve the region configuration for the current account.

    Returns:
        A copy of the matching ``self.config["regions"]`` entry with "code"
        set to the region key, or an empty dict when get_domain_region()
        yields nothing. When self.pv is set, "base_manifest" and "base" are
        replaced with primevideo.com hosts derived from the primevideo.com
        home page.

    Exits the process (status 1) when the region has no configuration entry
    or the Prime Video region cannot be found in the page.
    """
    domain_region = self.get_domain_region()
    if not domain_region:
        return {}

    region_config = self.config["regions"].get(domain_region)
    if not region_config:
        # Same log-then-exit pattern the other methods of this service use.
        self.log.error(f" - There's no region configuration data for the region: {domain_region}")
        sys.exit(1)

    # Work on a copy so the overrides below never leak into the shared config.
    region = dict(region_config)
    region["code"] = domain_region

    if self.pv:
        res = self.session.get("https://www.primevideo.com").text
        match = re.search(r'ue_furl *= *([\'"])fls-(na|eu|fe)\.amazon\.[a-z.]+\1', res)
        if not match:
            self.log.error(" - Failed to get PrimeVideo region")
            sys.exit(1)

        pv_region = match.group(2).lower()
        # "na" maps to the bare "atv-ps" host prefix; others get a suffix.
        pv_region = {"na": "atv-ps"}.get(pv_region, f"atv-ps-{pv_region}")
        region["base_manifest"] = f"{pv_region}.primevideo.com"
        region["base"] = "www.primevideo.com"

    return region
def get_domain_region(self):
    """
    Work out the region code from the title's domain or the session cookies.

    The last label of self.domain_region is used when set; otherwise the last
    label of the first domain-specified cookie whose domain starts with
    ".amazon." or ".primevideo.". "com" maps to "us" and "uk" to "gb"; any
    other value is returned unchanged (None when nothing was found).
    """
    suffix = (self.domain_region or "").rpartition(".")[2]

    if not suffix:
        suffix = None
        for cookie in self.session.cookies:
            if not cookie.domain_specified:
                continue
            if cookie.domain.startswith((".amazon.", ".primevideo.")):
                suffix = cookie.domain.rsplit(".", 1)[-1]
                break

    aliases = {"com": "us", "uk": "gb"}
    return aliases.get(suffix, suffix)
def prepare_endpoint(self, name, uri, region):
    """
    Turn an endpoint path into a full https URL.

    The host depends on the endpoint name: region["base_manifest"] for
    browse/playback/license/xray, region["base"] for detail/ontv/devicelink,
    and the US region's "base_api" from the service config for
    codepair/register/token.

    Raises:
        ValueError: If the endpoint name is not one of the above.
    """
    if name in ("browse", "playback", "license", "xray"):
        host = region["base_manifest"]
    elif name in ("detail", "ontv", "devicelink"):
        host = region["base"]
    elif name in ("codepair", "register", "token"):
        host = self.config["regions"]["us"]["base_api"]
    else:
        raise ValueError(f"Unknown endpoint: {name}")

    return f"https://{host}{uri}"
def prepare_endpoints(self, endpoints, region):
    """Return a dict mapping every endpoint name to the URL built by prepare_endpoint()."""
    prepared = {}
    for name, uri in endpoints.items():
        prepared[name] = self.prepare_endpoint(name, uri, region)
    return prepared
def choose_manifest(self, manifest):
    """
    Pick the CDN URL set with the lowest "cdnWeightsRank".

    The rank is compared as an integer; on a tie the entry listed first in
    the manifest is returned.
    """
    url_sets = list(manifest["audioVideoUrls"]["avCdnUrlSets"])
    url_sets.sort(key=lambda url_set: int(url_set["cdnWeightsRank"]))
    return url_sets[0]
def get_manifest(self, title, video_codec, bitrate_mode, quality, hdr=None):
    """
    Request the playback resources for a title and return the decoded JSON.

    Parameters:
        title: Title to request; its ``id`` is sent as the "asin".
        video_codec: Value sent as "deviceVideoCodecOverride".
        bitrate_mode: "VBR", "CBR" or "VBR+CBR"; "VBR" is sent as "CVBR" and
            "+" as ",".
        quality: "SD" makes the request report a Linux operating system,
            anything else Windows 10.0.
        hdr: Value sent as "deviceHdrFormatsOverride"; when omitted the
            literal "None" is sent.

    Exits the process (status 1) when the response is not JSON or contains an
    "error" key.
    """
    is_sd = quality == "SD"

    r = self.session.get(
        url=self.endpoints["playback"],
        params={
            "asin": title.id,
            "consumptionType": "Streaming",
            "desiredResources": ",".join(
                [
                    "PlaybackUrls",
                    "AudioVideoUrls",
                    "CatalogMetadata",
                    "ForcedNarratives",
                    "SubtitlePresets",
                    "SubtitleUrls",
                    "TransitionTimecodes",
                    "TrickplayUrls",
                    "CuepointPlaylist",
                    "XRayMetadata",
                    "PlaybackSettings",
                ]
            ),
            "deviceID": self.device_id,
            "deviceTypeID": self.device["device_type"],
            "firmware": 1,
            "gascEnabled": str(self.pv).lower(),
            "marketplaceID": self.region["marketplace_id"],
            "resourceUsage": "CacheResources",
            "videoMaterialType": "Feature",
            "playerType": "html5",
            "clientId": self.client_id,
            "operatingSystemName": "Linux" if is_sd else "Windows",
            "operatingSystemVersion": "unknown" if is_sd else "10.0",
            "deviceDrmOverride": "CENC",
            "deviceStreamingTechnologyOverride": "DASH",
            "deviceProtocolOverride": "Https",
            "deviceVideoCodecOverride": video_codec,
            "deviceBitrateAdaptationsOverride": bitrate_mode.replace("VBR", "CVBR").replace("+", ","),
            "deviceVideoQualityOverride": "HD",
            # The hdr argument used to be ignored; it is now forwarded, with
            # the previous hard-coded value as the default.
            "deviceHdrFormatsOverride": hdr or "None",
            "supportedDRMKeyScheme": "DUAL_KEY",
            "liveManifestType": "live,accumulating",
            "titleDecorationScheme": "primary-content",
            "subtitleFormat": "TTMLv2",
            "languageFeature": "MLFv2",
            "uxLocale": "en_US",
            "xrayDeviceClass": "normal",
            "xrayPlaybackMode": "playback",
            "xrayToken": "XRAY_WEB_2020_V1",
            "playbackSettingsFormatVersion": "1.0.0",
            "playerAttributes": json.dumps({"frameRate": "HFR"}),
            "audioTrackId": "all",
        },
        headers={"Authorization": f"Bearer {self.device_token}"},
    )

    try:
        manifest = r.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass; catching the base
        # also covers HTTP clients that raise their own decode error type.
        self.log.debug(r.text)
        self.log.error(" - Amazon didn't return JSON data when obtaining the playback manifest.")
        sys.exit(1)

    if "error" in manifest:
        self.log.error(" - Amazon reported an error when obtaining the playback manifest.")
        self.log.error(manifest["error"])
        sys.exit(1)

    return manifest
@staticmethod
def get_original_language(manifest):
    """
    Determine a title's original language from playback manifest data.

    Prefers the first catalog audio track flagged "isOriginalLanguage" (with
    "_" replaced by "-"); otherwise falls back to the part of the default
    audio track ID before the first "_". Returns None when neither exists.
    """
    try:
        for audio_track in manifest["catalogMetadata"]["playback"]["audioTracks"]:
            if audio_track["isOriginalLanguage"]:
                return audio_track["language"].replace("_", "-")
    except KeyError:
        # Catalog metadata missing or incomplete: use the fallback below.
        pass

    playback_urls = manifest.get("playbackUrls", {})
    if "defaultAudioTrackId" in playback_urls:
        default_track_id = manifest["playbackUrls"]["defaultAudioTrackId"]
        return default_track_id.partition("_")[0]

    return None
@staticmethod
def clean_mpd_url(mpd_url):
    """
    Strip unwanted path segments from an Amazon MPD manifest URL.

    Every "/"-separated path segment containing "$" or "dm" is dropped; the
    rest of the URL is left as parsed.

    Raises:
        ValueError: If the URL cannot be parsed or rebuilt.
    """
    try:
        parts = urlparse(mpd_url)
        kept_segments = [
            segment
            for segment in parts.path.split("/")
            if "$" not in segment and "dm" not in segment
        ]
        cleaned = parts._replace(path="/".join(kept_segments))
        return urlunparse(cleaned)
    except Exception as e:
        raise ValueError(f"Unable to parse MPD URL: {e}")
def parse_title(self, ctx, title):
    """
    Resolve the title ID and store it on ``self.title``.

    Parameters:
        ctx: Click context; ``ctx.parent.params["title"]`` is used when
            ``title`` is empty.
        title: Title ID or URL, or None.

    Returns:
        The named groups of the first TITLE_RE pattern that matches, or an
        empty dict when there is no TITLE_RE or nothing matched (the raw
        title is then stored as-is).

    Exits the process (status 1) when no title was given at all.
    """
    import logging  # function-scope import: only needed for the fallback logger

    # __init__ calls this before the base-class initialiser has run, so the
    # instance logger may not exist yet; fall back to a named logger.
    log = getattr(self, "log", None) or logging.getLogger(type(self).__name__)

    title = title or ctx.parent.params.get("title")
    if not title:
        log.error(" - No title ID specified")
        sys.exit(1)

    title_re = getattr(self, "TITLE_RE", None)
    if not title_re:
        self.title = title
        return {}

    # A single pattern string needs no flattening; anything else goes through as_list().
    patterns = [title_re] if isinstance(title_re, str) else as_list(title_re)
    for regex in patterns:
        m = re.search(regex, title)
        if m:
            self.title = m.group("id")
            return m.groupdict()

    log.warning(f" - Unable to parse title ID {title!r}, using as-is")
    self.title = title
    # Always hand back a mapping: the caller reads keys from the result.
    return {}