# sp4rky-devine-services/services/CR/__init__.py
# 2025-04-09 15:07:35 -06:00
#
# 534 lines
# 21 KiB
# Python

import math
import uuid
from http.cookiejar import CookieJar
from typing import Optional, Union, Generator
import click
from datetime import datetime, timedelta
from devine.core.service import Service
from devine.core.titles import Titles_T, Title_T, Series, Episode
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks, Subtitle, Chapter
from devine.core.search_result import SearchResult
from devine.core.manifests import DASH
# Crunchyroll service built on devine's `Service` base class.
# NOTE: this docstring is runtime text, not just documentation: `cli` passes it
# to click via `help=__doc__`, so any edit here changes the command's help output.
# NOTE(review): the `\b` lines are presumably click's "do not re-wrap the next
# paragraph" markers; confirm against the click documentation before removing them.
class CR(Service):
    """
    Service code for Crunchyroll (https://crunchyroll.com)
    \b
    Author: TPD94 - edited
    Authorization: Login
    Robustness:
    Widevine:
    L3: 1080p
    \b
    Tips:
    - Use complete title/episode URL or id as input:
    https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8
    OR
    GG5H5XQ7D
    - Supports series
    """
# Click entry point for the service: parses the command line and builds a CR instance.
# The help text comes from `__doc__` (explicit `help=`), so this function deliberately
# has no docstring of its own.
@staticmethod
@click.command(name="CR", short_help="https://crunchyroll.com/", help=__doc__)
@click.argument("title", type=str)
# An option without a default is passed to the callback as None, which would override
# the "en-US" default of CR.__init__ and send "None" as the locale. Declare the default
# here so an omitted flag really means "en-US".
@click.option("--display_locale", type=str, default="en-US", help="Set display language")
@click.pass_context
def cli(ctx, **kwargs):
    # kwargs carries `title` and `display_locale` exactly as click parsed them.
    return CR(ctx, **kwargs)
def __init__(self, ctx, title, display_locale="en-US"):
    """Record the requested title and reset all authentication state.

    Args:
        ctx: Click context, forwarded unchanged to the base ``Service`` constructor.
        title: Either a URL containing a ``series/<id>`` path segment or a bare id.
        display_locale: Locale sent with metadata requests. A falsy value
            (for example ``None`` coming from an omitted CLI option) falls back
            to ``"en-US"``.
    """
    # When given a URL, keep the path segment that follows "series".
    # ValueError: there is no "series" segment; IndexError: nothing follows it.
    # In both cases the input is used verbatim as the id.
    try:
        parts = title.split("/")
        self.title = parts[parts.index("series") + 1]
    except (ValueError, IndexError):
        self.title = title

    # Never store None here: the locale is interpolated into request parameters.
    self.display_locale = display_locale or "en-US"

    # Token state, filled in by authenticate().
    self.token = None
    self.refresh_token = None
    self.token_expiry = None

    # Credential remembered for the whole session once authenticate() receives one.
    self.credential = None

    # Device identity sent with token requests, filled in by authenticate().
    self.device_id = None
    self.device_name = None
    self.device_type = None

    # Base-class initialisation runs last, as in the original ordering.
    super().__init__(ctx)
def authenticate(
    self,
    cookies: Optional[CookieJar] = None,
    credential: Optional["Credential"] = None,
) -> Optional[str]:
    """Log in or refresh the access token when needed and return the current token.

    Behaviour:
      * no token yet -> password grant using the supplied (or remembered) credential;
      * token present but ``token_expiry`` is in the past -> refresh-token grant;
      * otherwise -> no request is made and the cached token is returned.

    After a successful request the token, refresh token and expiry are stored on
    the instance and the session's ``Authorization`` header is set to the new
    bearer token. The expiry is fixed at four minutes from "now"; the response
    is not consulted for a lifetime.

    Args:
        cookies: Accepted for interface compatibility; not used by this method.
        credential: Login credential. Remembered on first use so later calls
            may omit it.

    Returns:
        The current access token.

    Raises:
        ValueError: A login is required but no credential is available.
    """
    device_config = self.config.get("device", {})
    # Prefer the configured id, then the id already generated for this session.
    # Generating a new UUID on every call would make the refresh request use a
    # different device id than the login request.
    self.device_id = device_config.get("id") or self.device_id or str(uuid.uuid4())
    self.device_name = device_config.get("name") or "AOSP on IA Emulator"
    self.device_type = device_config.get("type") or "Google AOSP on IA Emulator"

    # Remember the first credential for the rest of the session.
    if self.credential is None:
        self.credential = credential

    # Choose the grant-specific fields, or return early when the token is still valid.
    if self.token is None:
        login = credential if credential is not None else self.credential
        if login is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError("CR requires a credential (username and password) to authenticate.")
        grant = {
            "username": f"{login.username}",
            "password": f"{login.password}",
            "grant_type": "password",
        }
    elif self.token_expiry and self.token_expiry < datetime.now().timestamp():
        grant = {
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
        }
    else:
        return self.token

    # Both grants share the endpoint, headers and device fields.
    auth_response = self.session.post(
        url=self.config["endpoints"]["auth_url"],
        headers={
            "Authorization": "Basic eHVuaWh2ZWRidDNtYmlzdWhldnQ6MWtJUzVkeVR2akUwX3JxYUEzWWVBaDBiVVhVbXhXMTE=",
            "Content-Type": "application/x-www-form-urlencoded",
            "ETP-Anonymous-ID": f"{uuid.uuid4()}",
        },
        data={
            **grant,
            "scope": "offline_access",
            "device_id": f"{self.device_id}",
            "device_name": f"{self.device_name}",
            "device_type": f"{self.device_type}",
        },
    ).json()

    self.token = auth_response["access_token"]
    self.refresh_token = auth_response["refresh_token"]
    # Treat the token as stale after four minutes so it is refreshed early.
    self.token_expiry = (datetime.now() + timedelta(minutes=4)).timestamp()
    # Every later request on this session carries the bearer token.
    self.session.headers.update({"Authorization": f"Bearer {self.token}"})
    return self.token
