sp4rky-devine-services/services/Hotstar/__init__.py
2025-04-09 15:07:35 -06:00

377 lines
16 KiB
Python

import base64
import hashlib
import hmac
import json
import os
import time
import uuid
import re
import requests
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from requests.adapters import HTTPAdapter
from urllib.request import urlopen, Request
from http.cookiejar import CookieJar
from urllib3.util.retry import Retry
from pathlib import Path
import click
from click import Context
from typing import Any, Optional, Union
from devine.core.utils.collections import as_list
from devine.core.config import config
from devine.core.manifests import DASH, HLS
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series
from devine.core.tracks import Tracks
class Hotstar(Service):
    """
    Service code for Star India's Hotstar (aka Disney+ Hotstar) streaming service (https://hotstar.com).

    \b
    Authorization: Credentials
    Security: UHD@L3, doesn't seem to care about releases.

    \b
    Tips: - The library of contents can be viewed without logging in at https://hotstar.com
          - The homepage hosts domestic programming; Disney+ content is at https://hotstar.com/in/disneyplus
    """

    # NOTE(review): the docstring lists "Credentials", but get_token() in this file reads the
    # `sessionUserUP` cookie and asks for cookies when none are present - confirm which is current.

    # Alternative names for this service; refresh() also tries them as cookie sub-directory names.
    ALIASES = ["HS", "hotstar"]

    # Matches a hotstar.com URL and captures the run of digits following the path as group `id`.
    # parse_title() falls back to the raw input when this does not match.
    TITLE_RE = r"^(?:https?://(?:www\.)?hotstar\.com/[a-z0-9/-]+/)(?P<id>\d+)"
@staticmethod
@click.command(name="Hotstar", short_help="https://hotstar.com")
@click.argument("title", type=str, required=False)
@click.option(
    "-m", "--movie",
    is_flag=True,
    default=False,
    help="Title is a movie.",
)
@click.option(
    "-q", "--quality",
    default="fhd",
    type=click.Choice(["4k", "fhd", "hd", "sd"], case_sensitive=False),
    help="Manifest quality to request.",
)
@click.option(
    "-ac", "--audio-codec",
    default="dolby51",
    type=click.Choice(["dolby51", "stereo", "atmos"], case_sensitive=False),
    help="Audio Codec",
)
@click.option(
    "-rg", "--region",
    default="in",
    type=click.Choice(["in", "id", "th"], case_sensitive=False),
    help="Account region",
)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> 'Hotstar':
    # Click entry point: forwards the click context and every parsed option/argument
    # (title, movie, quality, audio_codec, region) to the Hotstar constructor.
    # Deliberately a comment rather than a docstring: click would pick a docstring up
    # as the command's help text.
    return Hotstar(ctx, **kwargs)
def __init__(self, ctx:Context, title, movie, quality, audio_codec, region):
    """
    Store the CLI options on the instance and run the session/login setup.

    :param ctx: Click context of the service command; its parent context supplies
        the ``vcodec``, ``acodec`` and ``range_`` parameters and ``ctx.obj`` the profile.
    :param title: Title URL or ID as given on the command line (may be empty).
    :param movie: Whether the title should be looked up as a movie.
    :param quality: Manifest quality to request.
    :param audio_codec: Audio channel layout to request.
    :param region: Account region; stored lower-cased.
    """
    super().__init__(ctx)
    self.parse_title(ctx, title)

    # Options of this service's own command.
    self.movie, self.quality, self.audio_codec = movie, quality, audio_codec
    self.region = region.lower()

    # Options inherited from the parent command.
    parent = ctx.parent
    assert parent is not None
    inherited = parent.params
    self.vcodec = inherited["vcodec"]
    self.acodec = inherited["acodec"] or "EC3"
    self.range = inherited["range_"]
    self.profile = ctx.obj.profile

    # Populated by configure() / get_tracks().
    self.device_id = self.hotstar_auth = self.token = self.license_api = None

    self.configure()
