1116 lines
48 KiB
Python
1116 lines
48 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import click
|
|
import re
|
|
import secrets
|
|
import sys
|
|
import uuid
|
|
|
|
from click import Context
|
|
from collections.abc import Generator
|
|
from datetime import datetime
|
|
from http.cookiejar import CookieJar
|
|
from langcodes import Language
|
|
from pyplayready.cdm import Cdm as PlayReadyCdm
|
|
from requests import Request
|
|
from typing import Any, Optional, Union, List
|
|
|
|
from unshackle.core.constants import AnyTrack
|
|
from unshackle.core.credential import Credential
|
|
from unshackle.core.manifests import HLS
|
|
from unshackle.core.search_result import SearchResult
|
|
from unshackle.core.service import Service
|
|
from unshackle.core.titles import Title_T, Titles_T, Episode, Movie, Movies, Series
|
|
from unshackle.core.tracks import Chapter, Chapters, Tracks, Attachment, Video, Audio, Subtitle
|
|
from unshackle.core.utilities import get_ip_info
|
|
from unshackle.core.utils.collections import as_list
|
|
|
|
from . import queries
|
|
|
|
class DSNP(Service):
|
|
"""
|
|
Service code for Disney+ Streaming Service (https://disneyplus.com).
|
|
|
|
Author: Made by CodeName393 with Special Thanks to narakama\n
|
|
Authorization: Credentials\n
|
|
Security: UHD@L1/SL3000 FHD@L1/SL3000 HD@L3/SL2000
|
|
"""
|
|
|
|
ALIASES = ("DSNP", "disneyplus", "disney+")
|
|
TITLE_RE = (
|
|
r"^(?:https?://(?:www\.)?disneyplus\.com(?:/(?!browse)[a-z0-9-]+)?(?:/(?!browse)[a-z0-9-]+)?/(browse)/(?P<id>entity-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}))(?:\?.*)?$",
|
|
r"^(?:https?://(?:www\.)?disneyplus\.com(?:/[a-z0-9-]+)?(?:/[a-z0-9-]+)?/(movies|series)/[a-z0-9-]+/)?(?P<id>[a-zA-Z0-9-]+)(?:\?.*)?$",
|
|
)
|
|
|
|
@staticmethod
|
|
@click.command(name="DisneyPlus", short_help="https://disneyplus.com", help=__doc__)
|
|
@click.argument("title", type=str)
|
|
@click.option("--imax", is_flag=True, default=False, help="Prefer IMAX Enhanced version if available.")
|
|
@click.option("--remastered-ar", is_flag=True, default=False, help="Prefer Remastered Aspect Ratio if available.")
|
|
@click.option("-ext", "--extras", is_flag=True, default=False, help="Select a extras video if available.")
|
|
@click.pass_context
|
|
def cli(ctx: Context, **kwargs: Any) -> DSNP:
|
|
return DSNP(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx: Context, title: str, imax: bool, remastered_ar: bool, extras: bool):
|
|
self.title = title
|
|
super().__init__(ctx)
|
|
|
|
self.title_id = self.title
|
|
for pattern in self.TITLE_RE:
|
|
match = re.match(pattern, self.title)
|
|
if match:
|
|
self.title_id = match.group("id")
|
|
break
|
|
|
|
self.prefer_imax = imax
|
|
self.prefer_remastered_ar = remastered_ar
|
|
self.extras = extras
|
|
|
|
self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC
|
|
self.acodec : Audio.Codec = ctx.parent.params.get("acodec")
|
|
self.range = ctx.parent.params.get("range_") or [Video.Range.SDR]
|
|
self.quality: List[int] = ctx.parent.params.get("quality") or [1080]
|
|
self.wanted = ctx.parent.params.get("wanted")
|
|
self.audio_only = ctx.parent.params.get("audio_only")
|
|
self.subs_only = ctx.parent.params.get("subs_only")
|
|
self.chapters_only = ctx.parent.params.get("chapters_only")
|
|
|
|
self.cdm = ctx.obj.cdm
|
|
self.playready = isinstance(self.cdm, PlayReadyCdm) if self.cdm else False
|
|
self.is_l3 = (self.cdm.security_level < 3000) if self.playready else (self.cdm.security_level == 3) if self.cdm else False
|
|
|
|
self.region = None
|
|
self.prod_config = {}
|
|
self.account_tokens = {}
|
|
self.active_session = {}
|
|
self.playback_data = {}
|
|
|
|
self.log.info("Preparing...")
|
|
|
|
if self.is_l3:
|
|
self.vcodec = Video.Codec.AVC
|
|
self.range = [Video.Range.SDR]
|
|
self.quality = [720]
|
|
self.log.warning(" + Switched video to HD. This CDM only support HD.")
|
|
else:
|
|
if self.quality > [1080] and self.range[0] == [Video.Range.SDR]:
|
|
self.range = [Video.Range.HDR10]
|
|
self.log.info(" + Switched range to HDR10. 4K resolution requires HDR.")
|
|
|
|
if (self.range != [Video.Range.SDR] or self.quality > [1080]) and self.vcodec != Video.Codec.HEVC:
|
|
self.vcodec = Video.Codec.HEVC
|
|
self.log.info(f" + Switched video codec to H265 to be able to get {self.range[0]} dynamic range.")
|
|
|
|
if self.acodec == Audio.Codec.DTS and not self.prefer_imax:
|
|
self.prefer_imax = True
|
|
self.log.info(" + Switched IMAX prefer. DTS audio can only be get from IMAX prefer.")
|
|
|
|
self.session.headers.update({
|
|
"User-Agent": self.config["bamsdk"]["user_agent"],
|
|
"Accept-Encoding": "gzip",
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json"
|
|
})
|
|
|
|
ip_info = get_ip_info(self.session)
|
|
country_key = None
|
|
possible_keys = ["countryCode", "country", "country_code", "country-code"]
|
|
for key in possible_keys:
|
|
if key in ip_info:
|
|
country_key = key
|
|
break
|
|
if country_key:
|
|
self.region = str(ip_info[country_key]).upper()
|
|
self.log.info(f" + IP Region: {self.region}")
|
|
else:
|
|
self.log.warning(f" - The region could not be determined from IP information: {ip_info}")
|
|
self.region = "US"
|
|
self.log.info(f" + IP Region: {self.region} (By Default)")
|
|
|
|
self.prod_config = self.session.get(self.config["endpoints"]["config"]).json()
|
|
|
|
self.session.headers.update({
|
|
"X-Application-Version": self.config["bamsdk"]["application_version"],
|
|
"X-BAMSDK-Client-ID": self.config["bamsdk"]["client"],
|
|
"X-BAMSDK-Platform": self.config["device"]["platform"],
|
|
"X-BAMSDK-Version": self.config["bamsdk"]["sdk_version"],
|
|
"X-DSS-Edge-Accept": "vnd.dss.edge+json; version=2",
|
|
"X-Request-Yp-Id": self.config["bamsdk"]["yp_service_id"]
|
|
})
|
|
|
|
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
|
|
super().authenticate(cookies, credential)
|
|
self.credentials = credential
|
|
if not credential:
|
|
raise EnvironmentError("Service requires Credentials for Authentication.")
|
|
|
|
self.log.info("Logging into Disney+...")
|
|
self._login()
|
|
|
|
if self.config.get("profile") and "index" in self.config["profile"]:
|
|
try:
|
|
target_profile_index = int(self.config["profile"]["index"])
|
|
except (ValueError, TypeError, KeyError):
|
|
self.log.error(" - Profile index in configuration is invalid.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
profiles = self.active_session['account']['profiles']
|
|
if not 0 <= target_profile_index < len(profiles):
|
|
self.log.error(f" - Invalid profile index: {target_profile_index}. Please choose between 0 and {len(profiles) - 1}.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
target_profile = profiles[target_profile_index]
|
|
active_profile_id = self.active_session['account']['activeProfile']['id']
|
|
|
|
if target_profile['id'] != active_profile_id:
|
|
self._perform_switch_profile(target_profile, self.session.headers)
|
|
|
|
self.log.info(" + Refreshing session data after profile switch...")
|
|
full_account_info = self._get_account_info()
|
|
self.active_session = full_account_info["activeSession"]
|
|
self.active_session['account'] = full_account_info['account']
|
|
self.log.info("Session data updated successfully.")
|
|
|
|
self.log.debug(self.active_session)
|
|
|
|
if not self.active_session['isSubscriber']:
|
|
self.log.error(" - Cannot continue, account is not subscribed to Disney+", exc_info=False)
|
|
sys.exit(1)
|
|
if not self.active_session['inSupportedLocation']:
|
|
self.log.error(" - Cannot continue, Not available in your Region.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
self.log.info(f" + Account ID: {self.active_session['account']['id']}")
|
|
self.log.info(f" + Profile ID: {self.active_session['account']['activeProfile']['id']}")
|
|
self.log.info(f" + Subscribed: {self.active_session['isSubscriber']}")
|
|
self.log.debug(f" + Account Region: {self.active_session['homeLocation']['countryCode']}")
|
|
self.log.debug(f" + Detected Location: {self.active_session['location']['countryCode']}")
|
|
self.log.debug(f" + Supported Location: {self.active_session['inSupportedLocation']}")
|
|
|
|
active_profile_id = self.active_session['account']['activeProfile']['id']
|
|
full_profile_object = next(
|
|
p for p in self.active_session['account']['profiles'] if p['id'] == active_profile_id
|
|
)
|
|
|
|
current_imax_setting = full_profile_object["attributes"]["playbackSettings"]["preferImaxEnhancedVersion"]
|
|
self.log.info(f" + IMAX Enhanced: {current_imax_setting}")
|
|
if current_imax_setting is not self.prefer_imax:
|
|
update_tokens = self._set_imax_preference(self.prefer_imax)
|
|
self._apply_new_tokens(update_tokens["token"])
|
|
|
|
current_133_setting = full_profile_object["attributes"]["playbackSettings"]["prefer133"] # Original Aspect Ratio
|
|
self.log.info(f" + Remastered Aspect Ratio: {not current_133_setting}")
|
|
if not current_133_setting is not self.prefer_remastered_ar:
|
|
update_tokens = self._set_remastered_ar_preference(self.prefer_remastered_ar)
|
|
self._apply_new_tokens(update_tokens["token"])
|
|
|
|
def _login(self) -> None:
|
|
cache = self.cache.get(f"tokens_{self.region}_{self.credentials.sha1}")
|
|
|
|
if cache:
|
|
try:
|
|
self.log.info(" + Using cached tokens...")
|
|
self.account_tokens = cache.data
|
|
|
|
bearer = self.account_tokens["accessToken"]
|
|
if not bearer:
|
|
raise ValueError("accessToken not found in cache")
|
|
self.session.headers.update({'Authorization': f'Bearer {bearer}'})
|
|
|
|
except (KeyError, ValueError, TypeError) as e:
|
|
self.log.warning(f" - Cached token data is invalid or corrupted ({e}). Getting new tokens...")
|
|
self._perform_full_login()
|
|
|
|
try:
|
|
self._refresh()
|
|
except Exception as e:
|
|
self.log.warning(f" - Failed to refresh token from cache ({e}). Getting new tokens...")
|
|
self._perform_full_login()
|
|
|
|
# No problem if don't use it
|
|
# self._update_device()
|
|
|
|
else:
|
|
self.log.info(" + Getting new tokens...")
|
|
self._perform_full_login()
|
|
|
|
self.log.info(" + Fetching session data...")
|
|
full_account_info = self._get_account_info()
|
|
self.active_session = full_account_info["activeSession"]
|
|
self.active_session['account'] = full_account_info['account']
|
|
self.log.info("Session data setup successfully.")
|
|
|
|
def _perform_full_login(self) -> None:
|
|
android_id = secrets.token_bytes(8).hex()
|
|
drm_id = f"{base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8')}\n"
|
|
device_token = self._register_device(android_id, drm_id)
|
|
|
|
email_status = self._check_email(self.credentials.username, device_token)
|
|
|
|
if email_status.lower() != "login":
|
|
if email_status.lower() == "otp":
|
|
self.log.warning(" - Account requires OTP code login.")
|
|
self._request_otp(self.credentials.username, device_token)
|
|
|
|
otp_code = None
|
|
try:
|
|
otp_code = input("Enter a OTP code (Check email): ")
|
|
if not otp_code:
|
|
self.log.error(" - OTP code is required, but no value was entered.", exc_info=False)
|
|
sys.exit(1)
|
|
if not otp_code.isdigit():
|
|
self.log.error(" - Invalid OTP code. Please enter only numbers.", exc_info=False)
|
|
sys.exit(1)
|
|
if len(otp_code) < 6:
|
|
self.log.error(" - OTP code is too short. Please enter at least 6 digits.", exc_info=False)
|
|
sys.exit(1)
|
|
if len(otp_code) > 6:
|
|
self.log.warning(" - OTP code is longer than 6 digits. Using the first 6 digits.")
|
|
otp_code = otp_code[:6]
|
|
except KeyboardInterrupt:
|
|
self.log.error("\n - OTP code input cancelled by user.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
auth_action = self._auth_action_with_otp(self.credentials.username, otp_code, device_token)
|
|
login_tokens = self._login_with_auth_action(auth_action, device_token)
|
|
|
|
elif email_status.lower() == "register":
|
|
self.log.error(" - Account is not registered. Please register first.", exc_info=False)
|
|
sys.exit(1)
|
|
else:
|
|
self.log.error(f" - Email status is '{email_status}'. Account status verification required.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
else:
|
|
login_tokens = self._login_with_password(self.credentials.username, self.credentials.password, device_token)
|
|
|
|
temp_auth_header = {"Authorization": f'Bearer {login_tokens["token"]["accessToken"]}'}
|
|
account_info = self._get_account_info(temp_auth_header)
|
|
profiles = account_info["account"]["profiles"]
|
|
|
|
selected_profile = None
|
|
if self.config.get("profile") and "index" in self.config["profile"]:
|
|
try:
|
|
profile_index = int(self.config["profile"]["index"])
|
|
if not 0 <= profile_index < len(profiles):
|
|
raise ValueError(f"Index out of range (0-{len(profiles)-1})")
|
|
|
|
selected_profile = profiles[profile_index]
|
|
except (ValueError, TypeError):
|
|
self.log.error(" - Profile index in configuration is invalid.", exc_info=False)
|
|
sys.exit(1)
|
|
else:
|
|
selected_profile = next(
|
|
(p for p in profiles if not p["attributes"]["kidsModeEnabled"] and not p["attributes"]["parentalControls"]["isPinProtected"]),
|
|
None
|
|
)
|
|
if not selected_profile:
|
|
self.log.error(" - Auto-selection failed: No suitable profile found (non-kids, no PIN). Please configure a specific profile.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
if selected_profile:
|
|
self._perform_switch_profile(selected_profile, temp_auth_header)
|
|
|
|
def _perform_switch_profile(self, target_profile: dict, auth_headers: dict) -> None:
|
|
self.log.info(f" + Switching to profile: {target_profile['name']}({target_profile['id']})")
|
|
|
|
if target_profile['attributes']['kidsModeEnabled']:
|
|
self.log.error(" - Kids Profile and cannot be used.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
profile_pin = None
|
|
if target_profile['attributes']['parentalControls']['isPinProtected']:
|
|
self.log.warning(" - This profile is PIN protected.")
|
|
try:
|
|
profile_pin = input("Enter a profile pin: ")
|
|
if not profile_pin:
|
|
self.log.error(" - PIN is required, but no value was entered.", exc_info=False)
|
|
sys.exit(1)
|
|
if not profile_pin.isdigit():
|
|
self.log.error(" - Invalid PIN. Please enter only numbers.", exc_info=False)
|
|
sys.exit(1)
|
|
if len(profile_pin) < 4:
|
|
self.log.error(" - PIN is too short. Please enter at least 4 digits.", exc_info=False)
|
|
sys.exit(1)
|
|
if len(profile_pin) > 4:
|
|
self.log.warning(" - PIN is longer than 4 digits. Using the first 4 digits.")
|
|
profile_pin = profile_pin[:4]
|
|
except KeyboardInterrupt:
|
|
self.log.error("\n - PIN input cancelled by user.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
switch_profile_data = self._switch_profile(target_profile['id'], auth_headers, profile_pin)
|
|
self._apply_new_tokens(switch_profile_data["token"])
|
|
|
|
def _refresh(self) -> str:
|
|
cache = self.cache.get(f"tokens_{self.region}_{self.credentials.sha1}")
|
|
if not cache.expired:
|
|
self.log.debug(f" + Token is valid until: {datetime.fromtimestamp(cache.expiration.timestamp()).strftime('%Y-%m-%d %H:%M:%S')}")
|
|
return self.session.headers.get('Authorization', 'Bearer ').split(' ')[1]
|
|
|
|
self.log.warning(" + Token expired. Refreshing...")
|
|
try:
|
|
refreshed_data = self._refresh_token(self.account_tokens["refreshToken"])
|
|
self._apply_new_tokens(refreshed_data["token"])
|
|
except Exception as _:
|
|
raise Exception("Refresh Token Expired")
|
|
|
|
def _apply_new_tokens(self, token_data: dict) -> str:
|
|
self.account_tokens = token_data
|
|
|
|
bearer = self.account_tokens["accessToken"]
|
|
if not bearer:
|
|
self.log.error("Invalid token data: accessToken not found.", exc_info=False)
|
|
sys.exit(1)
|
|
self.session.headers.update({'Authorization': f'Bearer {bearer}'})
|
|
|
|
expires_in = self.account_tokens["expiresIn"] or 3600
|
|
cache = self.cache.get(f"tokens_{self.region}_{self.credentials.sha1}")
|
|
cache.set(self.account_tokens, expires_in - 60)
|
|
self.log.debug(f" + New Token is valid until: {datetime.fromtimestamp(cache.expiration.timestamp()).strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
return bearer
|
|
|
|
def search(self) -> Generator[SearchResult, None, None]:
|
|
params = {"query": self.title}
|
|
endpoint = self._href(self.prod_config["services"]["explore"]["client"]["endpoints"]["search"]["href"])
|
|
data = self._request("GET", endpoint, params=params)["data"]["page"]
|
|
if not data.get("containers"):
|
|
return
|
|
|
|
results = data["containers"][0]["items"]
|
|
for result in results:
|
|
entity = "entity-" + result["id"]
|
|
yield SearchResult(
|
|
id_=entity,
|
|
title=result["visuals"]["title"],
|
|
description=result["visuals"]["description"]["brief"],
|
|
label=result["visuals"]["metastringParts"]["releaseYearRange"]["startYear"],
|
|
url=f"https://www.disneyplus.com/browse/{entity}",
|
|
)
|
|
|
|
def get_titles(self) -> Titles_T:
|
|
if not self.extras:
|
|
try:
|
|
content_info = self._get_deeplink(self.title_id)
|
|
content_type = content_info["data"]["deeplink"]["actions"][0]["contentType"]
|
|
except Exception as e:
|
|
try:
|
|
actions_info = self._get_deeplink_last(self.title_id)
|
|
if actions_info["data"]["deeplink"]["actions"][0]["type"] == "browse":
|
|
info_block = base64.b64decode(actions_info["data"]["deeplink"]["actions"][0]["infoBlock"])
|
|
if b"movie" in info_block:
|
|
content_type = "movie"
|
|
elif b"series" in info_block:
|
|
content_type = "series"
|
|
else:
|
|
content_type = "other"
|
|
self.log.warning(" - The content is not standard. however, it tries to look up the data.")
|
|
except Exception as e:
|
|
self.log.error(f" - Failed to determine content type via deeplink ({e}).", exc_info=False)
|
|
sys.exit(1)
|
|
else:
|
|
content_type = "extras"
|
|
self.log.debug(f" + Content Type: {content_type.upper()}")
|
|
|
|
page = self._get_page(self.title_id)
|
|
|
|
year = None
|
|
if year_data := page["visuals"]["metastringParts"].get("releaseYearRange"):
|
|
year = year_data.get("startYear")
|
|
|
|
if content_type != "extras":
|
|
playback_action = next(
|
|
(x for x in page["actions"] if x["type"] == "playback"),
|
|
None
|
|
)
|
|
if not playback_action:
|
|
self.log.error(f" - No content is available. (Playback action not found)", exc_info=False)
|
|
sys.exit(1)
|
|
lang_data = self._get_original_lang(playback_action["availId"])
|
|
player_exp = lang_data["data"]["playerExperience"]
|
|
orig_lang = player_exp.get("originalLanguage") or player_exp.get("targetLanguage") or "en"
|
|
self.log.debug(f' + Original Language: {orig_lang}')
|
|
|
|
if content_type in ("movie", "other"):
|
|
return Movies([
|
|
Movie(
|
|
id_=page["id"],
|
|
service=self.__class__,
|
|
name=page["visuals"]["title"],
|
|
year=year,
|
|
language=Language.get(orig_lang),
|
|
data=page
|
|
)
|
|
])
|
|
|
|
elif content_type == "series":
|
|
return Series(self._get_series(page, year, orig_lang))
|
|
|
|
elif content_type == "extras":
|
|
return Series(self._get_extras(page, year))
|
|
|
|
else:
|
|
self.log.error(f" - Unsupported content type: {content_type}", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
def _get_series(self, page: dict, year: int, orig_lang: str) -> Series:
|
|
container = next(x for x in page["containers"] if x["type"] == "episodes")
|
|
season_ids = [s["id"] for s in container["seasons"]]
|
|
|
|
episodes : List[Episode] = []
|
|
for season_id in season_ids:
|
|
episodes_data = self._get_episodes_data(season_id)
|
|
|
|
for ep in episodes_data:
|
|
if ep["type"] != "view":
|
|
continue
|
|
|
|
episodes.append(
|
|
Episode(
|
|
id_=ep["id"],
|
|
service=self.__class__,
|
|
title=page["visuals"]["title"],
|
|
season=int(ep["visuals"]["seasonNumber"]),
|
|
number=int(ep["visuals"]["episodeNumber"]),
|
|
name=ep["visuals"]["episodeTitle"],
|
|
year=year,
|
|
language=Language.get(orig_lang),
|
|
data=ep
|
|
)
|
|
)
|
|
|
|
return episodes
|
|
|
|
def _get_extras(self, page: dict, year: int) -> Series:
|
|
extras_containers = [
|
|
x for x in page["containers"]
|
|
if x["type"] == "set" and x["style"]["name"] == "standard_compact_list"
|
|
]
|
|
|
|
if not extras_containers:
|
|
self.log.error(" - No extras found.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
extras_episodes : List[Episode] = []
|
|
ep_count = 1
|
|
|
|
first_item = extras_containers[0]["items"][0]
|
|
first_action = next((x for x in first_item["actions"] if x["type"] in ("playback", "trailer")), None)
|
|
if first_action:
|
|
lang_data = self._get_original_lang(first_action["availId"])
|
|
player_exp = lang_data["data"]["playerExperience"]
|
|
orig_lang = player_exp.get("originalLanguage") or player_exp.get("targetLanguage") or "en"
|
|
self.log.debug(f' + Original Language: {orig_lang}')
|
|
|
|
for container in extras_containers:
|
|
items = container["items"]
|
|
for item in items:
|
|
if item["type"] == "view":
|
|
action = next((x for x in item["actions"] if x["type"] in ("playback", "trailer")), None)
|
|
|
|
if action:
|
|
extras_episodes.append(
|
|
Episode(
|
|
id_=item["id"],
|
|
service=self.__class__,
|
|
title=page["visuals"]["title"],
|
|
season=0, # Special
|
|
number=ep_count,
|
|
name=item["visuals"]["title"],
|
|
year=year,
|
|
language=Language.get(orig_lang),
|
|
data=item
|
|
)
|
|
)
|
|
ep_count += 1
|
|
|
|
if not extras_episodes:
|
|
self.log.error(" - No playable extras found.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
return extras_episodes
|
|
|
|
def get_tracks(self, title: Title_T) -> Tracks:
|
|
playback = next(x for x in title.data["actions"] if x.get("type") == "playback")
|
|
media_id = playback["resourceId"] or None
|
|
if not media_id:
|
|
self.log.error(" - Failed to get media ID for playback info", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
scenario = "ctr-regular" if self.is_l3 else "ctr-high" # cbcs-high
|
|
|
|
self.log.debug(f"Playback Scenario: {scenario}")
|
|
self.log.debug(f"Media ID: {media_id}")
|
|
|
|
self._refresh() # Safe Access
|
|
|
|
if Video.Range.HYBRID in self.range[0] and not self.is_l3:
|
|
self.log.warning("DV+HDR Multi-range requested.")
|
|
|
|
self.log.info(" + Fetching Dolby Vision tracks...")
|
|
tracks = self._fetch_manifest_tracks(title, media_id, scenario, ["DOLBY_VISION"])
|
|
|
|
self.log.info(" + Fetching HDR10 tracks...")
|
|
hdr_tracks_temp = self._fetch_manifest_tracks(title, media_id, scenario, ["HDR10"]) # HDR10PLUS
|
|
|
|
tracks.add(hdr_tracks_temp, warn_only=True)
|
|
else:
|
|
video_ranges = []
|
|
if not self.is_l3:
|
|
if Video.Range.DV in self.range[0]:
|
|
video_ranges = ["DOLBY_VISION"]
|
|
elif Video.Range.HDR10 in self.range[0] or Video.Range.HDR10P in self.range[0]:
|
|
video_ranges = ["HDR10"] # HDR10PLUS
|
|
|
|
tracks = self._fetch_manifest_tracks(title, media_id, scenario, video_ranges or None)
|
|
|
|
tracks.add(self._get_thumbnail(title))
|
|
return self._post_process_tracks(tracks)
|
|
|
|
def _fetch_manifest_tracks(self, title: Title_T, media_id: str, scenario: str, video_ranges: List[str] = None) -> Tracks:
|
|
attributes = {
|
|
"codecs": {
|
|
"supportsMultiCodecMaster": False,
|
|
"video": ["h.264"]
|
|
},
|
|
"protocol": "HTTPS",
|
|
"frameRates": [60],
|
|
"assetInsertionStrategies": {
|
|
"point": "SGAI", # Server-Guided Ad Insertion
|
|
"range": "SGAI" # Server-Guided Ad Insertion
|
|
},
|
|
"playbackInitiationContext": "ONLINE",
|
|
"slugDuration": "SLUG_500_MS", # SLUG_1000_MS, SLUG_750_MS ?
|
|
"maxSlideDuration": "4_HOUR" # 15_MIN ?
|
|
}
|
|
|
|
if self.is_l3:
|
|
attributes["resolution"] = {"max": ["1280x720"]}
|
|
else:
|
|
attributes["resolution"] = {"max": ["3840x2160"]}
|
|
|
|
if self.vcodec == Video.Codec.HEVC:
|
|
attributes["codecs"]["video"] = ["h.264", "h.265"]
|
|
|
|
attributes["audioTypes"] = ["ATMOS", "DTS_X"]
|
|
|
|
if video_ranges:
|
|
attributes["videoRanges"] = video_ranges
|
|
|
|
payload = {
|
|
"playbackId": media_id,
|
|
"playback": {
|
|
"attributes": attributes
|
|
}
|
|
}
|
|
self.playback_data[title.id] = self._get_playback(scenario, payload)
|
|
manifest_url = self.playback_data[title.id]["sources"][0]['complete']['url']
|
|
self.log.debug(f" + Manifest URL: {manifest_url}")
|
|
return HLS.from_url(url=manifest_url, session=self.session).to_tracks(title.language)
|
|
|
|
def _get_thumbnail(self, title: Title_T) -> Attachment:
|
|
if type(title) == Movie:
|
|
thumbnail_id = title.data["visuals"]["artwork"]["standard"]["background"]["1.78"]["imageId"]
|
|
elif type(title) == Episode:
|
|
thumbnail_id = title.data["visuals"]["artwork"]["standard"]["thumbnail"]["1.78"]["imageId"]
|
|
thumbnail_url = self._href(
|
|
self.prod_config["services"]["ripcut"]["client"]["endpoints"]["mainCompose"]["href"],
|
|
version="v2",
|
|
partnerId="disney",
|
|
imageId=thumbnail_id
|
|
)
|
|
return Attachment.from_url(url=thumbnail_url, name=thumbnail_id, mime_type="image/png")
|
|
|
|
def _post_process_tracks(self, tracks: Tracks) -> Tracks:
|
|
for track in tracks:
|
|
if isinstance(track, (Audio, Subtitle)):
|
|
track.name = "[Original]" if track.is_original_lang else None
|
|
|
|
for audio in tracks.audio:
|
|
bitrate_match = re.search(r"(?<=composite_)\d+|\d+(?=_(?:hdri|complete))|(?<=-)\d+(?=K/)", as_list(audio.url)[0])
|
|
if bitrate_match:
|
|
audio.bitrate = int(bitrate_match.group()) * 1000
|
|
if audio.bitrate == 1_000_000:
|
|
audio.bitrate = 768_000 # DSNP lies about the Atmos bitrate
|
|
if audio.channels == 6.0:
|
|
audio.channels = 5.1
|
|
if audio.channels == 10.0: # DTS-UHD
|
|
audio.channels = "5.1.4" # Unshackle does not recommend
|
|
audio.codec = Audio.Codec.DTS
|
|
audio.drm = None
|
|
|
|
for subtitle in tracks.subtitles:
|
|
subtitle.codec = Subtitle.Codec.WebVTT
|
|
|
|
return tracks
|
|
|
|
def get_chapters(self, title: Title_T) -> Chapters:
|
|
try:
|
|
editorial = self.playback_data[title.id]["editorial"]
|
|
if not editorial:
|
|
return Chapters()
|
|
|
|
LABEL_MAP = {
|
|
"intro_start": "intro_start",
|
|
"intro_end": "intro_end",
|
|
"recap_start": "recap_start",
|
|
"recap_end": "recap_end",
|
|
"FFER": "recap_start", # First Frame Episode Recap
|
|
"LFER": "recap_end", # Last Frame Episode Recap
|
|
"FFEI": "intro_start", # First Frame Episode Intro
|
|
"LFEI": "intro_end", # Last Frame Episode Intro
|
|
"FFEC": "credits_start", # First Frame End Credits
|
|
"LFEC": "lfec_marker", # Last Frame End Credits
|
|
"up_next": None,
|
|
"tag_start": None,
|
|
"tag_end": None,
|
|
}
|
|
|
|
NAME_MAP = {
|
|
"recap_start": "Recap",
|
|
"recap_end": "Scene",
|
|
"intro_start": "Intro",
|
|
"intro_end": "Scene",
|
|
"credits_start": "Credits",
|
|
}
|
|
|
|
grouped = {}
|
|
for marker in editorial:
|
|
group = LABEL_MAP.get(marker["label"])
|
|
offset = marker["offsetMillis"]
|
|
if group and offset is not None:
|
|
grouped.setdefault(group, []).append(offset)
|
|
|
|
raw_chapters = []
|
|
total_runtime = title.data["visuals"]["metastringParts"]["runtime"]["runtimeMs"]
|
|
|
|
for group, times in grouped.items():
|
|
if not times: continue
|
|
|
|
timestamp = min(times) if "start" in group else max(times) if "end" in group else times[0]
|
|
name = NAME_MAP.get(group)
|
|
|
|
if group == "lfec_marker" and (total_runtime - timestamp) > 5000:
|
|
name = "Scene"
|
|
|
|
if name:
|
|
raw_chapters.append((timestamp, name))
|
|
|
|
raw_chapters.sort(key=lambda x: x[0])
|
|
unique_chapters = []
|
|
seen_ms = set()
|
|
|
|
for ms, name in raw_chapters:
|
|
if ms not in seen_ms:
|
|
unique_chapters.append({"ms": ms, "name": name})
|
|
seen_ms.add(ms)
|
|
|
|
if not unique_chapters:
|
|
unique_chapters.append({"ms": 0, "name": "Scene"})
|
|
else:
|
|
first = unique_chapters[0]
|
|
if first["ms"] > 0:
|
|
if first["ms"] < 5000 and first["name"] in ("Intro", "Recap"):
|
|
first["ms"] = 0
|
|
else:
|
|
unique_chapters.insert(0, {"ms": 0, "name": "Scene"})
|
|
|
|
chapters: List[Chapter] = []
|
|
for i, chap_data in enumerate(unique_chapters):
|
|
time_sec = chap_data["ms"] / 1000.000
|
|
chapter_title = chap_data["name"]
|
|
chapters.append(
|
|
Chapter(
|
|
timestamp=float(time_sec),
|
|
name=chapter_title if chapter_title != "Scene" else None
|
|
)
|
|
)
|
|
|
|
return chapters
|
|
|
|
except Exception as e:
|
|
self.log.warning(f"Failed to extract chapters: {e}")
|
|
return Chapters()
|
|
|
|
def get_widevine_service_certificate(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Union[bytes, str]:
|
|
# endpoint = self.prod_config["services"]["drm"]["client"]["endpoints"]["widevineCertificate"]["href"]
|
|
# res = self.session.get(endpoint, data=challenge)
|
|
return self.config["certificate"]
|
|
|
|
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
|
|
self._refresh() # Safe Access
|
|
endpoint = self.prod_config["services"]["drm"]["client"]["endpoints"]["widevineLicense"]["href"]
|
|
headers = {"Content-Type": "application/octet-stream"}
|
|
|
|
try:
|
|
res = self.session.post(endpoint, headers=headers, data=challenge)
|
|
res.raise_for_status()
|
|
except Exception as e:
|
|
self.log.error(f"License request failed: {e}", exc_info=False)
|
|
sys.exit(1)
|
|
return res.content
|
|
|
|
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
|
|
self._refresh() # Safe Access
|
|
endpoint = self.prod_config["services"]["drm"]["client"]["endpoints"]["playReadyLicense"]["href"]
|
|
headers = {
|
|
"Accept": "application/xml, application/vnd.media-service+json; version=2",
|
|
"Content-Type": "text/xml; charset=utf-8",
|
|
"SOAPAction": "http://schemas.microsoft.com/DRM/2007/03/protocols/AcquireLicense"
|
|
}
|
|
try:
|
|
res = self.session.post(endpoint, headers=headers, data=challenge)
|
|
res.raise_for_status()
|
|
except Exception as e:
|
|
self.log.error(f"License request failed: {e}", exc_info=False)
|
|
sys.exit(1)
|
|
return res.content
|
|
|
|
def _get_deeplink(self, ref_id: str) -> dict:
|
|
endpoint = self._href(
|
|
self.prod_config["services"]["content"]["client"]["endpoints"]["getDeeplink"]["href"],
|
|
refIdType="deeplinkId",
|
|
refId=ref_id
|
|
)
|
|
data = self._request("GET", endpoint)
|
|
return data
|
|
|
|
def _get_deeplink_last(self, ref_id: str) -> dict:
|
|
endpoint = self._href(self.prod_config["services"]["explore"]["client"]["endpoints"]["getDeeplink"]["href"])
|
|
params = {
|
|
"refIdType" : "deeplinkId",
|
|
"refId" : ref_id
|
|
}
|
|
data = self._request("GET", endpoint, params=params)
|
|
return data
|
|
|
|
def _get_page(self, title_id: str) -> dict:
|
|
endpoint = self._href(
|
|
self.prod_config["services"]["explore"]["client"]["endpoints"]["getPage"]["href"],
|
|
pageId=title_id
|
|
)
|
|
data = self._request("GET", endpoint, params={"disableSmartFocus": "true", "limit": 999})
|
|
return data["data"]["page"]
|
|
|
|
def _get_original_lang(self, availId: str) -> dict:
|
|
endpoint = self._href(
|
|
self.prod_config["services"]["explore"]["client"]["endpoints"]["getPlayerExperience"]["href"],
|
|
availId=availId
|
|
)
|
|
data = self._request("GET", endpoint)
|
|
return data
|
|
|
|
def _get_episodes_data(self, season_id: str) -> List[dict]:
|
|
endpoint = self._href(
|
|
self.prod_config["services"]["explore"]["client"]["endpoints"]["getSeason"]["href"],
|
|
seasonId=season_id
|
|
)
|
|
data = self._request("GET", endpoint, params={'limit': 999})["data"]["season"]["items"]
|
|
return data
|
|
|
|
def _get_playback(self, scenario: str, payload: dict) -> dict:
|
|
endpoint = self._href(
|
|
self.prod_config["services"]["media"]["client"]["endpoints"]["mediaPayload"]["href"],
|
|
scenario=scenario
|
|
)
|
|
headers = {
|
|
"Accept": "application/vnd.media-service+json",
|
|
"X-DSS-Feature-Filtering": "true"
|
|
}
|
|
data = self._request("POST", endpoint, headers=headers, payload=payload)
|
|
return data["stream"]
|
|
|
|
def _register_device(self, android_id: str, drm_id: str) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["registerDevice"]["href"]
|
|
headers = {
|
|
"Authorization": self.config["bamsdk"]["api_key"],
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"variables": {
|
|
"registerDevice": {
|
|
"applicationRuntime": self.config["device"]["applicationRuntime"],
|
|
"attributes": {
|
|
"osDeviceIds": [
|
|
{
|
|
"identifier": android_id,
|
|
"type": "android.vendor.id"
|
|
},
|
|
{
|
|
"identifier": drm_id,
|
|
"type": "android.drm.id"
|
|
}
|
|
],
|
|
"operatingSystem": self.config["device"]["operatingSystem"],
|
|
"operatingSystemVersion": self.config["device"]["operatingSystemVersion"]
|
|
},
|
|
"deviceFamily": self.config["device"]["family"],
|
|
"deviceLanguage": self.config["device"]["deviceLanguage"],
|
|
"deviceProfile": self.config["device"]["profile"],
|
|
"devicePlatformId": self.config["device"]["platform_id"],
|
|
}
|
|
},
|
|
"query": queries.REGISTER_DEVICE
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["extensions"]["sdk"]["token"]["accessToken"]
|
|
|
|
def _check_email(self, email: str, token: str) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {
|
|
"Authorization": token,
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "check",
|
|
"variables": {
|
|
"email": email
|
|
},
|
|
"query": queries.CHECK_EMAIL
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["data"]["check"]["operations"][0]
|
|
|
|
def _login_with_password(self, email: str, password: str, token: str) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {
|
|
"Authorization": token,
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "login",
|
|
"variables": {
|
|
"input": {
|
|
"email": email,
|
|
"password": password
|
|
},
|
|
"includeIdentity": True,
|
|
"includeAccountConsentToken": True
|
|
},
|
|
"query": queries.LOGIN
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["extensions"]["sdk"]
|
|
|
|
def _request_otp(self, email: str, token: str) -> dict:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {
|
|
"Authorization": token,
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "requestOtp",
|
|
"variables": {
|
|
"input": {
|
|
"email": email,
|
|
"reason": "Login"
|
|
}
|
|
},
|
|
"query": queries.REQUESET_OTP
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
if not data["data"]["requestOtp"]["accepted"]:
|
|
self.log.error(" - OTP code request failed.", exc_info=False)
|
|
sys.exit(1)
|
|
|
|
def _auth_action_with_otp(self, email: str, otp: str, token: str) -> dict:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {
|
|
"Authorization": token,
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "authenticateWithOtp",
|
|
"variables": {
|
|
"input": {
|
|
"email": email,
|
|
"passcode": otp
|
|
}
|
|
},
|
|
"query": queries.LOGIN_OTP
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["data"]["authenticateWithOtp"]["actionGrant"]
|
|
|
|
def _login_with_auth_action(self, auth_action: str, token: str) -> dict:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {
|
|
"Authorization": token,
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "loginWithActionGrant",
|
|
"variables": {
|
|
"input": {
|
|
"actionGrant": auth_action
|
|
},
|
|
"includeAccountConsentToken": True
|
|
},
|
|
"query": queries.LOGIN_ACTION_GRANT
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["extensions"]["sdk"]
|
|
|
|
def _get_account_info(self, headers: dict = {}) -> dict:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers.update({"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]})
|
|
payload = {
|
|
"operationName": "me",
|
|
"variables": {
|
|
"includeAccountConsentToken": True
|
|
},
|
|
"query": queries.ME
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["data"]["me"]
|
|
|
|
def _switch_profile(self, profile_id: str, headers: dict, pin: str = None):
|
|
profile_input = {"profileId": profile_id}
|
|
if pin: profile_input["entryPin"] = pin
|
|
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers.update({"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]})
|
|
payload = {
|
|
"operationName": "switchProfile",
|
|
"variables": {
|
|
"input": profile_input,
|
|
"includeIdentity": True,
|
|
"includeAccountConsentToken": True
|
|
},
|
|
"query": queries.SWITCH_PROFILE
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["extensions"]["sdk"]
|
|
|
|
def _refresh_token(self, refresh_token: str) -> dict:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["refreshToken"]["href"]
|
|
headers = {
|
|
"Authorization": self.config["bamsdk"]["api_key"],
|
|
"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]
|
|
}
|
|
payload = {
|
|
"operationName": "refreshToken",
|
|
"variables": {
|
|
"refreshToken": {
|
|
"refreshToken": refresh_token
|
|
}
|
|
},
|
|
"query": queries.REFRESH_TOKEN
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
return data["extensions"]["sdk"]
|
|
|
|
def _update_device(self, android_id: str, drm_id: str) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]}
|
|
payload = {
|
|
"operationName": "updateDeviceOperatingSystem",
|
|
"variables": {
|
|
"updateDeviceOperatingSystem": {
|
|
"operatingSystem": self.config["device"]["operatingSystem"],
|
|
"operatingSystemVersion": self.config["device"]["operatingSystemVersion"],
|
|
"osDeviceIds": [
|
|
{
|
|
"identifier": android_id,
|
|
"type": "android.vendor.id"
|
|
},
|
|
{
|
|
"identifier": drm_id,
|
|
"type": "android.drm.id"
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"query": queries.UPDATE_DEVICE
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
|
|
if data["data"]["updateDeviceOperatingSystem"]["accepted"]:
|
|
return data["extensions"]["sdk"]
|
|
else:
|
|
self.log.warning(" - Failed to update Device Operating System.")
|
|
|
|
def _set_imax_preference(self, enabled: bool) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]}
|
|
payload = {
|
|
"operationName": "updateProfileImaxEnhancedVersion",
|
|
"variables": {
|
|
"input": {
|
|
"imaxEnhancedVersion": enabled,
|
|
},
|
|
"includeProfile": True
|
|
},
|
|
"query": queries.SET_IMAX,
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
|
|
if data["data"]["updateProfileImaxEnhancedVersion"]["accepted"]:
|
|
self.log.info(f" + Updated IMAX Enhanced preference: {enabled}")
|
|
return data["extensions"]["sdk"]
|
|
else:
|
|
self.log.warning(" - Failed to set IMAX preference.")
|
|
|
|
def _set_remastered_ar_preference(self, enabled: bool) -> str:
|
|
endpoint = self.prod_config["services"]["orchestration"]["client"]["endpoints"]["query"]["href"]
|
|
headers = {"X-BAMSDK-Platform-Id": self.config["device"]["platform_id"]}
|
|
payload = {
|
|
"operationName": "updateProfileRemasteredAspectRatio",
|
|
"variables": {
|
|
"input": {
|
|
"remasteredAspectRatio": enabled,
|
|
},
|
|
"includeProfile": True
|
|
},
|
|
"query": queries.SET_REMASTERED_AR,
|
|
}
|
|
data = self._request("POST", endpoint, payload=payload, headers=headers)
|
|
|
|
if data["data"]["updateProfileRemasteredAspectRatio"]["accepted"]:
|
|
self.log.info(f" + Updated Remastered Aspect Ratio preference: {enabled}")
|
|
return data["extensions"]["sdk"]
|
|
else:
|
|
self.log.warning(" - Failed to set Remastered Aspect Ratio preference.")
|
|
|
|
def _href(self, href: str, **kwargs: Any) -> str:
|
|
_args = {"version": self.config["bamsdk"]["explore_version"]}
|
|
_args.update(**kwargs)
|
|
return href.format(**_args)
|
|
|
|
def _request(self, method: str, endpoint: str, params: dict = None, headers: dict = None, payload: dict = None) -> Any[dict | str]:
|
|
_headers = self.session.headers.copy()
|
|
if headers: _headers.update(headers)
|
|
_headers.update({
|
|
"X-BAMSDK-Transaction-ID": str(uuid.uuid4()),
|
|
"X-Request-ID": str(uuid.uuid4())
|
|
})
|
|
|
|
req = Request(method, endpoint, headers=_headers, params=params, json=payload)
|
|
prepped = self.session.prepare_request(req)
|
|
try:
|
|
res = self.session.send(prepped)
|
|
res.raise_for_status()
|
|
data = res.json()
|
|
if data.get("errors"):
|
|
error_code = data["errors"][0]["extensions"]["code"]
|
|
if "token.service.invalid.grant" in error_code:
|
|
raise ConnectionError(f"Refresh Token Expired: {error_code}")
|
|
elif "token.service.unauthorized.client" in error_code:
|
|
raise ConnectionError(f"Unauthorized Client/IP: {error_code}")
|
|
elif "idp.error.identity.bad-credentials" in error_code:
|
|
raise ConnectionError(f"Bad Credentials: {error_code}")
|
|
elif "account.profile.pin.invalid" in error_code:
|
|
raise ConnectionError(f"Invalid PIN: {error_code}")
|
|
raise ConnectionError(data["errors"])
|
|
if data.get("data") and data["data"].get("errors"):
|
|
raise ConnectionError(data["data"]["errors"])
|
|
return data
|
|
except Exception as e:
|
|
if "Refresh Token Expired" in str(e) or "/deeplink" in endpoint:
|
|
raise e
|
|
else:
|
|
self.log.error(f"API Request failed: {e}", exc_info=False)
|
|
sys.exit(1) |