import re import time import sys import os from datetime import datetime from typing import Union, Generator, Optional from urllib.parse import urljoin from http.cookiejar import CookieJar from pathlib import Path import json import click import requests from langcodes import Language from devine.core.constants import AnyTrack from devine.core.service import Service from devine.core.titles import Episode, Movie, Movies, Series from devine.core.tracks import Tracks, Chapters, Subtitle, Chapter from devine.core.tracks.attachment import Attachment from devine.core.credential import Credential from devine.core.search_result import SearchResult from devine.core.downloaders import curl_impersonate, n_m3u8dl_re from devine.core.config import config from devine.core.manifests import DASH import warnings warnings.filterwarnings("ignore", message="chunk_size is ignored") class TFC(Service): """ Service code for iWantTFC Written by @sp4rk.y Authorization: Cookies (Free and Paid Titles) Security: FHD@L3 """ @staticmethod @click.command(name="TFC", short_help="https://www.iwanttfc.com", help=__doc__) @click.argument("title", type=str) @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.") @click.pass_context def cli(ctx, **kwargs): return TFC(ctx, **kwargs) def __init__(self, ctx, title: str, movie: bool): self.title = title self.is_movie = movie self.credential = None self.token = None self.refresh_token = None self.token_expiry = None super().__init__(ctx) def authenticate( self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None ) -> Optional[str]: if self.credential is None and credential: self.credential = credential # Check for cached token cache_file = Path(config.directories.cache / self.__class__.__name__ / "token.json") # Check if session cache exists and is still valid if cache_file.exists(): try: with open(cache_file, "r", encoding="utf-8") as f: session_data = json.load(f) if session_data.get("token_expiry", 0) > time.time(): self.log.info(" + Using cached authentication token...") self.token = session_data["token"] self.refresh_token = session_data.get("refresh_token") self.token_expiry = session_data["token_expiry"] self.session.headers.update({"Authorization": f"Bearer {self.token}"}) # If there are cookies stored, restore them if "cookies" in session_data: for cookie_name, cookie_value in session_data["cookies"].items(): self.session.cookies.set(cookie_name, cookie_value) return self.token else: self.log.info(" + Token expired, re-authenticating...") except (json.JSONDecodeError, KeyError) as e: self.log.warning(f" + Error reading cache: {e}") self.log.info(" + Authenticating with TFC...") base_headers = { "User-Agent": self.config["browser"]["headers"]["user-agent"], "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json, text/plain, */*", "Origin": "https://www.iwanttfc.com", "Referer": "https://www.iwanttfc.com/", } self.session.headers.update(base_headers) data_auth = ( f"password={self.credential.password}&email={self.credential.username}&deviceID={self.config['UUID']}" ) r1 = self.session.post(self.config["endpoints"]["api_login"], data=data_auth) r1.raise_for_status() j1 = r1.json() user_auth_1 = j1.get("UserAuthentication") if not user_auth_1: raise ValueError("No UserAuthentication from /user/auth") login_url = self.config["endpoints"].get("user_login", "https://www.iwanttfc.com/api/1.0/user/login") r2 = self.session.post(login_url, headers={"UserAuthentication": user_auth_1}) r2.raise_for_status() r3 = self.session.post( self.config["endpoints"]["api_login"], data=data_auth, headers={"UserAuthentication": user_auth_1} ) r3.raise_for_status() j3 = r3.json() user_auth_2 = j3.get("UserAuthentication") if not user_auth_2: raise ValueError("No UserAuthentication from second /user/auth") user_rights_url = self.config["endpoints"].get("user_rights", "https://www.iwanttfc.com/api/1.0/user/rights") r4 = self.session.get(user_rights_url, headers={"UserAuthentication": user_auth_2}) r4.raise_for_status() self.token = user_auth_2 self.refresh_token = j1.get("refreshToken", None) # TFC tokens typically expire in 24 hours (86400 seconds), but we'll set it to just under 24 hours to be safe self.token_expiry = time.time() + 86300 self.session.headers.update({"Authorization": f"Bearer {self.token}"}) # Cache the token and cookies try: cache_file.parent.mkdir(parents=True, exist_ok=True) with open(cache_file, "w", encoding="utf-8") as f: json.dump( { "token": self.token, "refresh_token": self.refresh_token, "token_expiry": self.token_expiry, "cookies": {k: v for k, v in self.session.cookies.items()}, }, f, ) self.log.info(" + Authentication token and cookies cached successfully") except Exception as e: self.log.warning(f" + Failed to cache token: {e}") return self.token def search(self) -> Generator[SearchResult, None, None]: query = self.title headers = {**self.config["search"]["headers"], "Authorization": f"Bearer {self.token}"} data = { "requests": [ {"query": query, "indexName": "www_iwanttfc_com_items", "params": "hitsPerPage=200"}, {"query": query, "indexName": "www_iwanttfc_com_tag_id_cast", "params": "hitsPerPage=200"}, ] } response = self.session.post(self.config["endpoints"]["api_search"], headers=headers, json=data) response.raise_for_status() results = response.json()["results"] for result in results[0]["hits"]: title = result.get("title", {}).get("en", "") if not title: continue try: js_value = self.get_js_value() if not js_value: self.log.warning("Could not get JS value for detail URL") continue # Use the direct item ID instead of objectID for detail lookup item_id = result.get("id") or result["objectID"] detail_url = self.config["endpoints"]["api_playback"].format(js=js_value, id=item_id) detail_response = self.session.get( detail_url, headers={ "Authorization": f"Bearer {self.token}", "Accept": "application/json", "Origin": "https://www.iwanttfc.com", "Referer": "https://www.iwanttfc.com/", }, ) detail_response.raise_for_status() detail_data = detail_response.json() description = detail_data.get("description", {}).get("en", "") if description: description = description[:200] + "..." media_type = "TV" if "children" in detail_data else "Movie" year = detail_data.get("release_year") episode_count = 0 if media_type == "TV": episode_count = len( [episode for episode in detail_data.get("children", []) if "-tlr" not in episode["id"]] ) label = media_type if year: label += f" ({year})" if media_type == "TV": label += f" {episode_count} Episode{'' if episode_count == 1 else 's'}" yield SearchResult(id_=item_id, title=title, description=description, label=label) except requests.RequestException as e: self.log.warning(f"Failed to get details for {result.get('id', result['objectID'])}: {e}") continue def get_js_value(self) -> Optional[str]: for _ in curl_impersonate( urls="https://www.iwanttfc.com/#!/browse", output_dir=config.directories.temp, filename="browse_page.html", ): pass html_path = config.directories.temp / "browse_page.html" with html_path.open("r", encoding="utf8") as f: html_content = f.read() match = re.search(r'src="https://absprod-static.iwanttfc.com/c/6/catalog/(.*?)/script.js', html_content) if match: return match.group(1) return None def get_titles(self) -> Union[Movies, Series]: headers = self.config["browser"]["headers"] try: title_metadata = requests.get( self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=self.title), headers=headers ).json() except ValueError: self.log.warning("Show title does not exist.") sys.exit(1) if "children" in title_metadata: episodes = [] for episode in title_metadata.get("children", []): episode_id = episode["id"] match = re.match(r".*-s(\d+)e(\d+)$", episode_id, re.IGNORECASE) if not match: continue season, number = map(int, match.groups()) episode_obj = Episode( id_=episode_id, title=title_metadata.get("title", {}).get("en"), season=season, number=number, language="fil", year=title_metadata.get("release_year"), service=self.__class__, ) episodes.append(episode_obj) return Series(episodes) else: movie_name = title_metadata.get("title", {}).get("en") movie_year = title_metadata.get("release_year") movie_class = Movie( id_=self.title, name=movie_name, year=movie_year, service=self.__class__, ) return Movies([movie_class]) def get_tracks(self, title: Union[Movie, Episode]) -> Tracks: if not title.data: episode_data = requests.get( self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id) ).json() title.data = episode_data else: episode_data = title.data mpd_urls = episode_data.get("media", {}).get("mpds", []) subtitle_data = [ ( urljoin(self.config["endpoints"]["api_subtitle"], caption.get("id")) + ".vtt", caption.get("lang"), ) for caption in episode_data.get("media", {}).get("captions", []) ] tracks = Tracks() for mpd_url in mpd_urls: mpd_tracks = DASH.from_url(url=mpd_url, session=self.session).to_tracks(language=title.language or "fil") for track in mpd_tracks: if not tracks.exists(by_id=track.id): track.data["episode_id"] = episode_data.get("id") tracks.add(track) # Force Filipino language on all audio tracks regardless of what's in the MPD for track in tracks.audio: mpd_lang = title.language or "fil" track.language = Language.get(mpd_lang) track.is_original_lang = True # Force Filipino language on all video tracks regardless of what's in the MPD for track in tracks.videos: mpd_lang = title.language or "fil" track.language = Language.get(mpd_lang) track.is_original_lang = True self.log.debug(f"Processing {len(subtitle_data)} subtitle tracks") for subtitle_url, language in subtitle_data: try: subtitle_session = requests.Session() subtitle_session.headers.update( { "User-Agent": self.config["browser"]["headers"]["user-agent"], "Referer": self.config["browser"]["headers"]["Referer"], "Origin": self.config["browser"]["headers"]["Origin"], "sec-ch-ua": self.config["browser"]["headers"]["sec-ch-ua"], "sec-ch-ua-mobile": self.config["browser"]["headers"]["sec-ch-ua-mobile"], "sec-ch-ua-platform": self.config["browser"]["headers"]["sec-ch-ua-platform"], "DNT": self.config["browser"]["headers"]["DNT"], "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", } ) # Check if subtitle URL is valid before creating the track self.log.debug(f"Checking subtitle URL: {subtitle_url}") response = subtitle_session.get(url=subtitle_url, stream=True, timeout=5) response.raise_for_status() subtitle_id = subtitle_url.split("/")[-1].split(".")[0] tracks.add( Subtitle( id_=subtitle_id, url=subtitle_url, codec=Subtitle.Codec.WebVTT, language=language, ) ) except requests.exceptions.HTTPError as e: self.log.warning(f"Subtitle URL not accessible: {subtitle_url} (HTTP {e.response.status_code})") except requests.exceptions.RequestException as e: self.log.warning(f"Request error for subtitle {subtitle_url}: {e}") except Exception as e: self.log.warning(f"Unexpected error processing subtitle {subtitle_url}: {e}") chapters = self.get_chapters(title) tracks.chapters = Chapters(chapters) thumbnail_id = episode_data.get("thumbnail") or episode_data.get("poster") or episode_data.get("thumb") if not thumbnail_id: images = episode_data.get("images", []) if images: thumbnail_data = images[0] thumbnail_id = thumbnail_data.get("id") or thumbnail_data.get("url").split("/")[-1].split(".")[0] if thumbnail_id: thumbnail_base_url = self.config["endpoints"]["api_thumbnail"] thumbnail_url = f"{thumbnail_base_url}{thumbnail_id}.jpg" thumbnail_response = self.session.get(thumbnail_url) if thumbnail_response.status_code == 200: thumbnail_filename = f"{title.id}_thumbnail.jpg" thumbnail_path = config.directories.temp / thumbnail_filename os.makedirs(config.directories.temp, exist_ok=True) with open(thumbnail_path, "wb") as f: f.write(thumbnail_response.content) thumbnail_attachment = Attachment( path=thumbnail_path, name=thumbnail_filename, mime_type="image/jpeg", description="Thumbnail", ) tracks.attachments.append(thumbnail_attachment) else: self.log.warning("Thumbnail not found for title.") if not tracks.subtitles: self.log.error("No Subtitles") sys.exit(1) if not any(sub.language.language == "en" for sub in tracks.subtitles): self.log.error("No English Subtitles") sys.exit(1) for track in tracks: if track not in tracks.attachments: track.downloader = n_m3u8dl_re return tracks def get_chapters(self, title: Union[Movie, Episode]) -> list[Chapter]: if isinstance(title, Episode) and not title.data: episode_data = requests.get( self.config["endpoints"]["api_playback"].format(js=self.get_js_value(), id=title.id) ).json() title.data = episode_data cuepoints = title.data.get("cuepoints", []) seen = set() unique_cuepoints = [] for cp in cuepoints: if cp not in seen: seen.add(cp) unique_cuepoints.append(cp) try: sorted_cuepoints = sorted(unique_cuepoints, key=lambda x: datetime.strptime(x, "%H:%M:%S.%f")) except ValueError as e: self.log.error(f"Error parsing cuepoints: {e}") sorted_cuepoints = [] chapters = [Chapter(name="Chapter 1", timestamp="00:00:00.000")] for i, cuepoint in enumerate(sorted_cuepoints, start=2): try: timestamp = datetime.strptime(cuepoint, "%H:%M:%S.%f").time() chapters.append(Chapter(name=f"Chapter {i}", timestamp=timestamp.strftime("%H:%M:%S.%f")[:-3])) except ValueError: self.log.warning(f"Invalid cuepoint format: {cuepoint}") return chapters def get_widevine_service_certificate(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes | str: # TODO: Cache the returned service cert return self.get_widevine_license(challenge, track) def get_widevine_license(self, challenge: bytes, track: AnyTrack, *_, **__) -> bytes: episode_id = track.data.get("episode_id") license_url = self.config["endpoints"]["api_license"] license_url += f"?itemID={episode_id}" license_url += f"&UserAuthentication={self.token}" license_url += "&build=52b61137ff3af37f55e0" return self.session.post(url=license_url, data=challenge).content