815 lines
32 KiB
Python
815 lines
32 KiB
Python
import datetime
|
|
import json
|
|
import random
|
|
import uuid
|
|
import re
|
|
import urllib.parse
|
|
import click
|
|
import sys
|
|
import langcodes
|
|
|
|
from http.cookiejar import CookieJar
|
|
from typing import Any, Optional
|
|
|
|
from devine.core.service import Service
|
|
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
|
|
from devine.core.credential import Credential
|
|
from devine.core.tracks import Tracks, Subtitle, Attachment
|
|
from devine.core.utilities import is_close_match
|
|
from devine.core.manifests import HLS
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
class VIU(Service):
|
|
"""
|
|
Service code for VIU streaming service (https://viu.com).
|
|
|
|
Authorization: Username-Password, None
|
|
Security: HD@L3, NonDRM doesn't seem to care about releases.
|
|
|
|
VIU has some regions supported:
|
|
- 1: ID, MY
|
|
- 2: SG, HK, TH, PH
|
|
1 & 2 has different api
|
|
|
|
Author: unnamed improved by @sp4rk.y
|
|
last update: 2025-01-07
|
|
notes: Works fine for any region, but doesn't work on movies yet
|
|
"""
|
|
|
|
# GEOFENCE = ("sg","id","my","hk")
|
|
|
|
# Static method: the click entry point belongs to the class, not to an instance.
@staticmethod
# The command name must match the service tag (and by extension the service folder).
@click.command(name="VIU", short_help="https://viu.com", help=__doc__)
@click.argument("title", type=str)
@click.option("-l", "--lang", default="kor", help="Specify language for metadata")
@click.option("-nt", "--notitle", is_flag=True, default=False, help="Dont grab episode title.")
@click.option("-q", "--quality", type=int, default=1080, help="Desired quality (e.g., 1080, 720, 480, 240)")
@click.pass_context
def cli(ctx, **kwargs):
    """Build a VIU instance from the click context and the parsed command-line options."""
    # kwargs carries the argument/options declared above: title, lang, notitle, quality.
    return VIU(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx, title, lang: str, notitle: bool, quality: int):
|
|
self.url = title
|
|
self.title = self.parse_input(title)
|
|
self.region = title.split("/")[4]
|
|
self.notitle = notitle
|
|
self.lang = lang
|
|
self.token = ""
|
|
self._auth_codes = {}
|
|
self._user_token = None
|
|
self.quality = quality
|
|
self.ctx = ctx
|
|
|
|
if self.region == "id":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 8
|
|
self.area_id = 1000
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 2
|
|
self.ut = 0
|
|
elif self.region == "my":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 7
|
|
self.area_id = 1001
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 2
|
|
self.ut = 0
|
|
elif self.region == "sg":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 3
|
|
self.area_id = 2
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 2
|
|
self.ut = 0
|
|
elif self.region == "ph":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 3
|
|
self.area_id = 5
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 1
|
|
self.ut = 2
|
|
elif self.region == "th":
|
|
self.lang_flag_id = 4
|
|
self.lang_flag_id_2 = 4
|
|
self.area_id = 4
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 1
|
|
self.ut = 0
|
|
elif self.region == "za":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 3
|
|
self.area_id = 1006
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 1
|
|
self.ut = 0
|
|
elif self.region == "hk":
|
|
self.lang_flag_id = 3
|
|
self.lang_flag_id_2 = 3
|
|
self.area_id = 1
|
|
self.os_flag_id = 1
|
|
self.os_flag_id_2 = 1
|
|
self.ut = 0
|
|
|
|
self.lang = "kr"
|
|
super().__init__(ctx)
|
|
|
|
def authenticate(
    self,
    cookies: Optional[CookieJar] = None,
    credential: Optional[Credential] = None,
) -> None:
    """
    Remember the supplied credential and prepare the session headers.

    ``cookies`` is accepted for interface compatibility but not used here.
    No login request is made at this point.
    """
    self.credentials = credential

    # Only a Referer is sent: an Origin header makes the API answer 403.
    referer_header = {"Referer": "https://viu.com/"}
    self.session.headers.update(referer_header)

    self.log.info(" + Downloading without an account")
