from __future__ import annotations import json import re import sys from hashlib import md5 from typing import Any, Optional, Union, List import click from click import Context from langcodes import Language from requests import Request from unshackle.core.constants import AnyTrack from unshackle.core.service import Service from unshackle.core.manifests import HLS, DASH from unshackle.core.titles import Title_T, Titles_T, Episode, Movie, Movies, Series from unshackle.core.tracks import Chapters, Tracks, Audio, Video, Subtitle from unshackle.core.utils.collections import as_list class TELASA(Service): """ Service code for Telasa (https://www.telasa.jp/). \b Authorization: Cookies Security: Widevine: L3: 1080p (Possibly 2160p?) playReady: SL2000: 1080p Author: Crash@NSBC (Ported to Unshackle) """ ALIASES = ("TLSA", "telasa") TITLE_RE = r"^(?:https?://(?:www\.)?telasa\.jp)/(?Pseries|videos)/(?P\d+)" @staticmethod @click.command(name="Telasa", short_help="https://www.telasa.jp/") @click.argument("title", type=str) @click.option("-s", "--single", is_flag=True, default=False, help="Download only the specific ID provided (do not fetch other seasons).") @click.option("-t", "--title", "force_title", type=str, default=None, help="Force a custom series/movie title.") @click.pass_context def cli(ctx: Context, **kwargs: Any) -> Telasa: return TELASA(ctx, **kwargs) def __init__(self, ctx: Context, title: str, single: bool, force_title: str | None = None): self.title_input = title super().__init__(ctx) self.single = single self.force_title = force_title parse = re.search(self.TITLE_RE, self.title_input) if not parse: self.title_id = self.title_input self.is_movie = True else: self.title_id = parse.group("id") self.is_movie = bool(parse.group("type") == "videos") self.cdm = ctx.obj.cdm self.session.headers.update({ "Origin": "https://www.telasa.jp", "Referer": "https://www.telasa.jp/", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" }) def authenticate(self, cookies: Optional[Any] = None, credential: Optional[Any] = None) -> None: if not cookies: self.log.error("Service requires Cookies for Authentication.", exc_info=False) sys.exit(1) super().authenticate(cookies, credential) self.log.info("Logging into Telasa...") last_auth_user = next( (cookie.value for cookie in self.session.cookies if cookie.name.endswith("LastAuthUser")), None, ) if not last_auth_user: self.log.error("- LastAuthUser not Found in cookies.", exc_info=False) import sys; sys.exit(1) prefix = "CognitoIdentityServiceProvider.71t3jfmis742ffmevo4677evms" access_token_cookie = next( (cookie for cookie in self.session.cookies if cookie.name == f"{prefix}.{last_auth_user}.accessToken"), None, ) if not access_token_cookie: self.log.error("- Access Token not Found in cookies.", exc_info=False) import sys; sys.exit(1) access_token = access_token_cookie.value # Validate/Refresh Token logic req = self.session.post( url=self.config["endpoints"]["auth"], headers={ "Content-Type": "application/x-amz-json-1.1", "X-Amz-Target": "AWSCognitoIdentityProviderService.GetUser", "X-Amz-User-Agent": "aws-amplify/5.0.4 auth framework/2", }, json={"AccessToken": access_token}, ) data = req.json() if data.get("__type") == "NotAuthorizedException": self.log.info(" - Access Token expired, refreshing...") refresh_token = next( (cookie for cookie in self.session.cookies if cookie.name == f"{prefix}.{last_auth_user}.refreshToken"), None, ) if not refresh_token: self.log.error("- Refresh Token not Found.", exc_info=False) import sys; sys.exit(1) req = self.session.post( url=self.config["endpoints"]["auth"], headers={ "Content-Type": "application/x-amz-json-1.1", "x-amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth", "x-amz-User-Agent": "aws-amplify/5.0.4 auth framework/2", }, json={ "ClientId": "71t3jfmis742ffmevo4677evms", "AuthFlow": "REFRESH_TOKEN_AUTH", "AuthParameters": { "REFRESH_TOKEN": refresh_token.value, }, }, ) data = req.json() if data.get("__type") == "NotAuthorizedException": self.log.error(f" - Failed to Refresh Access Token -> {data.get('message')}", exc_info=False) import sys; sys.exit(1) access_token = data["AuthenticationResult"]["AccessToken"] device_id = next( iter(cookie for cookie in self.session.cookies if cookie.name == "did"), None, ) if not device_id: self.log.error(" - Device ID (did) not Found in cookies.", exc_info=False) import sys; sys.exit(1) self.session.headers.update({ "Authorization": f"Bearer {access_token}", "X-Device-Id": device_id.value, }) self.log.info(" + Authentication Successful.") def get_titles(self) -> Titles_T: if self.is_movie: return self._get_movie_titles() else: return self._get_series_titles() def _get_movie_titles(self) -> Movies: req = self.session.get(self.config["endpoints"]["base"].format(id=self.title_id)) data_match = re.search(r'