def get_titles(self) -> Union[Movies, Series]:
    """
    Fetch the title metadata and build the title object(s) for it.

    The movie or TV endpoint is chosen from the ``--movie`` flag. The asset type and
    the first listed language of the response are stored on ``self.content_type`` and
    ``self.lang``. For anything that is not a ``MOVIE``, a second request loads the
    episode list.

    :returns: ``Movies`` holding a single ``Movie``, or one ``Series`` holding every
        episode returned by the episodes endpoint.
    :raises ValueError: If either response body is not valid JSON.
    """
    headers = {
        "Accept": "*/*",
        "Accept-Language": "en-GB,en;q=0.5",
        "hotstarauth": self.hotstar_auth,
        "X-HS-UserToken": self.token,
        "X-HS-Platform": self.config["device"]["platform"]["name"],
        "X-HS-AppVersion": self.config["device"]["platform"]["version"],
        # NOTE(review): hard-coded to "in" although refresh() sends self.region - confirm intent.
        "X-Country-Code": "in",
        "x-platform-code": "PCTV"
    }

    endpoint_key = "movie_title" if self.movie else "tv_title"
    r = self.session.get(
        url=self.config["endpoints"][endpoint_key],
        headers=headers,
        params={"contentId": self.title}
    )
    try:
        res = r.json()["body"]["results"]["item"]
    except json.JSONDecodeError as e:
        # Report the raw response text; the parsed value is not bound on this path.
        raise ValueError(f"Failed to load title manifest: {r.text}") from e

    self.content_type = res["assetType"]
    self.lang = res["langObjs"][0]["iso3code"]

    if self.content_type == "MOVIE":
        return Movies([Movie(
            id_=res.get("contentId"),
            service=self.__class__,
            name=res["title"],
            year=res["year"],
            language=self.lang,
            data=res,
        )])

    show_data = res
    r = self.session.get(
        url=self.config["endpoints"]["tv_episodes"],
        headers=headers,
        params={
            "eid": show_data["id"],
            "etid": "2",
            "tao": "0",
            "tas": "1000"
        }
    )
    try:
        episodes = r.json()["body"]["results"]["assets"]["items"]
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to load episodes list: {r.text}") from e

    # One Series containing all episodes (not one Series per episode).
    return Series([
        Episode(
            id_=ep.get("contentId"),
            service=self.__class__,
            title=ep.get("showShortTitle") or show_data["title"],
            year=ep.get("year"),
            season=ep.get("seasonNo"),
            number=ep.get("episodeNo"),
            name=ep.get("title"),
            # `or [{}]` also covers a present-but-empty langObjs list.
            language=(ep.get("langObjs") or [{}])[0].get("iso3code", self.lang),
            data=ep
        )
        for ep in episodes
    ])
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
    """
    Request the playback manifest for ``title`` and parse it into tracks.

    The requested dynamic range follows ``self.range``; ``DV`` forces the ``dvh265``
    video codec and any unrecognised range falls back to SDR with ``h264``. When the
    playback set is tagged ``widevine`` its license URL is stored on
    ``self.license_api`` for get_widevine_license(). ``self.content_type`` is read
    here and is set by get_titles().

    :param title: Title whose ``data["contentId"]`` and ``language`` are used.
    :returns: Tracks parsed from the DASH manifest, each flagged ``needs_proxy``.
    :raises ValueError: If the response contains an empty playback set.
    """
    if self.range == "DV":
        dynamic_range = "dv"
        self.vcodec = "dvh265"
    elif self.range == "HDR10":
        dynamic_range = "hdr10"
    elif self.range == "SDR":
        dynamic_range = "sdr"
    else:
        dynamic_range = "sdr"
        self.vcodec = "h264"

    # Compact separators reproduce the whitespace-free JSON the request used before.
    compact = (",", ":")
    client_capabilities = json.dumps({
        "package": ["dash", "hls"],
        "container": ["fmp4br"],
        "ads": ["non_ssai", "ssai"],
        "audio_channel": [self.audio_codec],
        "encryption": ["plain", "widevine"],
        "video_codec": [self.vcodec],
        "ladder": ["tv"],
        "resolution": [self.quality],
        "true_resolution": [self.quality],
        "dynamic_range": [dynamic_range],
    }, separators=compact)
    drm_parameters = json.dumps({
        "widevine_security_level": ["SW_SECURE_DECODE", "SW_SECURE_CRYPTO"],
        "hdcp_version": ["HDCP_V2_2", "HDCP_V2_1", "HDCP_V2", "HDCP_V1"],
    }, separators=compact)

    manifest = self.session.get(
        url=self.config["endpoints"]["manifest"],
        params={
            "content_id": title.data["contentId"],
            "filters": f"content_type={self.content_type}",
            "client_capabilities": client_capabilities,
            "drm_parameters": drm_parameters
        },
        headers={
            "user-agent": "Disney+;in.startv.hotstar.dplus.tv/23.08.14.4.2915 (Android/13)",
            "hotstarauth": self.hotstar_auth,
            "x-hs-usertoken": self.token,
            "x-hs-device-id": self.device_id,
            "x-hs-client": "platform:androidtv;app_id:in.startv.hotstar.dplus.tv;app_version:23.08.14.4;os:Android;os_version:13;schema_version:0.0.970",
            "x-hs-platform": "androidtv",
            "content-type": "application/json",
        }
    ).json()

    playback = manifest['success']['page']['spaces']['player']['widget_wrappers'][0]['widget']['data'][
        'player_config']['media_asset']['primary']
    if not playback:
        raise ValueError("Wanted playback set is unavailable for this title...")

    if 'widevine' in (playback.get('playback_tags') or ()):
        self.license_api = playback["license_url"]

    # The container must remain a Tracks object; it used to be replaced by an HTTP
    # response right before `.add()` was called on it.
    tracks = Tracks()
    tracks.add(DASH.from_url(
        url=playback['content_url'],
    ).to_tracks(title.language))

    for track in tracks:
        track.needs_proxy = True

    return tracks