|
|
|
|
def _fetch_metadata(self):
    """
    Read series metadata from the title page's ``__NEXT_DATA__`` script tag(s).

    For each tag, the first ``fallback`` entry whose key starts with
    ``@"PRODUCT_ALT_LANG_LIST`` is searched for a product matching this
    instance's ``area_id`` and ``lang_flag_id``; on a match ``series_id``,
    ``product_id`` and ``main_title`` are stored on ``self``. Tags that are not
    valid JSON or lack an expected key are skipped silently.
    """
    response = self.session.get(self.url, allow_redirects=True)
    document = BeautifulSoup(response.text, "html.parser")
    not_found = object()

    for script in document.find_all("script", {"id": "__NEXT_DATA__"}):
        try:
            payload = json.loads(script.text)
            fallback = payload["props"]["pageProps"].get("fallback", {})

            # Only the first matching fallback key is inspected.
            entry = next(
                (value for name, value in fallback.items() if name.startswith('@"PRODUCT_ALT_LANG_LIST')),
                not_found,
            )
            if entry is not_found:
                continue

            match = next(
                (
                    item
                    for item in entry["data"]["product"]
                    if item["area_id"] == str(self.area_id)
                    and item["language_flag_id"] == str(self.lang_flag_id)
                ),
                not_found,
            )
            if match is not not_found:
                self.series_id = match["series_id"]
                self.product_id = match["product_id"]
                self.main_title = match["series_name"]
        except (KeyError, json.JSONDecodeError):
            continue
|
|
|
|
def _gateway_headers(self) -> dict:
    """Return the browser-like headers sent with the web playlist-endpoint requests."""
    return {
        "authority": "api-gateway-global.viu.com",
        "accept": "application/json, text/plain, */*",
        "accept-language": "en-US,en;q=0.9",
        "authorization": f"Bearer {self.token}",
        "origin": "https://www.viu.com",
        "referer": "https://www.viu.com/",
    }

def _playlist_query(self, route: str, **extra: Any) -> dict:
    """Build the query parameters shared by the web playlist-endpoint calls, plus ``extra``."""
    params = {
        "platform_flag_label": "web",
        "area_id": self.area_id,
        "language_flag_id": self.lang_flag_id,
        "platformFlagLabel": "web",
        "areaId": self.area_id,
        "languageFlagId": self.lang_flag_id,
        "ut": "0",
        "countryCode": self.region,
        "r": route,
        "os_flag_id": self.os_flag_id,
    }
    params.update(extra)
    return params