def get_titles(self) -> Titles_T:
    """Fetch every episode of every season of the requested series.

    One request lists the seasons of ``self.title``; one further request per
    season lists its episodes. Each episode becomes an ``Episode`` whose id is
    the guid of the version flagged ``original`` (or the episode's own id when
    no version is flagged), and whose season/episode numbers are taken from the
    ``identifier`` field when it can be parsed, otherwise from the API's
    ``season_number`` / ``sequence_number`` fields.

    Returns:
        A ``Series`` wrapping all collected episodes.
    """
    episodes = []
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    seasons = self.session.get(
        url=self.config["endpoints"]["series_metadata"].format(title=self.title),
        params={"force_locale": "", "locale": f"{self.display_locale}"},
    ).json()["data"]

    for season in seasons:
        season_episodes = self.session.get(
            url=self.config["endpoints"]["episode_metadata"].format(season=season["id"]),
            params={"locale": f"{self.display_locale}"},
        ).json()["data"]

        for episode in season_episodes:
            # Start from the episode's own id so an episode whose versions list
            # has no "original" entry never reuses the previous episode's id.
            original_id = episode["id"]
            for version in episode.get("versions") or []:
                if version["original"]:
                    original_id = version["guid"]

            # Reset before parsing so a failed parse cannot leak the previous
            # episode's numbers into this one.
            sn = None
            en = None
            try:
                # The code expects three "|"-separated parts where the second
                # is the season marker and the third the episode marker.
                identifier_parts = episode["identifier"].split("|")
                if len(identifier_parts) == 3:
                    season_part = identifier_parts[1]
                    episode_part = identifier_parts[2]
                    if season_part.startswith("S"):
                        if season_part[1:].isdigit():
                            sn = int(season_part[1:])
                        else:
                            # "S" followed by something non-numeric: trust the API field.
                            sn = episode["season_number"]
                        en = float(episode_part[1:])
                    elif season_part == "M" or season_part.startswith("O"):
                        sn = episode["season_sequence_number"]
                        en = math.ceil(episode["sequence_number"])
            except (AttributeError, KeyError, TypeError, ValueError):
                # Unparseable identifier: keep whatever was parsed so far and
                # let the fallbacks below fill in the rest.
                pass

            episodes.append(
                Episode(
                    id_=original_id,
                    title=episode["series_title"],
                    season=sn or episode["season_number"],
                    number=math.ceil(en or episode["sequence_number"]),
                    name=episode["title"],
                    year=episode["episode_air_date"][:4],
                    language=episode["audio_locale"],
                    service=self.__class__,
                )
            )

    return Series(episodes)
