# Listing metadata (from the code viewer this file was copied from): 534 lines, 21 KiB, Python
import logging
import math
import uuid
from datetime import datetime, timedelta
from http.cookiejar import CookieJar
from typing import Optional, Union, Generator

import click

from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.manifests import DASH
from devine.core.search_result import SearchResult
from devine.core.service import Service
from devine.core.titles import Titles_T, Title_T, Series, Episode
from devine.core.tracks import Chapters, Tracks, Subtitle, Chapter


class CR(Service):
    """
    Service code for Crunchyroll (https://crunchyroll.com)

    \b
    Author: TPD94 - edited
    Authorization: Login
    Robustness:
      Widevine:
        L3: 1080p
    \b
    Tips:
        - Use complete title/episode URL or id as input:
            https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8
          OR
            GG5H5XQ7D
        - Supports series
    """

    # NOTE: the class docstring above doubles as the click help text
    # (``help=__doc__`` on ``cli``), so maintainer notes live in comments.

    # HTTP Basic value sent to the token endpoint for both login and refresh.
    _BASIC_AUTHORIZATION = "Basic eHVuaWh2ZWRidDNtYmlzdWhldnQ6MWtJUzVkeVR2akUwX3JxYUEzWWVBaDBiVVhVbXhXMTE="
    # How long an access token is trusted locally before a refresh is forced.
    _TOKEN_LIFETIME = timedelta(minutes=4)
    # Display locale used when ``--display_locale`` is not given.
    _DEFAULT_LOCALE = "en-US"
    # Device identity reported to the token endpoint when the config has none.
    _DEFAULT_DEVICE_NAME = "AOSP on IA Emulator"
    _DEFAULT_DEVICE_TYPE = "Google AOSP on IA Emulator"

    @staticmethod
    @click.command(name="CR", short_help="https://crunchyroll.com/", help=__doc__)
    @click.argument("title", type=str)
    @click.option("--display_locale", type=str, help="Set display language")
    @click.pass_context
    def cli(ctx, **kwargs):
        # click entry point: forwards the parsed CLI arguments to the service.
        return CR(ctx, **kwargs)

    def __init__(self, ctx, title, display_locale="en-US"):
        """
        Store the requested title and reset all session state.

        Parameters:
            ctx: click context, forwarded to the base ``Service``.
            title: either a URL containing ``/series/<id>/`` or a bare id.
            display_locale: locale for metadata requests. click passes ``None``
                when the option is omitted, so ``None``/empty falls back to
                ``en-US`` instead of being sent as the literal string "None".
        """
        try:
            # The series id is the path segment right after "series".
            segments = title.split("/")
            self.title = segments[segments.index("series") + 1]
        except (ValueError, IndexError):
            # No "series" segment (or nothing after it): treat input as an id.
            self.title = title

        self.display_locale = display_locale or self._DEFAULT_LOCALE

        # Authentication state, filled in by authenticate().
        self.token = None
        self.refresh_token = None
        self.token_expiry = None
        self.credential = None

        # Device identity, filled in by authenticate().
        self.device_id = None
        self.device_name = None
        self.device_type = None

        super().__init__(ctx)

    def authenticate(
        self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None
    ) -> Optional[str]:
        """
        Log in, or refresh the access token once it has expired locally.

        The first credential seen is remembered for the whole session, so the
        other methods can call ``authenticate(credential=self.credential)``
        before every request.

        Parameters:
            cookies: accepted for interface compatibility; not used here.
            credential: username/password used for the initial login.

        Returns:
            The current access token.

        Raises:
            ValueError: no credential is available for the initial login, or
                the token endpoint answered without an access token.
        """
        device = self.config.get("device") or {}
        # Keep a generated device id stable for the session; regenerating it on
        # every call would make login and refresh come from different devices.
        self.device_id = device.get("id") or self.device_id or str(uuid.uuid4())
        self.device_name = device.get("name") or self._DEFAULT_DEVICE_NAME
        self.device_type = device.get("type") or self._DEFAULT_DEVICE_TYPE

        if self.credential is None:
            self.credential = credential

        if self.token is None:
            if self.credential is None:
                raise ValueError("Crunchyroll requires a credential (username/password) to log in.")
            self._request_token(
                {
                    "username": f"{self.credential.username}",
                    "password": f"{self.credential.password}",
                    "grant_type": "password",
                }
            )
        elif self.token_expiry and self.token_expiry < datetime.now().timestamp():
            self._request_token(
                {
                    "refresh_token": self.refresh_token,
                    "grant_type": "refresh_token",
                }
            )

        return self.token

    def _request_token(self, grant: dict) -> None:
        """POST *grant* to the token endpoint and store the resulting tokens."""
        payload = self.session.post(
            url=self.config["endpoints"]["auth_url"],
            headers={
                "Authorization": self._BASIC_AUTHORIZATION,
                "Content-Type": "application/x-www-form-urlencoded",
                "ETP-Anonymous-ID": f"{uuid.uuid4()}",
            },
            data={
                **grant,
                "scope": "offline_access",
                "device_id": f"{self.device_id}",
                "device_name": f"{self.device_name}",
                "device_type": f"{self.device_type}",
            },
        ).json()

        if "access_token" not in payload:
            # Surface the service's own error instead of an opaque KeyError.
            raise ValueError(f"Crunchyroll authentication failed: {payload}")

        self.token = payload["access_token"]
        self.refresh_token = payload.get("refresh_token", self.refresh_token)
        self.token_expiry = (datetime.now() + self._TOKEN_LIFETIME).timestamp()

        # Every later request on this session carries the bearer token.
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

    def get_titles(self) -> Titles_T:
        """
        List every episode of every season of the requested series.

        Returns:
            A ``Series`` of ``Episode`` objects. Each episode id is the guid of
            the version flagged as original, when the metadata has one.
        """
        self.authenticate(credential=self.credential)

        endpoints = self.config["endpoints"]
        episodes = []

        seasons = self.session.get(
            url=endpoints["series_metadata"].format(title=self.title),
            params={"force_locale": "", "locale": f"{self.display_locale}"},
        ).json()["data"]

        for season in seasons:
            season_episodes = self.session.get(
                url=endpoints["episode_metadata"].format(season=season["id"]),
                params={"locale": f"{self.display_locale}"},
            ).json()["data"]

            for episode in season_episodes:
                # Default to the episode's own id so that a versions list with
                # no "original" entry cannot reuse the previous episode's id.
                original_id = episode["id"]
                for version in episode.get("versions") or []:
                    if version.get("original"):
                        original_id = version["guid"]

                season_number, episode_number = self._parse_identifier(episode)

                episodes.append(
                    Episode(
                        id_=original_id,
                        title=episode["series_title"],
                        season=season_number or episode["season_number"],
                        number=math.ceil(episode_number or episode["sequence_number"]),
                        name=episode["title"],
                        year=episode["episode_air_date"][:4],
                        language=episode["audio_locale"],
                        service=self.__class__,
                    )
                )

        return Series(episodes)

    @staticmethod
    def _parse_identifier(episode: dict) -> tuple:
        """
        Derive (season, episode) numbers from ``episode["identifier"]``.

        The identifier is split on "|" and only handled when it has three
        parts; the second is a season tag ("S<n>", "S<text>", "M" or "O...")
        and the third an episode tag whose first character is dropped.
        Either value is ``None`` when it cannot be derived, in which case the
        caller falls back to the plain metadata fields.
        """
        season_number = None
        episode_number = None
        try:
            parts = episode["identifier"].split("|")
            if len(parts) == 3:
                season_tag, episode_tag = parts[1], parts[2]
                if season_tag.startswith("S"):
                    if season_tag[1:].isdigit():
                        season_number = int(season_tag[1:])
                    else:
                        season_number = episode["season_number"]
                    episode_number = float(episode_tag[1:])
                elif season_tag == "M" or season_tag.startswith("O"):
                    season_number = episode["season_sequence_number"]
                    episode_number = math.ceil(episode["sequence_number"])
        except (KeyError, ValueError, TypeError, AttributeError):
            # Best effort: keep whatever was parsed before the failure.
            pass
        return season_number, episode_number

    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Collect video/audio/subtitle tracks for *title* and its other versions.

        The requested title contributes all of its ASS subtitles. Every other
        version (different guid) contributes its own tracks plus, as forced
        subtitles, only the subtitles matching that version's audio locale.
        Each playback session is released before the next one is opened.
        """
        tracks = Tracks()
        seen_subtitles = set()

        playback = self._add_playback_tracks(tracks, title.id, seen_subtitles, dubbed=False)

        for version in playback.get("versions") or []:
            if version["guid"] != title.id:
                self._add_playback_tracks(tracks, version["guid"], seen_subtitles, dubbed=True)

        return tracks

    def _add_playback_tracks(self, tracks: Tracks, guid: str, seen_subtitles: set, *, dubbed: bool) -> dict:
        """
        Open a playback session for *guid*, add its tracks, then release it.

        Parameters:
            tracks: collection that receives the manifest tracks and subtitles.
            guid: content id of the version to load.
            seen_subtitles: (language, codec, guid) keys already added.
            dubbed: when true, keep only subtitles in the version's audio
                locale and mark them as forced.

        Returns:
            The playback metadata returned by the service.

        Raises:
            ValueError: the playback response carries no stream URL.
        """
        playback = self._fetch_playback(guid)
        try:
            if "url" not in playback:
                raise ValueError(f"Crunchyroll returned no stream URL for {guid}: {playback}")

            audio_locale = playback["audioLocale"]
            mpd_tracks = DASH.from_url(url=playback["url"], session=self.session).to_tracks(
                language=audio_locale
            )
            # Remember which version each track came from; the license request
            # needs it later.
            for mpd_track in mpd_tracks:
                mpd_track.data["guid"] = guid
            tracks.add(mpd_tracks)

            extra = {"forced": True} if dubbed else {}
            for subtitle_lang, subtitle_data in (playback.get("subtitles") or {}).items():
                if subtitle_lang == "none" or subtitle_data["language"] == "none" or "format" not in subtitle_data:
                    continue
                if subtitle_data["format"].lower() != "ass":
                    continue
                if dubbed and subtitle_data["language"] != audio_locale:
                    continue

                codec = Subtitle.Codec.from_mime(subtitle_data["format"])
                subtitle_key = (subtitle_data["language"], codec, guid)
                if subtitle_key in seen_subtitles:
                    continue

                tracks.add(
                    Subtitle(
                        language=subtitle_data["language"],
                        codec=codec,
                        url=subtitle_data["url"],
                        **extra,
                    )
                )
                seen_subtitles.add(subtitle_key)
        finally:
            # Always release the session, even when manifest parsing failed.
            token = playback.get("token")
            if token:
                self._release_video_token(guid, token)

        return playback

    def get_chapters(self, title: Title_T) -> Chapters:
        """
        Build chapters from the recap/intro/credits/preview markers.

        Marker times are multiplied by 1000 before being used as chapter
        timestamps. "Episode" filler chapters are inserted so the first
        chapter always starts at 0 and gaps between markers are named.
        This is best effort: on any failure the chapters collected so far
        (possibly none) are returned.
        """
        chapters = Chapters()

        self.authenticate(credential=self.credential)

        try:
            markers = self.session.get(
                url=self.config["endpoints"]["chapters_metadata"].format(id=title.id)
            ).json()

            recap = markers.get("recap")
            intro = markers.get("intro")
            ending = markers.get("credits")
            preview = markers.get("preview")
            ending_has_end = bool(ending) and "end" in ending

            if recap:
                if recap["start"] != 0:
                    # Something precedes the recap; name it generically.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=recap["start"] * 1000, name=recap["type"].capitalize()))

            if intro:
                if not recap and intro["start"] != 0:
                    # Cold open before the intro and no recap marker.
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=intro["start"] * 1000, name="Opening"))
                # The episode proper resumes where the intro ends.
                chapters.add(Chapter(timestamp=intro["end"] * 1000, name="Episode"))

            if ending:
                if not recap and not intro:
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                chapters.add(Chapter(timestamp=ending["start"] * 1000, name="Ending"))

            if preview:
                if not recap and not intro and not ending:
                    chapters.add(Chapter(timestamp=0, name="Episode"))
                if ending_has_end:
                    if preview["start"] > ending["end"]:
                        # Extra scene between the credits and the preview.
                        chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Episode"))
                        chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
                    else:
                        # Prefer the credits end over the preview marker.
                        chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Preview"))
                else:
                    chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
            elif ending_has_end:
                # No preview marker: whatever follows the credits is treated
                # as the preview.
                chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Preview"))
        except Exception as exc:  # deliberate best effort; do not trap SystemExit/KeyboardInterrupt
            logging.getLogger(__name__).debug("Could not build chapters for %s: %s", title.id, exc)

        return chapters

    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        Exchange a Widevine *challenge* for a license for *track*.

        A playback session is opened for the track's guid and is always
        released afterwards, even if the license request fails. The license
        headers are sent with this request only, so they do not leak into
        later requests on the shared session.

        Returns:
            The decoded body of the license response.
        """
        self.authenticate(credential=self.credential)

        guid = track.data["guid"]
        video_token = self.get_video_token(title=guid)

        try:
            return self.session.post(
                url=self.config["endpoints"]["license_url"],
                data=challenge,
                headers={
                    "User-Agent": "okhttp/4.12.0",
                    "content-type": "application/octet-stream",
                    "x-cr-content-id": f"{guid}",
                    "x-cr-video-token": f"{video_token}",
                },
            ).content.decode()
        finally:
            self._release_video_token(guid, video_token)

    def search(self) -> Generator[SearchResult, None, None]:
        """Yield a ``SearchResult`` for every series matching ``self.title``."""
        self.authenticate(credential=self.credential)

        response = self.session.get(
            url=self.config["endpoints"]["search_url"].format(search_keyword=self.title)
        ).json()

        for group in response["data"]:
            if group["type"] != "series":
                continue
            for series in group["items"]:
                yield SearchResult(
                    id_=series["id"],
                    title=series["title"],
                    description=series["description"],
                )

    def _fetch_playback(self, guid: str) -> dict:
        """Request playback metadata (stream URL, subtitles, token) for *guid*."""
        self.authenticate(credential=self.credential)

        return self.session.get(
            url=self.config["endpoints"]["video_token"].format(id=guid),
            headers={
                "Accept-Encoding": "gzip",
                "Authorization": f"Bearer {self.token}",
                "Connection": "Keep-Alive",
                "ETP-Anonymous-ID": f"{uuid.uuid4()}",
                "Host": "www.crunchyroll.com",
            },
        ).json()

    def _release_video_token(self, guid: str, token: str) -> None:
        """Deactivate and then delete the playback session *token* of *guid*."""
        self.deactivate_video_token(title=guid, token=token)
        self.delete_video_token(title=guid, token=token)

    def get_video_token(self, title: str) -> str:
        """Open a playback session for content id *title* and return its token."""
        return self._fetch_playback(title)["token"]

    def deactivate_video_token(self, title: str, token: str) -> None:
        """Send the PATCH request that deactivates *token* for content id *title*."""
        self.authenticate(credential=self.credential)

        self.session.patch(
            url=self.config["endpoints"]["video_token_patch"].format(title_id=title, video_token=token),
        )

    def delete_video_token(self, title: str, token: str) -> None:
        """Send the DELETE request that removes *token* for content id *title*."""
        self.authenticate(credential=self.credential)

        self.session.delete(
            url=self.config["endpoints"]["video_token_delete"].format(title_id=title, video_token=token),
        )