def get_titles(self) -> Titles_T:
    """
    Resolve the title URL into a ``Movies`` or ``Series`` collection.

    Steps: scrape the series id from the title page, request the product list
    for that series, request the detail of the last product in the list (used
    for the language, name and year), then build either one ``Movie`` (when
    that product has ``is_movie == 1``) or one ``Episode`` per listed product,
    ordered by their ``number`` field.

    The process exits with status 1 when the series id cannot be found, the
    product-list response cannot be parsed, or the product list is empty.
    """
    res = self.session.get(url=self.url, allow_redirects=True)
    try:
        self.title = self.url.split("/vod/")[1].split("/")[0]
        self.log.info(f" + Region: {self.region}")
        self.log.debug(f" + Area_id: {self.area_id}")
        self.log.debug(f" + Language_flag_id: {self.lang_flag_id}")
    except Exception:
        self.log.exit(f" - Error, response: {res.text}")

    self._fetch_metadata()
    # _fetch_metadata() sets series_id only when the page lists a product for
    # this area/language; fail with a clear message instead of AttributeError.
    series_id = getattr(self, "series_id", None)
    if not series_id:
        self.log.error(" - Could not find series metadata for this region/language on the title page")
        sys.exit(1)

    playlist_url = self.config["endpoints"]["playlist"]

    # The two region branches differed only in the forwarded IP they sent.
    forwarded_ip = "139.195.232.194" if self.region.lower() == "id" else "103.62.48.237"
    self.session.headers.update({"X-Forwarded-For": forwarded_ip})
    list_res = self.session.get(
        url=playlist_url,
        headers=self._gateway_headers(),
        params=self._playlist_query("/vod/product-list", series_id=series_id, size="-1", sort="desc"),
    )
    try:
        movie_data_list = list_res.json()["data"].get("product_list") or []
    except Exception as e:
        self.log.error(f" - Error in product-list request: {e}")
        self.log.info(f" - Response: {list_res.text}")
        sys.exit(1)

    if not movie_data_list:
        self.log.error(" - Product list is empty, nothing to download")
        self.log.info(f" - Response: {list_res.text}")
        sys.exit(1)

    # The last item of the list is used as the reference product (kept for backward compatibility).
    data = movie_data_list[-1]
    content_type = int(data.get("is_movie") or 0)

    if self.region in ["id", "my"]:
        stream_info = self.session.get(
            url=playlist_url,
            params=self._playlist_query("/vod/detail", product_id=data["product_id"]),
        ).json()["data"]
    else:
        stream_info = self.session.get(
            url=self.config["endpoints"]["ott"].format(region=self.region),
            params={
                "area_id": self.area_id,
                "language_flag_id": self.lang_flag_id,
                "r": "vod/ajax-detail",
                "platform_flag_label": "web",
                "product_id": data["product_id"],
            },
        ).json()["data"]

    self.lang = stream_info["series"].get("series_language", "ko")
    self.log.info(f" + Detected language: {self.lang}")

    product_type = "movie" if content_type == 1 else "series"
    self.log.info(f" + Product type: {product_type}")

    if product_type == "movie":
        r = self.session.get(
            url=playlist_url,
            headers=self._gateway_headers(),
            params=self._playlist_query(
                "/vod/detail",
                # Prefer the id scraped from the page; fall back to the listed product.
                product_id=getattr(self, "product_id", None) or data["product_id"],
            ),
        )
        try:
            movie_data = r.json()["data"]
        except Exception:
            self.log.info(f" - Error, response: {r.text}")
            sys.exit(1)

        try:
            # Prefer the explicit release year, else look for 4 digits in the name.
            year = movie_data["series"].get("release_of_year")
            if not year:
                year_match = re.search(r"(\d{4})", movie_data["series"]["name"])
                if year_match:
                    year = year_match.group(1)
        except Exception:
            year = None

        return Movies(
            [
                Movie(
                    id_=movie_data["current_product"]["product_id"],
                    service=self.__class__,
                    year=year,
                    name=movie_data["series"]["name"],
                    language=self.lang,
                    data=movie_data,
                )
            ]
        )

    raw_title = stream_info["series"]["name"]

    # An explicit "Season X" wins; otherwise a trailing number is taken as the season.
    season_match = re.search(r"(Season\s+(\d+))", raw_title, re.IGNORECASE)
    if season_match:
        season_number = int(season_match.group(2))
        series_name = raw_title.split(season_match.group(1))[0].strip()
    else:
        number_match = re.search(r"(\d+)$", raw_title.strip())
        if number_match:
            season_number = int(number_match.group(1))
            series_name = raw_title[: -(len(number_match.group(1)))].strip()
        else:
            season_number = 1
            series_name = raw_title

    # Year from release_of_year, else from the release_time timestamp. A
    # missing/zero timestamp yields no year rather than the epoch year 1970.
    year_val = stream_info["series"].get("release_of_year")
    if not year_val:
        release_time = int(stream_info["series"].get("release_time") or 0)
        year_val = (
            datetime.datetime.fromtimestamp(release_time, tz=datetime.timezone.utc).year
            if release_time
            else None
        )

    series_name_with_year = f"{series_name} {year_val}" if year_val else series_name

    titles_ = []
    for x in sorted(movie_data_list, key=lambda item: int(item.get("number") or 0)):
        synopsis = x.get("synopsis")
        if self.notitle or not isinstance(synopsis, str):
            # --notitle requested, or the API returned no synopsis to derive a name from.
            episode_name = None
        else:
            episode_name = synopsis.split("-")[-1].strip()

        titles_.append(
            Episode(
                id_=x["ccs_product_id"],
                title=series_name_with_year,
                year=year_val,
                season=season_number,
                number=int(x.get("number") or 0),
                service=self.__class__,
                language=self.lang,
                name=episode_name,
                data=x,
            )
        )

    return Series(titles_)