def get_tracks(self, title: Title_T) -> Tracks:
    """Collect video/audio/subtitle tracks for a title and all of its other versions.

    For ``title.id`` and then for every entry of the response's ``versions``
    list with a different guid, this requests playback metadata, parses the
    DASH manifest at its ``url``, tags every resulting track with
    ``data["guid"]`` and adds them to the result. Subtitles are added only
    when their format is ``ass``:

      * requested title: every such subtitle, with default flags;
      * other versions: only the subtitle whose language equals that version's
        ``audioLocale``, marked ``forced=True``.

    The video token issued with each playback response is deactivated and
    deleted before the next version is requested.

    Args:
        title: The title whose ``id`` is the guid to start from.

    Returns:
        A ``Tracks`` collection holding everything gathered.
    """
    tracks = Tracks()
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    # (language, codec, guid) keys of subtitles already added.
    added_subtitles = set()

    def load_version(guid: str, dub_only: bool) -> dict:
        """Add one version's tracks/subtitles, release its token, return its metadata."""
        # Let request/JSON errors propagate unchanged: there is nothing useful
        # to print when the response never arrived.
        playback = self.session.get(
            url=self.config["endpoints"]["video_token"].format(id=guid),
            headers={
                "Accept-Encoding": "gzip",
                "Authorization": f"Bearer {self.token}",
                "Connection": "Keep-Alive",
                "ETP-Anonymous-ID": f"{uuid.uuid4()}",
                "Host": "www.crunchyroll.com",
            },
        ).json()

        try:
            mpd_tracks = DASH.from_url(url=playback["url"], session=self.session).to_tracks(
                language=playback["audioLocale"]
            )
            # Remember which guid each track belongs to; get_widevine_license reads it.
            for track in mpd_tracks:
                track.data["guid"] = guid
            tracks.add(mpd_tracks)

            for subtitle_lang, subtitle_data in playback["subtitles"].items():
                if subtitle_lang == "none" or subtitle_data["language"] == "none" or "format" not in subtitle_data:
                    continue
                if subtitle_data["format"].lower() != "ass":
                    continue
                if dub_only and subtitle_data["language"] != playback["audioLocale"]:
                    continue

                codec = Subtitle.Codec.from_mime(subtitle_data["format"])
                # Key on the guid directly rather than on a leftover loop variable.
                subtitle_key = (subtitle_data["language"], codec, guid)
                if subtitle_key in added_subtitles:
                    continue

                # Only pass `forced` for the other-version case so the requested
                # title keeps Subtitle's own default.
                flags = {"forced": True} if dub_only else {}
                tracks.add(
                    Subtitle(
                        language=subtitle_data["language"],
                        codec=codec,
                        url=subtitle_data["url"],
                        **flags,
                    )
                )
                added_subtitles.add(subtitle_key)
        finally:
            # Release the token even when manifest or subtitle handling fails,
            # so a failure does not leave it active.
            if "token" in playback:
                self.deactivate_video_token(title=guid, token=playback["token"])
                self.delete_video_token(title=guid, token=playback["token"])

        return playback

    title_metadata = load_version(title.id, dub_only=False)

    # `versions` may be absent or null; treat both as "no other versions".
    for version in title_metadata.get("versions") or []:
        if version["guid"] != title.id:
            load_version(version["guid"], dub_only=True)

    return tracks
