diff --git a/TELASA/__init__.py b/TELASA/__init__.py new file mode 100644 index 0000000..e31108c --- /dev/null +++ b/TELASA/__init__.py @@ -0,0 +1,404 @@ +from __future__ import annotations + +import json +import re +import sys +from hashlib import md5 +from typing import Any, Optional, Union, List + +import click +from click import Context +from langcodes import Language +from requests import Request + +from unshackle.core.constants import AnyTrack +from unshackle.core.service import Service +from unshackle.core.manifests import HLS, DASH +from unshackle.core.titles import Title_T, Titles_T, Episode, Movie, Movies, Series +from unshackle.core.tracks import Chapters, Tracks, Audio, Video, Subtitle +from unshackle.core.utils.collections import as_list + +class TELASA(Service): + """ + Service code for Telasa (https://www.telasa.jp/). + \b + Authorization: Cookies + Security: + Widevine: + L3: 1080p (Possibly 2160p?) + playReady: + SL2000: 1080p + Author: Crash@NSBC (Ported to Unshackle) + """ + + ALIASES = ("TLSA", "telasa") + TITLE_RE = r"^(?:https?://(?:www\.)?telasa\.jp)/(?Pseries|videos)/(?P\d+)" + + @staticmethod + @click.command(name="Telasa", short_help="https://www.telasa.jp/") + @click.argument("title", type=str) + @click.option("-s", "--single", is_flag=True, default=False, help="Download only the specific ID provided (do not fetch other seasons).") + @click.option("-t", "--title", "force_title", type=str, default=None, help="Force a custom series/movie title.") + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> Telasa: + return TELASA(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str, single: bool, force_title: str | None = None): + self.title_input = title + super().__init__(ctx) + + self.single = single + self.force_title = force_title + + parse = re.search(self.TITLE_RE, self.title_input) + if not parse: + self.title_id = self.title_input + self.is_movie = True + else: + self.title_id = parse.group("id") + self.is_movie = bool(parse.group("type") == "videos") + + self.cdm = ctx.obj.cdm + self.session.headers.update({ + "Origin": "https://www.telasa.jp", + "Referer": "https://www.telasa.jp/", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + }) + + def authenticate(self, cookies: Optional[Any] = None, credential: Optional[Any] = None) -> None: + if not cookies: + self.log.error("Service requires Cookies for Authentication.", exc_info=False) + sys.exit(1) + + super().authenticate(cookies, credential) + + self.log.info("Logging into Telasa...") + + last_auth_user = next( + (cookie.value for cookie in self.session.cookies if cookie.name.endswith("LastAuthUser")), + None, + ) + if not last_auth_user: + self.log.error("- LastAuthUser not Found in cookies.", exc_info=False) + import sys; sys.exit(1) + + prefix = "CognitoIdentityServiceProvider.71t3jfmis742ffmevo4677evms" + access_token_cookie = next( + (cookie for cookie in self.session.cookies if cookie.name == f"{prefix}.{last_auth_user}.accessToken"), + None, + ) + + if not access_token_cookie: + self.log.error("- Access Token not Found in cookies.", exc_info=False) + import sys; sys.exit(1) + + access_token = access_token_cookie.value + + # Validate/Refresh Token logic + req = self.session.post( + url=self.config["endpoints"]["auth"], + headers={ + "Content-Type": "application/x-amz-json-1.1", + "X-Amz-Target": "AWSCognitoIdentityProviderService.GetUser", + "X-Amz-User-Agent": "aws-amplify/5.0.4 auth framework/2", + }, + json={"AccessToken": access_token}, + ) + + data = req.json() + + if data.get("__type") == "NotAuthorizedException": + self.log.info(" - Access Token expired, refreshing...") + refresh_token = next( + (cookie for cookie in self.session.cookies if cookie.name == f"{prefix}.{last_auth_user}.refreshToken"), + None, + ) + + if not refresh_token: + self.log.error("- Refresh Token not Found.", exc_info=False) + import sys; sys.exit(1) + + req = self.session.post( + url=self.config["endpoints"]["auth"], + headers={ + "Content-Type": "application/x-amz-json-1.1", + "x-amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth", + "x-amz-User-Agent": "aws-amplify/5.0.4 auth framework/2", + }, + json={ + "ClientId": "71t3jfmis742ffmevo4677evms", + "AuthFlow": "REFRESH_TOKEN_AUTH", + "AuthParameters": { + "REFRESH_TOKEN": refresh_token.value, + }, + }, + ) + + data = req.json() + if data.get("__type") == "NotAuthorizedException": + self.log.error(f" - Failed to Refresh Access Token -> {data.get('message')}", exc_info=False) + import sys; sys.exit(1) + + access_token = data["AuthenticationResult"]["AccessToken"] + + device_id = next( + iter(cookie for cookie in self.session.cookies if cookie.name == "did"), + None, + ) + + if not device_id: + self.log.error(" - Device ID (did) not Found in cookies.", exc_info=False) + import sys; sys.exit(1) + + self.session.headers.update({ + "Authorization": f"Bearer {access_token}", + "X-Device-Id": device_id.value, + }) + self.log.info(" + Authentication Successful.") + + def get_titles(self) -> Titles_T: + if self.is_movie: + return self._get_movie_titles() + else: + return self._get_series_titles() + + def _get_movie_titles(self) -> Movies: + req = self.session.get(self.config["endpoints"]["base"].format(id=self.title_id)) + data_match = re.search(r'