|
|
|
|
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Build the track list (video/audio from HLS, subtitles, thumbnail) for one title.

    Flow:
      1. Resolve the product's ``ccs_product_id``.
      2. Request playback info. On failure, retry once: with the login identity
         when ``_login`` returns one, otherwise with a 180-second ``duration``
         (preview) and rewrite the stream URLs afterwards to lift that limit.
      3. Pick the rendition matching ``--quality`` (falling back to every
         available rendition) and load its HLS manifest.
      4. Fetch subtitles when the manifest supplied none, then name them.
      5. Attach a thumbnail when a URL for one can be found.

    Raises:
        ValueError: If no product id can be found in ``title.data`` (non id/my regions).

    The process exits with status 1 when no stream URL, no subtitle, or no
    English subtitle is available.
    """
    tracks = Tracks()
    data = title.data

    if self.region in ["id", "my"]:
        stream_info = {
            "current_product": data,
            "time_duration": data.get("time_duration", ""),
        }
    else:
        product_id = data.get("current_product", {}).get("product_id") or data.get("product_id")
        if product_id is None:
            raise ValueError("product_id not found in the provided data")

        stream_info = self.session.get(
            url=self.config["endpoints"]["ott"].format(region=self.region),
            params={
                "area_id": self.area_id,
                "language_flag_id": self.lang_flag_id,
                "r": "vod/ajax-detail",
                "platform_flag_label": "web",
                "product_id": product_id,
            },
        ).json()["data"]

    current_product = stream_info.get("current_product", {})
    ccs_id = current_product.get("ccs_product_id") or current_product.get("current_product", {}).get(
        "ccs_product_id"
    )

    query = {
        "ccs_product_id": ccs_id,
        "platform_flag_label": "phone",
        "language_flag_id": self.lang_flag_id_2,
        "ut": self.ut,
        "area_id": self.area_id,
        "os_flag_id": self.os_flag_id_2,
        "countryCode": self.region,
    }
    duration_limit = False

    def download_playback():
        # Reads `query` and the region token at call time, so a retry picks up
        # whatever was changed after the first attempt.
        stream_data = self.session.get(
            url=self.config["endpoints"]["playback"],
            params=query,
            headers={
                "Authorization": f"Bearer {self._auth_codes[self.region]}",
                "connection": "Keep-Alive",
                "content-type": "application/json",
                "host": "api-gateway-global.viu.com",
                "platform": "android",
                "user-agent": "okhttp/3.12.1",
            },
        ).json()
        return self.check_error(stream_data).get("stream")

    if not self._auth_codes.get(self.region):
        self._auth_codes[self.region] = self._get_token(self.region)

    self.log.debug(f" + Token play: {self._auth_codes[self.region]}")

    stream_data = None
    try:
        stream_data = download_playback()
    except Exception:
        token = self._login(self.region)
        self.log.debug(f" + Token login: {token}")
        if token is not None:
            query["identity"] = token
        else:
            # The content is Preview or for VIP only.
            # We can try to bypass the duration which is limited to 3mins only
            duration_limit, query["duration"] = True, "180"
        try:
            stream_data = download_playback()
        except Exception:
            if token is None:
                raise
            # Never write the password to the log; the username is enough to
            # identify which account failed.
            username = getattr(getattr(self, "credentials", None), "username", None)
            self.log.exit(f" - Login required, needs password, detected:\nuser: {username}")
    if not stream_data:
        self.log.exit(" - Cannot get stream info")

    formats = []
    stream_key = "airplayurl" if self.region in ["id", "my"] else "airplayurl2"
    stream_urls = stream_data.get(stream_key) or {}

    for vid_format, stream_url in stream_urls.items():
        height_match = re.search(r"s(\d+)p", vid_format)
        height = int(height_match.group(1)) if height_match else None

        # Bypass preview duration limit
        if duration_limit:
            old_stream_url = urllib.parse.urlparse(stream_url)
            # Separate name: this must not clobber the playback `query` above.
            url_query = dict(urllib.parse.parse_qsl(old_stream_url.query, keep_blank_values=True))
            url_query.update(
                {
                    "duration": stream_info.get("time_duration") or "9999999",
                    "duration_start": "0",
                }
            )
            stream_url = old_stream_url._replace(query=urllib.parse.urlencode(url_query)).geturl()

        formats.append({"format_id": vid_format, "url": stream_url, "height": height})

    if not formats:
        self.log.error(f" - No stream URLs found under '{stream_key}' in the playback response")
        sys.exit(1)

    available_formats = formats
    requested_qualities = self.ctx.params.get("quality")
    if requested_qualities:
        if not isinstance(requested_qualities, (list, set)):
            requested_qualities = [requested_qualities]

        matching = [
            fmt
            for fmt in available_formats
            # Formats whose label carried no "s<digits>p" have no height and cannot match.
            if fmt["height"] is not None
            and (fmt["height"] in requested_qualities or int(fmt["height"] * (9 / 16)) in requested_qualities)
        ]
        # Fall back to everything on offer when nothing matches the request
        # (the original read an undefined `self.formats` here).
        formats = matching or available_formats

    highest_resolution_format = max(formats, key=lambda fmt: fmt["height"] or 0)
    url = highest_resolution_format["url"]
    if "_var_" in url:
        url = url.replace("_var_", "_")

    tracks.add(
        HLS.from_url(url=url, session=self.session).to_tracks(language=self.lang),
        warn_only=True,
    )

    if not tracks.subtitles:
        android_headers = {
            "authorization": f"Bearer {self.token}",
            "connection": "Keep-Alive",
            "content-type": "application/json",
            "host": "api-gateway-global.viu.com",
            "platform": "android",
            "user-agent": "okhttp/3.12.1",
        }
        try:
            # Get product_id from either current_product or directly from data
            product_id = data.get("current_product", {}).get("product_id") or data.get("product_id")
            if not product_id:
                self.log.error("Cannot find product_id in data")
                raise ValueError("product_id not found")

            subs_resp = self.session.get(
                url=self.config["endpoints"]["playlist"],
                params={
                    "r": "/vod/detail",
                    "product_id": product_id,
                    "platform_flag_label": "phone",
                    "language_flag_id": self.lang_flag_id,
                    "ut": self.ut,
                    "area_id": self.area_id,
                    "os_flag_id": self.os_flag_id,
                    "countryCode": self.region,
                },
                headers=android_headers,
            ).json()
            subs = subs_resp["data"]["current_product"]["subtitle"]
        except Exception as e:
            # `except Exception` (not a bare except) so Ctrl-C / SystemExit still propagate.
            self.log.debug(f" - Subtitle lookup via /vod/detail failed ({e}), trying /vod/product-list")
            subs_resp = self.session.get(
                url=self.config["endpoints"]["playlist"],
                params={
                    "platform_flag_label": "web",
                    "area_id": self.area_id,
                    "language_flag_id": self.lang_flag_id_2,
                    "platformFlagLabel": "web",
                    "areaId": self.area_id,
                    "languageFlagId": self.lang_flag_id_2,
                    "countryCode": self.region.upper(),
                    "ut": self.ut,
                    "r": "/vod/product-list",
                    "os_flag_id": self.os_flag_id_2,
                    "series_id": data["series"]["series_id"],
                    "size": "-1",
                    "sort": "desc",
                },
                headers=android_headers,
            ).json()
            subs = subs_resp["data"]["current_product"]["subtitle"]

        for x in subs or []:
            # Main subtitle
            main_subtitle = Subtitle(
                id_=f"{x['product_subtitle_id']}_{x['code']}",
                url=x["url"],
                codec=Subtitle.Codec.SubRip,
                language=x["code"],
                is_original_lang=is_close_match(x["code"], [title.language]),
                forced=False,
                sdh=False,
            )
            tracks.add(main_subtitle, warn_only=True)

            # Annotation/secondary subtitle, flagged as SDH so it can be told apart
            if x.get("second_subtitle_url"):
                annotation_subtitle = Subtitle(
                    id_=f"{x['product_subtitle_id']}_{x['code']}_annotation",
                    url=x["second_subtitle_url"],
                    codec=Subtitle.Codec.SubRip,
                    language=x["code"],
                    is_original_lang=is_close_match(x["code"], [title.language]),
                    forced=False,
                    sdh=True,
                )
                tracks.add(annotation_subtitle, warn_only=True)

    # Update subtitle track names based on their language
    for track in tracks.subtitles:
        try:
            if not track.name and track.language:
                # Full language name, plus the territory name when one is set
                lang_name = track.language.display_name()
                if track.language.territory:
                    territory_name = track.language.territory_name()
                    track.name = f"{lang_name}, {territory_name}"
                else:
                    track.name = lang_name

                # Add annotation/sdh indicator
                if track.sdh:
                    track.name += " [Annotations]"
        except Exception as e:
            self.log.warning(f"Failed to update subtitle name for {track.language}: {e}")

    if not tracks.subtitles:
        self.log.error("No Subtitles")
        sys.exit(1)
    if not any(sub.language.language == "en" for sub in tracks.subtitles):
        self.log.error("No English Subtitles")
        sys.exit(1)

    thumbnail_url = data.get("cover_image_url") or data.get("poster") or data.get("thumb")

    if not thumbnail_url and isinstance(data.get("current_product"), dict):
        thumbnail_url = data["current_product"].get("cover_image_url")

    if not thumbnail_url:
        images = data.get("images", [])
        if images:
            thumbnail_url = images[0].get("url")

    if thumbnail_url:
        # Get synopsis from current_product or data, fallback to the title name
        if isinstance(data.get("current_product"), dict) and data["current_product"].get("synopsis"):
            synopsis = data["current_product"]["synopsis"]
        elif data.get("synopsis"):
            synopsis = data["synopsis"]
        else:
            synopsis = getattr(title, "name", "unknown")

        # Sanitize the filename by replacing path separators
        sanitized_synopsis = re.sub(r"[/\\]", "_", synopsis)
        thumbnail_name = f"{sanitized_synopsis} thumbnail"

        try:
            thumbnail_attachment = Attachment.from_url(
                url=thumbnail_url,
                name=thumbnail_name,
                mime_type="image/avif",
                description="Thumbnail",
                session=self.session,
            )
            tracks.attachments.append(thumbnail_attachment)
        except Exception as e:
            self.log.warning(f"Failed to download thumbnail: {e}")
    else:
        self.log.warning("Thumbnail not found for title.")

    for video in tracks.videos:
        if not video.language.is_valid():
            video.language = langcodes.Language.get(self.lang)

    return tracks
|
|
|
|
def get_chapters(self, title):
    """Return an empty chapter list; no chapter data is looked up for ``title``."""
    no_chapters = []
    return no_chapters
|
|
|
|
def get_widevine_license(self, challenge: bytes, title: Title_T, **_: Any) -> bytes:
    """
    POST a Widevine challenge to the configured license endpoint and return the raw reply.

    Parameters:
        challenge: License challenge bytes, sent as the request body.
        title: Title whose ``id`` is formatted into the license URL.

    Returns:
        The response body bytes.
    """
    # `token_lic` is never assigned in this class and `sessionid` is only set
    # by get_token(); read both defensively so a missing attribute falls back
    # to the configured auth value / a fresh session id instead of raising
    # AttributeError before the request is even sent.
    license_token = getattr(self, "token_lic", None) or self.config["auth"]
    session_id = getattr(self, "sessionid", None)
    if not session_id:
        session_id = str(uuid.uuid4())
        self.sessionid = session_id

    return self.session.post(
        url=self.config["endpoints"]["license"].format(id=title.id),
        headers={
            "authorization": license_token,
            "actiontype": "s",
            "drm_level": "l3",
            "hdcp_level": "null",
            "lang_id": "en",
            "languageid": "en",
            "os_ver": "10",
            "x-client": "browser",
            "x-request-id": str(uuid.uuid4()),
            "x-session-id": session_id,
        },
        data=challenge,
    ).content
|
|
|
|
def parse_input(self, input_):
    """
    Extract an identifier from the user-supplied title string.

    Three patterns are tried, in priority order, and the kind of match is
    recorded on ``self.jenis``:
      - ``vod/<digits>/``                            -> "product_id"
      - ``playlist-<digits>`` / ``containerId=<digits>`` -> "playlist_id"
      - anything else: the text after the last "-"   -> "playlist_id_eps"

    Returns:
        The extracted identifier as a string.
    """
    by_product = re.search(r"vod\/(\d+)\/", input_)
    by_playlist = re.search(r".+playlist-(\d+)", input_)
    # Earlier alternative pattern, kept for reference: r".+video.+-(\d+)"
    by_container = re.search(r"containerId=(\d+)", input_)

    if by_product:
        self.jenis = "product_id"
        return by_product.group(1)

    playlist_hit = by_playlist or by_container
    if playlist_hit:
        self.jenis = "playlist_id"
        return playlist_hit.group(1)

    self.jenis = "playlist_id_eps"
    return input_.split("-")[-1]
|
|
|
|
def check_error(self, response):
    """
    Validate an API response and return its ``data`` payload.

    Raises:
        Exception: When ``status.code`` is greater than 0; the message is
            taken from ``status.message`` ("Unknown error" if absent).

    Returns:
        ``response["data"]``, or an empty dict when it is missing or falsy.
    """
    status = response.get("status", {})
    code = status.get("code", 0)

    if code > 0:
        message = status.get("message", "Unknown error")
        self.log.error(f" - API Error: Code {code} - {message} - Attempting to bypass")
        raise Exception(f"API Error: {message}")

    payload = response.get("data")
    return payload if payload else {}
|
|
|
|
def get_token(self):
    """
    Request a token from the configured ``token`` endpoint.

    Fresh random ``sessionid`` and ``deviceid`` values are generated and
    stored on ``self`` before the request.

    Returns:
        The ``token`` field of the JSON reply when the response is OK;
        otherwise the failure is reported through ``self.log.exit``.
    """
    self.sessionid = str(uuid.uuid4())
    self.deviceid = str(uuid.uuid4())

    query = {
        "ver": "1.0",
        "fmt": "json",
        "aver": "5.0",
        "appver": "2.0",
        "appid": "viu_desktop",
        "platform": "desktop",
        "iid": str(uuid.uuid4()),
    }
    request_headers = {
        "accept": "application/json; charset=utf-8",
        "content-type": "application/json; charset=UTF-8",
        "x-session-id": self.sessionid,
        "Sec-Fetch-Mode": "cors",
        "x-client": "browser",
    }

    res = self.session.post(
        url=self.config["endpoints"]["token"],
        params=query,
        headers=request_headers,
        json={"deviceId": self.deviceid},
    )

    if not res.ok:
        self.log.exit(f" - Cannot get token, response: {res.text}")
        return None
    return res.json()["token"]
|
|
|
|
def _get_token(self, country_code):
    """
    Request a token for ``country_code`` from the configured ``token2`` endpoint.

    Returns:
        The ``token`` field of the JSON reply.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        nonce = "".join(random.choices("0123456789", k=10))
        device_uuid = str(uuid.uuid4())

        payload = {
            "countryCode": country_code.upper(),
            "platform": "browser",
            "platformFlagLabel": "web",
            "language": "en",
            "uuid": device_uuid,
            "carrierId": "0",
        }
        reply = self.session.post(
            url=self.config["endpoints"]["token2"],
            params={"v": f"{nonce}000&"},
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload).encode("utf-8"),
        )

        reply.raise_for_status()
        return reply.json()["token"]
    except Exception as e:
        self.log.error(f" - Token retrieval failed: {e}")
        raise