def get_chapters(self, title: Title_T) -> Chapters:
    """Build chapter markers from the service's skip-event metadata (best effort).

    The response may contain ``recap``, ``intro``, ``credits`` and ``preview``
    sections with ``start`` (and sometimes ``end``) values; these are multiplied
    by 1000 before being used as chapter timestamps. Filler "Episode" chapters
    are inserted so the first chapter always starts at 0 and the gaps between
    sections are named.

    Any failure (request, JSON decoding, missing keys) is tolerated: whatever
    chapters were built up to that point are returned.

    Args:
        title: The title whose ``id`` is looked up.

    Returns:
        A ``Chapters`` collection, possibly empty.
    """
    chapters = Chapters()
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    try:
        chapters_metadata = self.session.get(
            url=self.config["endpoints"]["chapters_metadata"].format(id=title.id)
        ).json()

        recap = chapters_metadata.get("recap")
        intro = chapters_metadata.get("intro")
        ending = chapters_metadata.get("credits")
        preview = chapters_metadata.get("preview")

        if recap:
            if recap["start"] != 0:
                # Something precedes the recap; give it a generic name.
                chapters.add(Chapter(timestamp=0, name="Episode"))
            chapters.add(Chapter(timestamp=recap["start"] * 1000, name=recap["type"].capitalize()))

        if intro:
            if not recap and intro["start"] != 0:
                # No recap and the intro does not open the file: add a leading
                # chapter at 0 so the first chapter is not the opening itself.
                chapters.add(Chapter(timestamp=0, name="Episode"))
            chapters.add(Chapter(timestamp=intro["start"] * 1000, name="Opening"))
            # The main content resumes where the intro ends.
            chapters.add(Chapter(timestamp=intro["end"] * 1000, name="Episode"))

        if ending:
            if not recap and not intro:
                # Credits are the first known section: add a leading chapter at 0.
                chapters.add(Chapter(timestamp=0, name="Episode"))
            chapters.add(Chapter(timestamp=ending["start"] * 1000, name="Ending"))

        if preview:
            if not recap and not intro and not ending:
                # Preview is the only known section: add a leading chapter at 0.
                chapters.add(Chapter(timestamp=0, name="Episode"))
            if ending and "end" in ending:
                if preview["start"] > ending["end"]:
                    # Content between the credits and the preview is not preview material.
                    chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Episode"))
                    chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
                else:
                    # Preview start is not after the credits' end: anchor it to the credits' end.
                    chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Preview"))
            else:
                chapters.add(Chapter(timestamp=preview["start"] * 1000, name="Preview"))
        elif ending and "end" in ending:
            # No preview section reported: whatever follows the credits is labelled Preview.
            chapters.add(Chapter(timestamp=ending["end"] * 1000, name="Preview"))
    except Exception:
        # Chapters are optional, so failures are deliberately ignored. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
        pass

    return chapters
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
    """Exchange a Widevine challenge for a license for the given track.

    A fresh video token is requested for the guid stored in
    ``track.data["guid"]``, the challenge is posted to the license endpoint
    with that token, and the token is then deactivated and deleted.

    Note that the license headers are written into ``self.session.headers``
    and therefore persist on the session after this call.

    Args:
        challenge: Raw license challenge bytes, sent as the request body.
        title: Unused here; the guid is taken from ``track``.
        track: Track whose ``data["guid"]`` identifies the content.

    Returns:
        The license endpoint's response body decoded to ``str``.
    """
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    guid = track.data["guid"]
    video_token = self.get_video_token(title=guid)

    try:
        self.session.headers.update(
            {
                "User-Agent": "okhttp/4.12.0",
                "content-type": "application/octet-stream",
                "x-cr-content-id": f"{guid}",
                "x-cr-video-token": f"{video_token}",
            }
        )
        license_response = self.session.post(
            url=self.config["endpoints"]["license_url"], data=challenge
        ).content.decode()
    finally:
        # Release the token even when the license request fails, so an error
        # does not leave it active.
        self.deactivate_video_token(title=guid, token=video_token)
        self.delete_video_token(title=guid, token=video_token)

    return license_response
def search(self) -> Generator[SearchResult, None, None]:
    """Yield one ``SearchResult`` per series returned for ``self.title``.

    The search endpoint's ``data`` list is grouped by result type; only the
    groups whose ``type`` is ``"series"`` are used, and each of their
    ``items`` is turned into a result carrying its id, title and description.
    """
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    endpoint = self.config["endpoints"]["search_url"].format(search_keyword=self.title)
    payload = self.session.get(url=endpoint).json()

    # Lazily narrow the grouped results down to the series groups.
    series_groups = (group for group in payload["data"] if group["type"] == "series")
    for group in series_groups:
        for item in group["items"]:
            yield SearchResult(
                id_=item["id"],
                title=item["title"],
                description=item["description"],
            )
def get_video_token(self, title: str) -> str:
    """Request playback metadata for ``title`` and return its ``token`` field.

    Args:
        title: Guid substituted into the ``video_token`` endpoint.

    Returns:
        The value of ``token`` in the JSON response.
    """
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    endpoint = self.config["endpoints"]["video_token"].format(id=title)
    request_headers = {
        "Accept-Encoding": "gzip",
        "Authorization": f"Bearer {self.token}",
        "Connection": "Keep-Alive",
        "ETP-Anonymous-ID": f"{uuid.uuid4()}",
        "Host": "www.crunchyroll.com",
    }
    playback = self.session.get(url=endpoint, headers=request_headers).json()
    return playback["token"]
def deactivate_video_token(self, title: str, token: str) -> None:
    """Send a PATCH request to the ``video_token_patch`` endpoint for this token.

    Args:
        title: Guid substituted as ``title_id`` in the endpoint URL.
        token: Video token substituted as ``video_token`` in the endpoint URL.
    """
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    endpoint = self.config["endpoints"]["video_token_patch"].format(title_id=title, video_token=token)
    # The response is intentionally ignored.
    self.session.patch(url=endpoint)
def delete_video_token(self, title: str, token: str) -> None:
    """Send a DELETE request to the ``video_token_delete`` endpoint for this token.

    Args:
        title: Guid substituted as ``title_id`` in the endpoint URL.
        token: Video token substituted as ``video_token`` in the endpoint URL.
    """
    # Make sure a valid bearer token is on the session.
    self.authenticate(credential=self.credential)

    endpoint = self.config["endpoints"]["video_token_delete"].format(title_id=title, video_token=token)
    # The response is intentionally ignored.
    self.session.delete(url=endpoint)