import logging
import math
import uuid
from datetime import datetime, timedelta
from http.cookiejar import CookieJar
from typing import Any, Dict, Generator, Optional, Set, Tuple, Union

import click

from devine.core.service import Service
from devine.core.titles import Titles_T, Title_T, Series, Episode
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Subtitle, Chapter
from devine.core.search_result import SearchResult
from devine.core.manifests import DASH

log = logging.getLogger(__name__)


class CR(Service):
    """
    Service code for Crunchyroll (https://crunchyroll.com)

    \b
    Author: TPD94 - edited
    Authorization: Login
    Robustness:
      Widevine:
        L3: 1080p

    \b
    Tips:
        - Use complete title/episode URL or id as input:
            https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8
            OR
            GG5H5XQ7D
        - Supports series
    """

    # NOTE: the class docstring above doubles as the click help text (help=__doc__),
    # so developer-facing documentation lives in comments and method docstrings.

    # Client credentials sent as HTTP Basic auth on every token request.
    _BASIC_AUTH = "Basic eHVuaWh2ZWRidDNtYmlzdWhldnQ6MWtJUzVkeVR2akUwX3JxYUEzWWVBaDBiVVhVbXhXMTE="

    # How long an access token is trusted locally before it is refreshed.
    _TOKEN_LIFETIME = timedelta(minutes=4)

    # Click entry point: builds the service from the parsed CLI arguments.
    @staticmethod
    @click.command(name="CR", short_help="https://crunchyroll.com/", help=__doc__)
    @click.argument("title", type=str)
    @click.option("--display_locale", type=str, help="Set display language")
    @click.pass_context
    def cli(ctx, **kwargs):
        return CR(ctx, **kwargs)

    def __init__(self, ctx, title, display_locale="en-US"):
        """
        Store the requested title and locale, then initialise the base service.

        Parameters:
            ctx: Click context forwarded to the base ``Service`` constructor.
            title: Either a series URL containing ``/series/<id>/`` or a bare
                series id (also used as the keyword by ``search``).
            display_locale: Locale sent to the metadata endpoints. ``None``
                (what click passes when the option is omitted) means "en-US".
        """
        # A URL carries the series id in the path segment right after "series";
        # anything that does not look like that is treated as a bare id.
        try:
            parts = title.split("/")
            self.title = parts[parts.index("series") + 1]
        except (ValueError, IndexError):
            self.title = title

        # click passes display_locale=None when --display_locale is not given,
        # which would otherwise override the signature default.
        self.display_locale = display_locale or "en-US"

        # Authentication state, populated by authenticate().
        self.token = None
        self.refresh_token = None
        self.token_expiry = None
        self.credential = None

        # Device identity sent with token requests, populated by authenticate().
        self.device_id = None
        self.device_name = None
        self.device_type = None

        super().__init__(ctx)

    def authenticate(
        self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None
    ) -> Optional[str]:
        """
        Log in, or refresh the access token once its local expiry has passed.

        Parameters:
            cookies: Accepted for interface compatibility; not used.
            credential: Username/password used for the initial login. It is
                remembered on first use so later calls can omit it.

        Returns:
            The current access token.

        Raises:
            ValueError: If a login is required but no credential is available.
        """
        device_config = self.config.get("device") or {}
        # Keep one device id for the lifetime of this object so that a refresh
        # request identifies the same device as the login that issued the token.
        self.device_id = self.device_id or device_config.get("id") or str(uuid.uuid4())
        self.device_name = device_config.get("name") or "AOSP on IA Emulator"
        self.device_type = device_config.get("type") or "Google AOSP on IA Emulator"

        # Remember the credential for the whole session.
        if self.credential is None:
            self.credential = credential

        if self.token is None:
            login_credential = credential or self.credential
            if login_credential is None:
                raise ValueError("CR requires a credential (username and password) to log in.")
            self._request_token(
                {
                    "username": f"{login_credential.username}",
                    "password": f"{login_credential.password}",
                    "grant_type": "password",
                }
            )
        elif self.token_expiry and self.token_expiry < datetime.now().timestamp():
            self._request_token(
                {
                    "refresh_token": self.refresh_token,
                    "grant_type": "refresh_token",
                }
            )

        return self.token

    def _request_token(self, grant: Dict[str, Any]) -> None:
        """POST a token grant and store the resulting tokens on the session."""
        auth_response = self.session.post(
            url=self.config["endpoints"]["auth_url"],
            headers={
                "Authorization": self._BASIC_AUTH,
                "Content-Type": "application/x-www-form-urlencoded",
                "ETP-Anonymous-ID": str(uuid.uuid4()),
            },
            data={
                **grant,
                "scope": "offline_access",
                "device_id": str(self.device_id),
                "device_name": str(self.device_name),
                "device_type": str(self.device_type),
            },
        ).json()

        self.token = auth_response["access_token"]
        self.refresh_token = auth_response["refresh_token"]
        self.token_expiry = (datetime.now() + self._TOKEN_LIFETIME).timestamp()

        # Every later request on the session carries the bearer token.
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

    def get_titles(self) -> Titles_T:
        """
        Fetch every episode of every season of the requested series.

        Returns:
            A ``Series`` of ``Episode`` objects. Each episode id is the GUID of
            the original-language version when one is flagged, otherwise the
            episode's own id.
        """
        self.authenticate(credential=self.credential)

        seasons = self.session.get(
            url=self.config["endpoints"]["series_metadata"].format(title=self.title),
            params={"force_locale": "", "locale": self.display_locale},
        ).json()["data"]

        episodes = []
        for season in seasons:
            season_episodes = self.session.get(
                url=self.config["endpoints"]["episode_metadata"].format(season=season["id"]),
                params={"locale": self.display_locale},
            ).json()["data"]

            for episode in season_episodes:
                season_number, episode_number = self._parse_identifier(episode)
                episodes.append(
                    Episode(
                        id_=self._original_guid(episode),
                        title=episode["series_title"],
                        # Fall back to the plain metadata when the identifier
                        # could not be parsed.
                        season=season_number or episode["season_number"],
                        number=math.ceil(episode_number or episode["sequence_number"]),
                        name=episode["title"],
                        year=episode["episode_air_date"][:4],
                        language=episode["audio_locale"],
                        service=self.__class__,
                    )
                )

        return Series(episodes)

    @staticmethod
    def _original_guid(episode: Dict[str, Any]) -> str:
        """Return the GUID of the version flagged original, else the episode id."""
        original_id = episode["id"]
        for version in episode.get("versions") or []:
            if version.get("original") is True:
                original_id = version["guid"]
        return original_id

    @staticmethod
    def _parse_identifier(episode: Dict[str, Any]) -> Tuple[Optional[int], Optional[Union[int, float]]]:
        """
        Extract season and episode numbers from ``episode["identifier"]``.

        The identifier is expected to have three "|"-separated parts whose
        second part is a season tag ("S<n>", "S<text>", "M" or "O...") and
        whose third part is a one-character prefix followed by the episode
        number. Anything else yields ``None`` for the affected value so the
        caller can fall back to the plain metadata fields.
        """
        season_number = None
        episode_number = None
        try:
            parts = episode["identifier"].split("|")
            if len(parts) == 3:
                season_tag, episode_tag = parts[1], parts[2]
                if season_tag.startswith("S"):
                    if season_tag[1:].isdigit():
                        season_number = int(season_tag[1:])
                    else:
                        season_number = episode["season_number"]
                    episode_number = float(episode_tag[1:])
                elif season_tag == "M" or season_tag.startswith("O"):
                    season_number = episode["season_sequence_number"]
                    episode_number = math.ceil(episode["sequence_number"])
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            # Best effort: keep whatever was parsed before the failure.
            pass
        return season_number, episode_number

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Collect the tracks of the title and of every other audio version.

        The requested title contributes its manifest tracks and all ASS
        subtitles. Each other version contributes its manifest tracks plus,
        marked as forced, the ASS subtitle matching its own audio locale.
        Every track is tagged with ``data["guid"]`` so the licence request can
        target the right version.

        Parameters:
            title: The title whose ``id`` is the playback GUID to request.

        Returns:
            The populated ``Tracks`` object.
        """
        tracks = Tracks()
        seen_subtitles: Set[tuple] = set()

        self.authenticate(credential=self.credential)

        title_metadata = self._get_playback_metadata(title.id)
        try:
            self._add_playback_tracks(tracks, title.id, title_metadata, seen_subtitles, dubbed=False)
        finally:
            # Release the token even when manifest parsing fails.
            self._release_video_token(title.id, title_metadata.get("token"))

        for version in title_metadata.get("versions") or []:
            guid = version["guid"]
            if guid == title.id:
                continue
            version_metadata = self._get_playback_metadata(guid)
            try:
                self._add_playback_tracks(tracks, guid, version_metadata, seen_subtitles, dubbed=True)
            finally:
                self._release_video_token(guid, version_metadata.get("token"))

        return tracks

    def _get_playback_metadata(self, guid: str) -> Dict[str, Any]:
        """Request the playback metadata (manifest URL, subtitles, video token) for a GUID."""
        return self.session.get(
            url=self.config["endpoints"]["video_token"].format(id=guid),
            headers={
                "Accept-Encoding": "gzip",
                "Authorization": f"Bearer {self.token}",
                "Connection": "Keep-Alive",
                "ETP-Anonymous-ID": str(uuid.uuid4()),
                "Host": "www.crunchyroll.com",
            },
        ).json()

    def _add_playback_tracks(
        self, tracks: Tracks, guid: str, metadata: Dict[str, Any], seen_subtitles: Set[tuple], dubbed: bool
    ) -> None:
        """
        Add the manifest tracks and ASS subtitles described by ``metadata``.

        When ``dubbed`` is true only the subtitle whose language equals the
        version's audio locale is added, and it is flagged as forced.
        """
        mpd_tracks = DASH.from_url(url=metadata["url"], session=self.session).to_tracks(
            language=metadata["audioLocale"]
        )
        for track in mpd_tracks:
            track.data["guid"] = guid
        tracks.add(mpd_tracks)

        for subtitle_lang, subtitle_data in metadata["subtitles"].items():
            if subtitle_lang == "none" or subtitle_data["language"] == "none" or "format" not in subtitle_data:
                continue
            if subtitle_data["format"].lower() != "ass":
                continue
            if dubbed and subtitle_data["language"] != metadata["audioLocale"]:
                continue

            codec = Subtitle.Codec.from_mime(subtitle_data["format"])
            # Key on the explicit GUID rather than on a leaked loop variable.
            subtitle_key = (subtitle_data["language"], codec, guid)
            if subtitle_key in seen_subtitles:
                continue

            # Only pass ``forced`` for dubs so the original version keeps the
            # Subtitle default.
            extra = {"forced": True} if dubbed else {}
            tracks.add(
                Subtitle(
                    language=subtitle_data["language"],
                    codec=codec,
                    url=subtitle_data["url"],
                    **extra,
                )
            )
            seen_subtitles.add(subtitle_key)

    def _release_video_token(self, guid: str, token: Optional[str]) -> None:
        """Deactivate and then delete a video token; a missing token is a no-op."""
        if not token:
            return
        self.deactivate_video_token(title=guid, token=token)
        self.delete_video_token(title=guid, token=token)

    def get_chapters(self, title: Title_T) -> Chapters:
        """
        Build chapters from the recap/intro/credits/preview markers of a title.

        This is best effort: any failure while fetching or interpreting the
        markers is logged at debug level and whatever chapters were built so
        far are returned.

        Parameters:
            title: The title whose ``id`` is used to request the markers.

        Returns:
            A ``Chapters`` object, possibly empty.
        """
        chapters = Chapters()

        self.authenticate(credential=self.credential)

        try:
            chapters_metadata = self.session.get(
                url=self.config["endpoints"]["chapters_metadata"].format(id=title.id)
            ).json()

            recap = chapters_metadata.get("recap")
            intro = chapters_metadata.get("intro")
            credits_info = chapters_metadata.get("credits")
            preview = chapters_metadata.get("preview")

            if recap:
                if recap["start"] != 0:
                    # Dummy "Episode" chapter from the start of the file; it may
                    # be a wrongly placed recap, but that cannot be verified.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=recap["start"] * 1000, name=recap["type"].capitalize()))

            if intro:
                if not recap and intro["start"] != 0:
                    # No recap and the intro does not start at 0: add a leading
                    # chapter so the first chapter starts at the beginning.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=intro["start"] * 1000, name="Opening"))
                # The episode proper resumes where the intro ends.
                chapters.add(Chapter(timestamp=intro["end"] * 1000, name="Episode"))

            if credits_info:
                if not recap and not intro:
                    # Neither recap nor intro: add the leading chapter here.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=credits_info["start"] * 1000, name="Ending"))

            if preview:
                if not recap and not intro and not credits_info:
                    # Preview is the only marker: add the leading chapter here.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                if credits_info and "end" in credits_info:
                    if preview["start"] > credits_info["end"]:
                        # Non-preview scenes sit between the credits and the preview.
                        chapters.add(Chapter(timestamp=credits_info["end"] * 1000, name="Episode"))
                        chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
                    else:
                        # Prefer the end of the credits over the preview's own start.
                        chapters.add(Chapter(timestamp=credits_info["end"] * 1000, name="Preview"))
                else:
                    chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
            elif credits_info and "end" in credits_info:
                # No preview marker: treat what follows the credits as the preview.
                chapters.add(Chapter(timestamp=credits_info["end"] * 1000, name="Preview"))
        except Exception:
            # Chapters are optional; never fail the download because of them.
            log.debug("Could not build chapters for %s", title.id, exc_info=True)

        return chapters

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        Request a Widevine licence for the version a track belongs to.

        Parameters:
            challenge: The licence challenge, sent as the request body.
            title: Unused; the GUID is taken from ``track.data["guid"]``.
            track: The track being licensed.

        Returns:
            The decoded body of the licence response.
        """
        self.authenticate(credential=self.credential)

        guid = track.data["guid"]
        video_token = self.get_video_token(title=guid)
        try:
            # NOTE: these headers are set on the session and therefore also
            # apply to every later request made through it.
            self.session.headers.update(
                {
                    "User-Agent": "okhttp/4.12.0",
                    "content-type": "application/octet-stream",
                    "x-cr-content-id": f"{guid}",
                    "x-cr-video-token": f"{video_token}",
                }
            )
            return self.session.post(
                url=self.config["endpoints"]["license_url"], data=challenge
            ).content.decode()
        finally:
            # Release the token whether or not the licence request succeeded.
            self._release_video_token(guid, video_token)

    def search(self) -> Generator[SearchResult, None, None]:
        """Yield a ``SearchResult`` for every series matching the title keyword."""
        self.authenticate(credential=self.credential)

        search_results = self.session.get(
            url=self.config["endpoints"]["search_url"].format(search_keyword=self.title)
        ).json()

        for result_group in search_results["data"]:
            # Only the "series" group is reported; other groups are skipped.
            if result_group["type"] != "series":
                continue
            for series in result_group["items"]:
                yield SearchResult(
                    id_=series["id"],
                    title=series["title"],
                    description=series["description"],
                )

    def get_video_token(self, title: str) -> str:
        """Request playback for the given GUID and return its video token."""
        self.authenticate(credential=self.credential)
        return self._get_playback_metadata(title)["token"]

    def deactivate_video_token(self, title: str, token: str) -> None:
        """Send the PATCH request that deactivates a video token."""
        self.authenticate(credential=self.credential)
        self.session.patch(
            url=self.config["endpoints"]["video_token_patch"].format(title_id=title, video_token=token),
        )

    def delete_video_token(self, title: str, token: str) -> None:
        """Send the DELETE request that removes a video token."""
        self.authenticate(credential=self.credential)
        self.session.delete(
            url=self.config["endpoints"]["video_token_delete"].format(title_id=title, video_token=token),
        )