|
|
|
|
def _login(self, country_code):
|
|
if not self._user_token:
|
|
try:
|
|
user = Credential.username
|
|
pwd = Credential.password
|
|
except Exception:
|
|
user = None
|
|
pwd = None
|
|
if user == "empty" or not user:
|
|
return
|
|
if pwd == "empty" or not user:
|
|
return
|
|
self.log.debug(f" + auth: {self._auth_codes[country_code]}")
|
|
headers = {
|
|
"Authorization": f"Bearer {self._auth_codes[country_code]}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
data = self.session.post(
|
|
url=self.config["endpoints"]["validate"],
|
|
headers=headers,
|
|
data=json.dumps({"principal": user, "provider": "email"}).encode(),
|
|
).json()
|
|
if not data.get("exists"):
|
|
self.log.exit(" - Invalid email address")
|
|
|
|
data = self.session.post(
|
|
url=self.config["endpoints"]["login"],
|
|
headers=headers,
|
|
data=json.dumps(
|
|
{
|
|
"email": user,
|
|
"password": pwd,
|
|
"provider": "email",
|
|
}
|
|
).encode(),
|
|
).json()
|
|
self.check_error(data)
|
|
self._user_token = data.get("identity")
|
|
# need to update with valid user's token else will throw an error again
|
|
self._get_token[country_code] = data.get("token")
|
|
|
|
return self._user_token
|