def get_chapters(self, title):
    """Return the chapters for ``title``; this service always reports none."""
    no_chapters = []
    return no_chapters
def get_widevine_service_certificate(self, **_):
    """
    Return the service certificate; always ``None`` here.

    Keyword arguments are accepted and ignored. Per the original note, the common
    privacy certificate will be used instead.
    """
    return
def get_widevine_license(self, challenge, **_):
    """
    POST the license challenge to ``self.license_api`` and return the raw response body.

    :param challenge: Challenge payload; sent as the request body (expects bytes).
    :returns: The response content.
    """
    license_response = self.session.post(url=self.license_api, data=challenge)
    return license_response.content
# Service specific functions
def configure(self):
    """
    Prepare the HTTP session and obtain everything needed for authenticated calls.

    Sets the Origin/Referer headers for the selected region, mounts a retrying
    adapter for http and https, computes the ``hotstarauth`` value, picks a device
    ID and finally obtains the user token via get_token().
    """
    self.session.headers.update({
        "Origin": "https://www.hotstar.com",
        "Referer": f"https://www.hotstar.com/{self.region}"
    })

    # Retry GET/POST up to 5 times on the listed 5xx statuses.
    retry_strategy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=["GET", "POST"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    self.session.mount("https://", adapter)
    self.session.mount("http://", adapter)

    self.log.info("Logging into Hotstar")
    self.hotstar_auth = self.get_akamai()
    self.log.info(f" + Calculated HotstarAuth: {self.hotstar_auth}")

    # Prefer the device ID from the cookies. Generate one when there are no cookies
    # or they carry no `deviceId`, so the ID is never left as None.
    cookie_device_id = self.session.cookies.get("deviceId") if self.session.cookies else None
    if cookie_device_id:
        self.device_id = cookie_device_id
        self.log.info(f" + Using Device ID: {self.device_id}")
    else:
        self.device_id = str(uuid.uuid4())
        self.log.info(f" + Created Device ID: {self.device_id}")

    self.token = self.get_token()
    self.log.info(" + Obtained tokens")
@staticmethod
def get_akamai():
enc_key = b"\x05\xfc\x1a\x01\xca\xc9\x4b\xc4\x12\xfc\x53\x12\x07\x75\xf9\xee"
st = int(time.time())
exp = st + 12000
res = f"st={st}~exp={exp}~acl=/*"
res += "~hmac=" + hmac.new(enc_key, res.encode(), hashlib.sha256).hexdigest()
return res
def get_token(self):
    """
    Return the user token, preferring the on-disk cache over the session cookies.

    A cached token whose ``exp`` lies in the future is returned as-is. An expired
    one is refreshed via refresh() and re-cached. Without a cache file the token is
    read from the ``sessionUserUP`` cookie for ``www.hotstar.com`` at
    ``/<region>`` and cached.

    :returns: The user token string.
    :raises ValueError: If there is no cache and the session has no cookies, or the
        ``sessionUserUP`` cookie for the selected region is missing.
    """
    token_cache_path = Path(config.directories.cache / self.__class__.__name__ / "token.json")

    if token_cache_path.is_file():
        with token_cache_path.open(encoding="utf-8") as fd:
            token = json.load(fd)
        if token.get("exp", 0) > int(time.time()):
            self.log.info(" + Using cached auth tokens...")
            return token["uid"]
        self.log.info(" + Refreshing and using cached auth tokens...")
        return self.save_token(self.refresh(token["uid"], token["sub"]["deviceId"]), token_cache_path)

    if not self.session.cookies:
        # Logger methods return None, so the message must be raised as a real exception.
        self.log.error(" - Please add cookies")
        raise ValueError("Please add cookies")

    token = self.session.cookies.get("sessionUserUP", None, 'www.hotstar.com', '/' + self.region)
    if not token:
        # Without this guard save_token() would fail on None with an unrelated error.
        self.log.error(f" - No sessionUserUP cookie found for region {self.region!r}")
        raise ValueError(f"No sessionUserUP cookie found for region {self.region!r}")

    return self.save_token(token, token_cache_path)
@staticmethod
def save_token(token, to):
data = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8"))
data["uid"] = token
data["sub"] = json.loads(data["sub"])
os.makedirs(os.path.dirname(to), exist_ok=True)
with open(to, mode="w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=4))
return token
def refresh(self, user_id_token, device_id):
    """
    Exchange an expired user token for a new one and persist it into the cookies.

    The new token is taken from the ``x-hs-usertoken`` response header. The matching
    ``sessionUserUP`` session cookie is updated, and the first existing cookie file
    found for this profile under one of the service aliases is rewritten.

    :param user_id_token: The current (expired) user token.
    :param device_id: Device ID the token was issued for.
    :returns: The refreshed user token.
    :raises ValueError: If the response carries no ``x-hs-usertoken`` header.
    """
    # Local import: the file only imports CookieJar from http.cookiejar.
    from http.cookiejar import MozillaCookieJar

    json_data = {
        'deeplink_url': f'/{self.region}?client_capabilities=%7B%22ads%22%3A%5B%22non_ssai%22%5D%2C%22audio_channel%22%3A%5B%22stereo%22%5D%2C%22container%22%3A%5B%22fmp4%22%2C%22ts%22%5D%2C%22dvr%22%3A%5B%22short%22%5D%2C%22dynamic_range%22%3A%5B%22sdr%22%5D%2C%22encryption%22%3A%5B%22widevine%22%2C%22plain%22%5D%2C%22ladder%22%3A%5B%22web%22%2C%22tv%22%2C%22phone%22%5D%2C%22package%22%3A%5B%22dash%22%2C%22hls%22%5D%2C%22resolution%22%3A%5B%22sd%22%2C%22hd%22%5D%2C%22video_codec%22%3A%5B%22h264%22%5D%2C%22true_resolution%22%3A%5B%22sd%22%2C%22hd%22%2C%22fhd%22%5D%7D&drm_parameters=%7B%22hdcp_version%22%3A%5B%22HDCP_V2_2%22%5D%2C%22widevine_security_level%22%3A%5B%22SW_SECURE_DECODE%22%5D%2C%22playready_security_level%22%3A%5B%5D%7D',
        'app_launch_count': 1,
    }
    r = self.session.post(
        url=self.config["endpoints"]["refresh"],
        headers={
            'x-hs-usertoken': user_id_token,
            'X-HS-Platform': self.config["device"]["platform"]["name"],
            'X-Country-Code': self.region,
            'X-HS-Accept-language': 'eng',
            'X-Request-Id': str(uuid.uuid4()),
            'x-hs-device-id': device_id,
            'X-HS-Client-Targeting': f'ad_id:{device_id};user_lat:false',
            'X-HS-Client': 'platform:web;app_version:23.06.23.3;browser:Firefox;schema_version:0.0.911',
        },
        json=json_data
    )

    new_token = r.headers.get("x-hs-usertoken")
    if not new_token:
        raise ValueError(f"Failed to refresh token, no x-hs-usertoken header in response: {r.text}")

    # Update the session cookie in place. The token is a credential, so it is not
    # echoed to stdout.
    for cookie in self.session.cookies:
        if cookie.name == 'sessionUserUP' and cookie.path == f"/{self.region}" and cookie.domain == 'www.hotstar.com':
            cookie.value = new_token

    # Use the configured cookies directory, consistent with get_token()'s use of
    # config.directories.
    cookies_dir = Path(config.directories.cookies)
    for alias in self.ALIASES:
        cookie_file = cookies_dir / alias.lower() / f"{self.profile}.txt"
        if not cookie_file.is_file():
            cookie_file = cookies_dir / alias / f"{self.profile}.txt"
        if cookie_file.is_file():
            # A plain session cookie jar has no save(); copy into a file-backed jar.
            file_jar = MozillaCookieJar(str(cookie_file))
            for cookie in self.session.cookies:
                file_jar.set_cookie(cookie)
            file_jar.save(ignore_discard=True, ignore_expires=True)
            break

    return new_token
def authenticate(self, cookies: Optional[CookieJar] = None, credential=None):
    """
    Load the supplied cookies into the HTTP session.

    :param cookies: Cookies to merge into ``self.session.cookies``; ignored when
        ``None`` or empty.
    :param credential: Accepted for interface compatibility; not used.
    :returns: None.
    """
    # NOTE: a credential / TV login-code flow used to live here as commented-out code.
    # It was never executed and has been removed; recover it from version control if needed.
    if cookies:
        self.session.cookies.update(cookies)
def parse_title(self, ctx, title):
    """
    Resolve the title ID from the CLI input and store it on ``self.title``.

    The input is matched against each pattern in ``TITLE_RE``. On the first match
    the ``id`` group becomes the title ID. If nothing matches, or no pattern is
    defined, the input is used unchanged.

    :param ctx: Click context; the parent's ``title`` parameter is the fallback
        when ``title`` is empty.
    :param title: Title URL or ID from the command line (may be empty).
    :returns: The named groups of the matching pattern, or an empty dict.
    :raises ValueError: If no title was given at all.
    """
    title = title or ctx.parent.params.get("title")
    if not title:
        # Fail here with a clear message instead of crashing later in re.search(None).
        self.log.error(" - No title ID specified")
        raise ValueError("No title ID specified")

    patterns = getattr(self, "TITLE_RE", None)
    if not patterns:
        self.title = title
        return {}

    for regex in as_list(patterns):
        m = re.search(regex, title)
        if m:
            self.title = m.group("id")
            return m.groupdict()

    self.log.warning(f" - Unable to parse title ID {title!r}, using as-is")
    self.title = title
    return {}