import base64 import os import datetime import time import json import re import click import sys from pathlib import Path from langcodes import Language from http.cookiejar import CookieJar import hmac import hashlib from typing import Optional, Union, Generator from devine.core.config import config from devine.core.constants import AnyTrack from devine.core.manifests import DASH from devine.core.service import Service from devine.core.titles import Episode, Movie, Movies, Series from devine.core.tracks import Chapters, Tracks, Subtitle from devine.core.tracks.attachment import Attachment from devine.core.search_result import SearchResult from devine.core.downloaders import n_m3u8dl_re class VIKI(Service): """ Service code for Viki Written by ToonsHub, improved by @sp4rk.y Authorization: None (Free SD) | Cookies (Free and Paid Titles) Security: FHD@L3 """ TITLE_RE = r"^(?:https?://(?:www\.)?viki\.com/(?:tv|movies)/)(?P[a-z0-9]+)(?:-.+)?$" # GEOFENCE = ("ca",) @staticmethod @click.command(name="VIKI", short_help="https://www.viki.com", help=__doc__) @click.argument("title", type=str) @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.") @click.pass_context def cli(ctx, **kwargs): return VIKI(ctx, **kwargs) def __init__(self, ctx, title: str, movie: bool): self.title = title # Decide if it's a movie or series if "/movies/" in self.title: self.is_movie = True else: self.is_movie = movie self.device_id = "157428845d" self.app_id = "100531a" self.app_secret_key = "a4e52e9b08620b7131d1830c71cde6cf03c4a7b00d664d9dec8ee27a19d13ba0" super().__init__(ctx) # Set default "browser" style headers for normal usage self.session.headers.update( { "user-agent": self.config["browser"]["headers"]["user-agent"], "x-client-user-agent": self.config["browser"]["headers"]["user-agent"], "x-viki-app-ver": self.config["browser"]["headers"]["x-viki-app-ver"], "x-viki-as-id": self.config["browser"]["headers"]["x-viki-as-id"], } ) def authenticate(self, cookies: Optional[CookieJar] = None, credential=None): if cookies: self.session.cookies.update(cookies) return cache_file = Path(config.directories.cache / self.__class__.__name__ / "token.json") # Check if session cache exists and is still valid if os.path.exists(cache_file): with open(cache_file, "r") as f: session_data = json.load(f) if session_data.get("expiry_time") > time.time(): self.token = session_data["token"] self.session.cookies.update(session_data["cookies"]) self.log.info("Reusing cached session token.") return self.token else: self.log.info("Session expired, re-authenticating.") # Step 1: Get device registration code login_code_data = {"type": "androidtv", "device_id": self.device_id} login_code_data_str = json.dumps(login_code_data, separators=(",", ":")) timestamp_login_code = str(round(time.time())) code_gen_api = f"/v5/devices.json?app={self.app_id}&t={timestamp_login_code}{login_code_data_str}" signature = hmac.new( self.app_secret_key.encode("utf-8"), code_gen_api.encode("utf-8"), hashlib.sha1 ).hexdigest() login_code_headers = { **self.config["tv_login_headers"], "signature": signature, "timestamp": timestamp_login_code, } res = self.session.post( self.config["endpoints"]["login_code"], headers=login_code_headers, data=login_code_data_str ) res.raise_for_status() result_json = res.json() code = result_json["device_registration_code"] self.log.info(f"Login Code: {code}") self.log.info("Go to https://www.viki.com/androidtv and enter the Login code above.") input("Press Enter after you have logged in.") # Step 2: Verify code timestamp_verify_data = str(round(time.time())) verify_api_str = ( f"/v5/devices/{code}.json?device_code={code}&type=androidtv&app={self.app_id}&t={timestamp_verify_data}" ) verify_sig = hmac.new( self.app_secret_key.encode("utf-8"), verify_api_str.encode("utf-8"), hashlib.sha1 ).hexdigest() verify_headers = { **self.config["tv_login_headers"], "signature": verify_sig, "timestamp": timestamp_verify_data, } r_verify = self.session.get(self.config["endpoints"]["verify"].format(code=code), headers=verify_headers) r_verify.raise_for_status() device_token = r_verify.json()["device_token"] # Step 3: Exchange device token for session token timestamp_token_api = str(round(time.time())) token_api_str = f"/v5/sessions.json?app={self.app_id}&t={timestamp_token_api}" token_data = {"device_token": device_token, "type": "androidtv"} token_data_str = json.dumps(token_data, separators=(",", ":")) token_api_signature = hmac.new( self.app_secret_key.encode("utf-8"), token_api_str.encode("utf-8"), hashlib.sha1 ).hexdigest() token_headers = { **self.config["tv_login_headers"], "signature": token_api_signature, "timestamp": timestamp_token_api, } r_token = self.session.post( self.config["endpoints"]["session_token"], headers=token_headers, data=token_data_str ) r_token.raise_for_status() token_json = r_token.json() self.token = token_json.get("token") expiry_time = time.time() + token_json.get("expiry_time", 3600) cache_file.parent.mkdir(parents=True, exist_ok=True) with open(cache_file, "w") as f: json.dump({"token": self.token, "expiry_time": expiry_time, "cookies": self.session.cookies.get_dict()}, f) self.log.debug(f"Session Token: {self.token} (expires in {token_json.get('expiry_time')} seconds)") return self.token def search(self) -> Generator[SearchResult, None, None]: query = self.title response = self.session.get( self.config["endpoints"]["search_endpoint_url"], params={ "term": query, "app": "100000a", "per_page": 10, "blocked": "true", }, ) response.raise_for_status() search_data = response.json() for result in search_data["response"]: media_type = "TV" if result["type"] == "series" else "Movie" year = None distributors = result.get("distributors") if distributors: from_date = distributors[0].get("from") if from_date: year_match = re.match(r"^\d{4}", from_date) if year_match: year = year_match.group() label = media_type if year: label += f" ({year})" if "viki_air_time" in result: release_time = datetime.datetime.fromtimestamp(result["viki_air_time"], datetime.timezone.utc) if release_time > datetime.datetime.now(datetime.timezone.utc): time_diff = release_time - datetime.datetime.now(datetime.timezone.utc) days, seconds = time_diff.days, time_diff.seconds hours = days * 24 + seconds // 3600 minutes = (seconds % 3600) // 60 if hours > 0: label = f"In {hours} hours" elif minutes > 0: label = f"In {minutes} minutes" else: label = "In less than a minute" yield SearchResult( id_=result["id"], title=result["titles"]["en"], description=result.get("descriptions", {}).get("en", "")[:200] + "...", label=label, url=f"https://www.viki.com/tv/{result['id']}", ) def get_titles(self) -> Union[Movies, Series]: match = re.match(self.TITLE_RE, self.title) if match: title_id = match.group("id") else: title_id = self.title if not self.is_movie: self.is_movie = False episodes = [] pagenumber = 1 special_episode_number = 1 while True: series_metadata_url = self.config["endpoints"]["episode_metadata"].format( id=title_id, pagenumber=pagenumber ) series_metadata = self.session.get(series_metadata_url).json() self.series_metadata = series_metadata if not series_metadata["response"] and not series_metadata["more"]: break show_year = self.get_show_year_from_search() for episode in series_metadata["response"]: episode_id = episode["id"] if len(episode_id) < 4: episode_id += "5349" show_title = episode["container"]["titles"]["en"] episode_season = 1 episode_number = episode["number"] current_time = int(time.time()) geo_blocking = episode.get("blocking", {}).get("geo", False) viki_air_time = episode.get("viki_air_time", 0) if geo_blocking: if current_time < viki_air_time: continue else: self.log.info(f"Episode {episode.get('number')} is blocked due to Geo-Location.\n") sys.exit(1) title_match = re.match(r"^(.*?)(?: (\d{4})$| (\d{1,2})$)?", show_title) if title_match: base_title = title_match.group(1) year = title_match.group(2) season = title_match.group(3) if year: show_title = base_title elif season: episode_season = int(season) episode_title_with_year = f"{show_title} {show_year}" if "Special" in episode.get("titles", {}).get("en", "") or "Extra" in episode.get("titles", {}).get( "en", "" ): episode_season = 0 episode_number = special_episode_number special_episode_number += 1 episode_name = None episode_class = Episode( id_=episode_id, title=episode_title_with_year, season=episode_season, number=episode_number, name=episode_name, year=show_year, service=self.__class__, ) if "images" in episode and "poster" in episode["images"] and "url" in episode["images"]["poster"]: episode_class.thumbnail_url = episode["images"]["poster"]["url"] episodes.append(episode_class) pagenumber += 1 return Series(episodes) else: movie_metadata = self.session.get(f"https://www.viki.com/movies/{title_id}").text video_id = re.search(r"https://api.viki.io/v4/videos/(.*?).json", movie_metadata).group(1) movie_metadata = self.session.get(self.config["endpoints"]["video_metadata"].format(id=video_id)).json() self.movie_metadata = movie_metadata movie_id = movie_metadata["id"] movie_name = movie_metadata["titles"]["en"] title_match = re.match(r"^(.*?)(?: (\d{4}))?$", movie_name) if title_match: base_title = title_match.group(1) year = title_match.group(2) if year: movie_name = base_title movie_year = self.get_show_year_from_search() movie_class = Movie(id_=movie_id, name=movie_name, year=movie_year, service=self.__class__) movie_class.thumbnail_url = movie_metadata["images"]["poster"]["url"] return Movies([movie_class]) def get_show_year_from_search(self) -> Optional[str]: if hasattr(self, "movie_metadata") and self.movie_metadata: query = self.movie_metadata["container"]["titles"]["en"] else: query = self.series_metadata["response"][0]["container"]["titles"]["en"] response = self.session.get( self.config["endpoints"]["search_endpoint_url"], params={ "term": query, "app": "100000a", "per_page": 50, "blocked": "true", }, ) response.raise_for_status() search_data = response.json() for result in search_data.get("response", []): if result.get("id") == self.title: distributors = result.get("distributors") if distributors: from_date = distributors[0].get("from") if from_date: return from_date[:4] match = re.match(self.TITLE_RE, self.title) if match: extracted_id = match.group("id") if extracted_id == result.get("id"): distributors = result.get("distributors") if distributors: from_date = distributors[0].get("from") if from_date: return from_date[:4] return "2024" def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: chinese_language_map = { "zh": "zh-Hans", "zt": "zh-TW", "zh-tw": "zh-Hant", "zh-hk": "zh-Hant", "zh-hans": "zh-Hans", "zh-hant": "zh-Hant", } original_id = title.id[:-4] if title.id.endswith("5349") else title.id mpd_info = self.session.get(self.config["endpoints"]["mpd_api"].format(id=original_id)) mpd_data = mpd_info.json() try: mpd_url = mpd_data["queue"][1]["url"] except (KeyError, IndexError): self.log.info("Episode not yet available\n") sys.exit(1) mpd_lang = mpd_data["video"]["origin"]["language"].lower() if mpd_lang == "pt": mpd_lang_mapped = "pt-br" elif mpd_lang in chinese_language_map: mpd_lang_mapped = chinese_language_map[mpd_lang] else: mpd_lang_mapped = mpd_lang license_url = json.loads(base64.b64decode(mpd_data["drm"]).decode("utf-8", "ignore"))["dt3"] tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang_mapped) for track in tracks: track.data["license_url"] = license_url for track in tracks.audio: track.data["original_language"] = track.language track.language = Language.make(language=mpd_lang_mapped) tracks.subtitles.clear() def clean_language_label(label: str) -> str: label = re.sub(r"<[^>]+>", "", label) label = re.sub(r"\s*\(\d+%\)", "", label) return label.strip() # Download thumbnail if available if hasattr(title, "thumbnail_url") and title.thumbnail_url: thumbnail_url = title.thumbnail_url thumbnail_name = f"{mpd_data['video']['container']['titles']['en']} Episode {mpd_data['video']['number']} thumbnail" try: # Use Attachment.from_url to create and download the thumbnail thumbnail_attachment = Attachment.from_url( url=thumbnail_url, name=thumbnail_name, mime_type="image/jpeg", description="Thumbnail", session=self.session, ) if not hasattr(tracks, "attachments"): tracks.attachments = [] tracks.attachments.append(thumbnail_attachment) except Exception as e: self.log.warning(f"Failed to download thumbnail: {e}") else: self.log.warning("Thumbnail URL not available for this title.") # Handle subtitles stream_subtitles = mpd_data.get("streamSubtitles", {}).get("dash", []) if not stream_subtitles: self.log.warning("No subtitles available in 'streamSubtitles.dash'.") else: for sub in stream_subtitles: if sub.get("percentage", 0) > 95 and sub.get("kind") == "subtitles": language_code_raw = sub.get("srclang", "").lower() language_label = sub.get("label", language_code_raw) language_name = clean_language_label(language_label) if language_code_raw.startswith("z"): language_code_mapped = chinese_language_map.get(language_code_raw, language_code_raw) script = "Simplified" if language_code_mapped == "zh-Hans" else "Traditional" language_name = f"Chinese ({script})" elif language_code_raw == "pt": language_code_mapped = "pt-br" else: language_code_mapped = language_code_raw is_original = language_code_mapped == mpd_lang_mapped subtitle_id = f"{title.id}_{sub.get('id', '')}_{language_code_mapped}" subtitle_track = Subtitle( id_=subtitle_id, url=sub["src"], codec=Subtitle.Codec.WebVTT, language=language_code_mapped, is_original_lang=is_original, forced=False, sdh=False, name=language_name, ) if sub.get("default"): subtitle_track.default = True tracks.add(subtitle_track, warn_only=True) if not tracks.subtitles: self.log.error("No Subtitles") sys.exit(1) if not any(sub.language.language == "en" for sub in tracks.subtitles): self.log.error("No English Subtitles") sys.exit(1) for track in tracks: if track not in tracks.attachments: track.downloader = n_m3u8dl_re return tracks def get_chapters(self, *_, **__) -> Chapters: return Chapters() def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str: return self.get_widevine_license(challenge, track) def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes: return self.session.post(url=track.data["license_url"], data=challenge).content