import base64 import hashlib import hmac import json import os import time import uuid import re import requests from datetime import datetime from urllib.parse import urlparse, parse_qs from requests.adapters import HTTPAdapter from urllib.request import urlopen, Request from http.cookiejar import CookieJar from urllib3.util.retry import Retry from pathlib import Path import click from click import Context from typing import Any, Optional, Union from devine.core.utils.collections import as_list from devine.core.config import config from devine.core.manifests import DASH, HLS from devine.core.service import Service from devine.core.titles import Episode, Movie, Movies, Series from devine.core.tracks import Tracks class Hotstar(Service): """ Service code for Star India's Hotstar (aka Disney+ Hotstar) streaming service (https://hotstar.com). \b Authorization: Credentials Security: UHD@L3, doesn't seem to care about releases. \b Tips: - The library of contents can be viewed without logging in at https://hotstar.com - The homepage hosts domestic programming; Disney+ content is at https://hotstar.com/in/disneyplus """ ALIASES = ["HS", "hotstar"] TITLE_RE = r"^(?:https?://(?:www\.)?hotstar\.com/[a-z0-9/-]+/)(?P\d+)" @staticmethod @click.command(name="Hotstar", short_help="https://hotstar.com") @click.argument("title", type=str, required=False) @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.") @click.option("-q", "--quality", default="fhd", type=click.Choice(["4k", "fhd", "hd", "sd"], case_sensitive=False), help="Manifest quality to request.") @click.option("-ac", "--audio-codec", default="dolby51", type=click.Choice(["dolby51", "stereo", "atmos"], case_sensitive=False), help="Audio Codec") @click.option("-rg", "--region", default="in", type=click.Choice(["in", "id", "th"], case_sensitive=False), help="Account region") @click.pass_context def cli(ctx: click.Context, **kwargs: Any) -> 'Hotstar': return Hotstar(ctx, **kwargs) def __init__(self, ctx:Context, title, movie, quality, audio_codec, region): super().__init__(ctx) self.parse_title(ctx, title) self.movie = movie self.quality = quality self.audio_codec = audio_codec self.region = region.lower() assert ctx.parent is not None self.vcodec = ctx.parent.params["vcodec"] self.acodec = ctx.parent.params["acodec"] or "EC3" self.range = ctx.parent.params["range_"] self.profile = ctx.obj.profile self.device_id = None self.hotstar_auth = None self.token = None self.license_api = None self.configure() def get_titles(self) -> Union[Movies, Series]: headers = { "Accept": "*/*", "Accept-Language": "en-GB,en;q=0.5", "hotstarauth": self.hotstar_auth, "X-HS-UserToken": self.token, "X-HS-Platform": self.config["device"]["platform"]["name"], "X-HS-AppVersion": self.config["device"]["platform"]["version"], "X-Country-Code": "in", "x-platform-code": "PCTV" } r = self.session.get( url=self.config["endpoints"]["movie_title"] if self.movie else self.config["endpoints"]["tv_title"], headers=headers, params={"contentId": self.title} ) try: res = r.json()["body"]["results"]["item"] except json.JSONDecodeError: raise ValueError(f"Failed to load title manifest: {res.text}") self.content_type = res["assetType"] self.lang = res["langObjs"][0]["iso3code"] if self.content_type == "MOVIE": return Movies([Movie( id_=res.get("contentId"), service=self.__class__, name=res["title"], year=res["year"], language=self.lang, data=res, )]) else: show_data = res episodes = [] r = self.session.get( url=self.config["endpoints"]["tv_episodes"], headers=headers, params={ "eid": res["id"], "etid": "2", "tao": "0", "tas": "1000" } ) try: res = r.json()["body"]["results"]["assets"]["items"] except json.JSONDecodeError: raise ValueError(f"Failed to load episodes list: {r.text}") return [Series([Episode( id_=ep.get("contentId"), service=self.__class__, title=ep.get("showShortTitle") or show_data["title"], year=ep.get("year"), season=ep.get("seasonNo"), number=ep.get("episodeNo"), name=ep.get("title"), language=ep.get("langObjs", [{}])[0].get("iso3code", self.lang), data=ep )]) for ep in episodes] def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: tracks = Tracks() if self.range == 'HDR10': range = 'hdr10' elif self.range == 'SDR': range = 'sdr' elif self.range == 'DV': range = 'dv' self.vcodec = 'dvh265' else: range = 'sdr' self.vcodec = 'h264' r = self.session.get( url=self.config["endpoints"]["manifest"], # .format(id=title.service_data["contentId"]), params={ "content_id": title.data["contentId"], "filters": f"content_type={self.content_type}", "client_capabilities": "{\"package\":[\"dash\",\"hls\"],\"container\":[\"fmp4br\"],\"ads\":[\"non_ssai\",\"ssai\"],\"audio_channel\":[\"" + self.audio_codec + "\"],\"encryption\":[\"plain\",\"widevine\"],\"video_codec\":[\"" + self.vcodec + "\"],\"ladder\":[\"tv\"],\"resolution\":[\"" + self.quality + "\"],\"true_resolution\":[\"" + self.quality + "\"],\"dynamic_range\":[\"" + range + "\"]}", "drm_parameters": "{\"widevine_security_level\":[\"SW_SECURE_DECODE\",\"SW_SECURE_CRYPTO\"],\"hdcp_version\":[\"HDCP_V2_2\",\"HDCP_V2_1\",\"HDCP_V2\",\"HDCP_V1\"]}" }, headers={ "user-agent": "Disney+;in.startv.hotstar.dplus.tv/23.08.14.4.2915 (Android/13)", "hotstarauth": self.hotstar_auth, "x-hs-usertoken": self.token, "x-hs-device-id": self.device_id, "x-hs-client": "platform:androidtv;app_id:in.startv.hotstar.dplus.tv;app_version:23.08.14.4;os:Android;os_version:13;schema_version:0.0.970", "x-hs-platform": "androidtv", "content-type": "application/json", } ).json() playback = r['success']['page']['spaces']['player']['widget_wrappers'][0]['widget']['data']['player_config'][ 'media_asset']['primary'] if playback == {}: raise ValueError("Wanted playback set is unavailable for this title...") if 'widevine' in playback['playback_tags']: self.license_api = playback["license_url"] mpd_url = playback['content_url'].split('?')[0] tracks = self.session.get(mpd_url) tracks.add(DASH.from_url( url=playback['content_url'], ).to_tracks(title.language)) for track in tracks: track.needs_proxy = True return tracks def get_chapters(self, title): return [] def get_widevine_service_certificate(self, **_): return None # will use common privacy cert def get_widevine_license(self, challenge, **_): return self.session.post( url=self.license_api, data=challenge # expects bytes ).content # Service specific functions def configure(self): self.session.headers.update({ "Origin": "https://www.hotstar.com", "Referer": f"https://www.hotstar.com/{self.region}" }) retry_strategy = Retry( total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504], allowed_methods=["GET", "POST"] ) adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("https://", adapter) self.session.mount("http://", adapter) self.log.info("Logging into Hotstar") self.hotstar_auth = self.get_akamai() self.log.info(f" + Calculated HotstarAuth: {self.hotstar_auth}") if self.session.cookies: self.device_id = self.session.cookies.get("deviceId") self.log.info(f" + Using Device ID: {self.device_id}") else: self.device_id = str(uuid.uuid4()) self.log.info(f" + Created Device ID: {self.device_id}") self.token = self.get_token() self.log.info(" + Obtained tokens") @staticmethod def get_akamai(): enc_key = b"\x05\xfc\x1a\x01\xca\xc9\x4b\xc4\x12\xfc\x53\x12\x07\x75\xf9\xee" st = int(time.time()) exp = st + 12000 res = f"st={st}~exp={exp}~acl=/*" res += "~hmac=" + hmac.new(enc_key, res.encode(), hashlib.sha256).hexdigest() return res def get_token(self): token_cache_path = Path(config.directories.cache / self.__class__.__name__ / f"token.json") if os.path.isfile(token_cache_path): with open(token_cache_path, encoding="utf-8") as fd: token = json.load(fd) if token.get("exp", 0) > int(time.time()): self.log.info(" + Using cached auth tokens...") return token["uid"] else: self.log.info(" + Refreshing and using cached auth tokens...") return self.save_token(self.refresh(token["uid"], token["sub"]["deviceId"]), token_cache_path) if self.session.cookies: token = self.session.cookies.get("sessionUserUP", None, 'www.hotstar.com', '/' + self.region) else: raise self.log.error(f" - Please add cookies") return self.save_token(token, token_cache_path) @staticmethod def save_token(token, to): data = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8")) data["uid"] = token data["sub"] = json.loads(data["sub"]) os.makedirs(os.path.dirname(to), exist_ok=True) with open(to, mode="w", encoding="utf-8") as f: f.write(json.dumps(data, indent=4)) return token def refresh(self, user_id_token, device_id): json_data = { 'deeplink_url': f'/{self.region}?client_capabilities=%7B%22ads%22%3A%5B%22non_ssai%22%5D%2C%22audio_channel%22%3A%5B%22stereo%22%5D%2C%22container%22%3A%5B%22fmp4%22%2C%22ts%22%5D%2C%22dvr%22%3A%5B%22short%22%5D%2C%22dynamic_range%22%3A%5B%22sdr%22%5D%2C%22encryption%22%3A%5B%22widevine%22%2C%22plain%22%5D%2C%22ladder%22%3A%5B%22web%22%2C%22tv%22%2C%22phone%22%5D%2C%22package%22%3A%5B%22dash%22%2C%22hls%22%5D%2C%22resolution%22%3A%5B%22sd%22%2C%22hd%22%5D%2C%22video_codec%22%3A%5B%22h264%22%5D%2C%22true_resolution%22%3A%5B%22sd%22%2C%22hd%22%2C%22fhd%22%5D%7D&drm_parameters=%7B%22hdcp_version%22%3A%5B%22HDCP_V2_2%22%5D%2C%22widevine_security_level%22%3A%5B%22SW_SECURE_DECODE%22%5D%2C%22playready_security_level%22%3A%5B%5D%7D', 'app_launch_count': 1, } r = self.session.post( url=self.config["endpoints"]["refresh"], headers={ 'x-hs-usertoken': user_id_token, 'X-HS-Platform': self.config["device"]["platform"]["name"], 'X-Country-Code': self.region, 'X-HS-Accept-language': 'eng', 'X-Request-Id': str(uuid.uuid4()), 'x-hs-device-id': device_id, 'X-HS-Client-Targeting': f'ad_id:{device_id};user_lat:false', 'X-HS-Client': 'platform:web;app_version:23.06.23.3;browser:Firefox;schema_version:0.0.911', }, json=json_data ) for cookie in self.session.cookies: if cookie.name == 'sessionUserUP' and cookie.path == f"/{self.region}" and cookie.domain == 'www.hotstar.com': cookie.value = r.headers["x-hs-usertoken"] print(cookie) for x in self.ALIASES: cookie_file = os.path.join(config._Directories.cookies, x.lower(), f"{self.profile}.txt") if not os.path.isfile(cookie_file): cookie_file = os.path.join(config._Directories.cookies, x, f"{self.profile}.txt") if os.path.isfile(cookie_file): self.session.cookies.save(cookie_file, ignore_discard=True, ignore_expires=True) break return r.headers["x-hs-usertoken"] def authenticate(self, cookies: Optional[CookieJar] = None, credential=None): if cookies: self.session.cookies.update(cookies) """ Log in to HOTSTAR and return a JWT User Identity token. :returns: JWT User Identity token. """ # self.credential = credential # if self.credential.username == "username" and self.credential.password == "password": # logincode_url = f"https://api.hotstar.com/{self.region}/aadhar/v2/firetv/{self.region}/users/logincode/" # logincode_headers = { # "Content-Length": "0", # "User-Agent": "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)" # } # logincode = self.session.post( # url=logincode_url, # headers=logincode_headers # ).json()["description"]["code"] # print(f"Go to tv.hotstar.com and put {logincode}") # logincode_choice = input('Did you put as informed above? (y/n): ') # if logincode_choice.lower() == 'y': # res = self.session.get( # url=logincode_url + logincode, # headers=logincode_headers # ) # else: # self.log.error(" - Exited.") # raise # else: # res = self.session.post( # url=self.config["endpoints"]["login"], # json={ # "isProfileRequired": "false", # "userData": { # "deviceId": self.device_id, # "usertype": "email" # }, # "verification": {} # }, # headers={ # "hotstarauth": self.hotstar_auth, # "content-type": "application/json" # } # ) # try: # data = res.json() # except json.JSONDecodeError: # self.log.exit(f" - Failed to get auth token, response was not JSON: {res.text}") # raise # if "errorCode" in data: # self.log.errro(f" - Login failed: {data['description']} [{data['errorCode']}]") # raise # return data["description"]["userIdentity"] def parse_title(self, ctx, title): title = title or ctx.parent.params.get("title") if not title: self.log.error(" - No title ID specified") if not getattr(self, "TITLE_RE"): self.title = title return {} for regex in as_list(self.TITLE_RE): m = re.search(regex, title) if m: self.title = m.group("id") return m.groupdict() self.log.warning(f" - Unable to parse title ID {title!r}, using as-is") self.title = title