Unshackle-Services/RKTN/__init__.py
2026-01-03 17:53:34 +02:00

791 lines
33 KiB
Python

import base64
from copy import copy
import datetime
import hashlib
import hmac
import json
import re
import os
import sys
from typing import Optional
from aiohttp import CookieJar
from pymediainfo import MediaInfo
from langcodes import Language
import click
import urllib.parse
from requests import HTTPError
from unshackle.core.config import config
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests.dash import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Title_T
from unshackle.core.titles.episode import Episode, Series
from unshackle.core.titles.movie import Movie, Movies
from unshackle.core.tracks.audio import Audio
from unshackle.core.tracks.chapters import Chapters
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.tracks import Tracks
from unshackle.core.tracks.video import Video
from pyplayready.cdm import Cdm as PlayReadyCdm
class RKTN(Service):
"""
Service code for Rakuten's Rakuten TV streaming service (https://rakuten.tv).
\b
Authorization: Credentials
Security: FHD-UHD@L1, SD-FHD@L3; with trick
\b
Maximum of 3 audio tracks, otherwise will fail because Rakuten blocks more than 3 requests.
Subtitles requests expires fast, so together with video and audio it will fail.
If you want subs, use -S or -na -nv -nc, and download the rest separately.
\b
Command for Titles with no SDR (if not set range to HDR10 it will fail):
uv run unshackle dl -r HDR10 [OPTIONS] RKTN -m https://www.rakuten.tv/...
\b
TODO: - TV Shows are not yet supported as there's 0 TV Shows to purchase, rent, or watch in my region
\b
NOTES: - Only movies are supported as my region's Rakuten has no TV shows available to purchase at all
"""
ALIASES = ["RakutenTV", "rakuten", "rakutentv"]
TITLE_RE = r"^(?:https?://(?:www\.)?rakuten\.tv/([a-z]+/|)movies(?:/[a-z]{2})?/)(?P<id>[a-z0-9-]+)"
LANG_MAP = {
"es": "es-ES",
"pt": "pt-PT",
}
@staticmethod
@click.command(name="RakutenTV", short_help="https://rakuten.tv")
@click.argument("title", type=str, required=False)
@click.option(
"-dev",
"--device",
default=None,
type=click.Choice(
[
"web", # Device: Web Browser - Maximum Quality: 720p - DRM: Widevine
"android", # Device: Android Phone - Maximum Quality: 720p - DRM: Widevine
"atvui40", # Device: AndroidTV - Maximum Quality: 2160p - DRM: Widevine
"lgui40", # Device: LG SMART TV - Maximum Quality: 2160p - DRM: Playready
"smui40", # Device: Samsung SMART TV - Maximum Quality: 2160p - DRM: Playready
],
case_sensitive=True,
),
help="The device you want to make requests with.",
)
@click.option(
"-m", "--movie", is_flag=True, default=False, help="Title is a movie."
)
@click.option(
"-dal", "--desired-audio-language", type=str, default="SPA,ENG", help="Select desired audio language tracks for this title. Default SPA,ENG. Separate multiple languages with a comma."
)
@click.pass_context
def cli(ctx, **kwargs):
return RKTN(ctx, **kwargs)
def __init__(self, ctx, title, device, movie, desired_audio_language):
super().__init__(ctx)
#self.parse_title(ctx, title)
self.title = title
self.cdm = ctx.obj.cdm
self.playready = isinstance(self.cdm, PlayReadyCdm)
self.desired_audio_language = desired_audio_language
self.range = ctx.parent.params.get("range_")[0].name or "SDR"
self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC # Defaults to H264
self.resolution = "UHD" if (self.vcodec.extension.lower() == "h265" or self.range in ['HYBRID', 'HDR10', 'HDR10P', 'DV']) else "FHD"
self.device = "lgui40" if self.playready else "android"
self.movie = movie or "movies" in title
self.audio_languages = []
# set a custom device if provided
if device is not None:
self.device = device
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not credential:
raise EnvironmentError("Service requires Credentials for Authentication.")
self.session.headers.update(
{
"Origin": "https://rakuten.tv/",
"User-Agent": "Mozilla/5.0 (Linux; Android 11; SHIELD Android TV Build/RQ1A.210105.003; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/99.0.4844.88 Mobile Safari/537.36",
}
)
def get_titles(self):
self.pair_device()
if self.movie:
endpoint = self.config["endpoints"]["title"]
else:
endpoint = self.config["endpoints"]["show"]
params = urllib.parse.urlencode(
{
"classification_id": self.classification_id,
"device_identifier": self.config["clients"][self.device][
"device_identifier"
],
"device_serial": self.config["clients"][self.device]["device_serial"],
"locale": self.locale,
"market_code": self.market_code,
"session_uuid": self.session_uuid,
"timestamp": f"{int(datetime.datetime.now().timestamp())}005",
"support_closed_captions": "true",
}
)
title_url = endpoint.format(
title_id=self.title
) + params
title = self.session.get(url=title_url).json()
if "errors" in title:
error = title["errors"][0]
if error["code"] == "error.not_found":
self.log.error(f"Title [{self.title}] was not found on this account.")
else:
self.log.error(
f"Unable to get title info: {error['message']} [{error['code']}]"
)
sys.exit(1)
title = self.get_info(title["data"])
if self.movie:
return Movies(
[
Movie(
id_=self.title,
service=self.__class__,
name=title["title"],
year=title["year"],
language="en",
data=title,
description=title["plot"],
)
]
)
else:
episodes_list = []
#title_ep = self.get_info(title["data"]['episodes'])
for season in title["tv_show"]["seasons"]:
data_season = endpoint.format(
title_id=season["id"]
) + params
data = self.session.get(url=data_season).json()
if "errors" in data:
error = data["errors"][0]
if error["code"] == "error.not_found":
self.log.error(f"Season [{season['id']}] was not found on this account.")
else:
self.log.error(
f"Unable to get title info: {error['message']} [{error['code']}]"
)
continue
for episode in data["data"]["episodes"]:
episodes_list.append(
Episode(
id_=episode["id"],
service=self.__class__,
title=episode["tv_show_title"],
season=episode["season_number"],
number=episode["number"],
name=episode["title"] or episode['display_name'],
description=episode["short_plot"],
year=episode["year"],
language="en",
data=episode,
)
)
return Series(episodes_list)
def get_tracks(self, title: Title_T) -> Tracks:
# Obtener tracks para todos los idiomas de audio disponibles
all_tracks = None
for audio_lang in self.audio_languages:
self.log.info(f"Getting tracks for audio language: {audio_lang}")
# Obtener stream info para este idioma específico
stream_info = self.get_avod(audio_lang, title) if self.kind == "avod" else self.get_me(audio_lang, title)
if "errors" in stream_info:
error = stream_info["errors"][0]
if "error.streaming.no_active_right" in stream_info["errors"][0]["code"]:
self.log.error(
" x You don't have the rights for this content\n You need to rent or buy it first"
)
else:
self.log.error(
f" - Failed to get track info: {error['message']} [{error['code']}]"
)
sys.exit(1)
stream_info = stream_info["data"]["stream_infos"][0]
if all_tracks is None:
# Primera iteración: crear el objeto tracks principal
self.license_url = stream_info["license_url"]
all_tracks = DASH.from_url(url=stream_info["url"], session=self.session).to_tracks(language=title.language)
# Procesar subtítulos (solo una vez)
subtitle_tracks = []
for subtitle in stream_info.get("all_subtitles", []):
subtitle_tracks += [
Subtitle(
id_=hashlib.md5(subtitle["url"].encode()).hexdigest()[0:6],
url=subtitle["url"],
codec=Subtitle.Codec.from_mime(subtitle["format"]),
forced=subtitle["forced"],
language=subtitle["locale"],
)
]
all_tracks.add(subtitle_tracks)
else:
# Iteraciones adicionales: obtener tracks de audio adicionales
temp_tracks = DASH.from_url(url=stream_info["url"], session=self.session).to_tracks(language=title.language)
# Agregar solo los tracks de audio nuevos
for audio_track in temp_tracks.audio:
# Verificar que no sea duplicado basado en el idioma y codec
is_duplicate = False
for existing_audio in all_tracks.audio:
if (existing_audio.language == audio_track.language and
existing_audio.codec == audio_track.codec):
is_duplicate = True
break
if not is_duplicate:
all_tracks.audio.append(audio_track)
# Procesar HDR para videos
for video in all_tracks.videos:
if "HDR10" in video.url:
video.range = Video.Range.HDR10
# Aplicar el método append_tracks mejorado
self.append_tracks(all_tracks)
return all_tracks
def get_chapters(self, title: Title_T) -> Chapters:
return Chapters([])
def get_me(self, audio_language=None, title: Title_T = None):
# Si no se especifica idioma, usar el primero disponible
if audio_language is None:
audio_language = self.audio_languages[0]
stream_info_url = self.config["endpoints"]["manifest"].format(
kind="me"
) + urllib.parse.urlencode(
{
"audio_language": audio_language, # Usar el idioma especificado
"audio_quality": "5.1", # Will get better audio in different request to make sure it wont error
"classification_id": self.classification_id,
"content_id": title.id,
"content_type": "movies" if self.movie else "episodes",
"device_identifier": self.config["clients"][self.device][
"device_identifier"
],
"device_serial": "not_implemented",
"device_stream_audio_quality": "5.1",
"device_stream_hdr_type": self.hdr_type,
"device_stream_video_quality": self.resolution,
"device_uid": "affa434b-8b7c-4ff3-a15e-df1fe500e71e",
"device_year": self.config["clients"][self.device]["device_year"],
"disable_dash_legacy_packages": "false",
"gdpr_consent": self.config["gdpr_consent"],
"gdpr_consent_opt_out": 0,
"hdr_type": self.hdr_type,
"ifa_subscriber_id": self.ifa_subscriber_id,
"locale": self.locale,
"market_code": self.market_code,
"player": self.config["clients"][self.device]["player"],
"player_height": 1080,
"player_width": 1920,
"publisher_provided_id": "046f58b1-d89b-4fa4-979b-a9bcd6d78a76",
"session_uuid": self.session_uuid,
"strict_video_quality": "false",
"subtitle_formats": ["vtt"],
"subtitle_language": "MIS",
"timestamp": f"{int(datetime.datetime.now().timestamp())}122",
"video_type": "stream",
}
)
stream_info_url += "&signature=" + self.generate_signature(stream_info_url)
return self.session.post(
url=stream_info_url,
).json()
def get_avod(self, audio_language=None, title: Title_T = None):
# Si no se especifica idioma, usar el primero disponible
if audio_language is None:
audio_language = self.audio_languages[0]
stream_info_url = self.config["endpoints"]["manifest"].format(
kind="avod"
) + urllib.parse.urlencode(
{
"device_stream_video_quality": self.resolution,
"device_identifier": self.config["clients"][self.device][
"device_identifier"
],
"market_code": self.market_code,
"session_uuid": self.session_uuid,
"timestamp": f"{int(datetime.datetime.now().timestamp())}122",
}
)
stream_info_url += "&signature=" + self.generate_signature(stream_info_url)
return self.session.post(
url=stream_info_url,
data={
"hdr_type": self.hdr_type,
"audio_quality": "5.1", # Will get better audio in different request to make sure it wont error
"app_version": self.config["clients"][self.device]["app_version"],
"content_id": title.id,
"video_quality": self.resolution,
"audio_language": audio_language, # Usar el idioma especificado
"video_type": "stream",
"device_serial": self.config["clients"][self.device]["device_serial"],
"content_type": "movies" if self.movie else "episodes",
"classification_id": self.classification_id,
"subtitle_language": "MIS",
"player": self.config["clients"][self.device]["player"],
},
).json()
def generate_signature(self, url):
up = urllib.parse.urlparse(url)
digester = hmac.new(
self.access_token.encode(),
f"POST{up.path}{up.query}".encode(),
hashlib.sha1,
)
return (
base64.b64encode(digester.digest())
.decode("utf-8")
.replace("+", "-")
.replace("/", "_")
)
def append_tracks(self, tracks):
"""
Busca y agrega tracks adicionales de video y audio que no están en el manifest.
"""
if not tracks.videos:
self.log.warning("No video tracks found, skipping append_tracks")
return
# Buscar tracks de video adicionales
self._append_video_tracks(tracks)
# Buscar tracks de audio adicionales
self._append_audio_tracks(tracks)
def _append_video_tracks(self, tracks):
"""Busca y agrega tracks de video adicionales para H.264."""
if not tracks.videos:
return
codec = tracks.videos[0].codec
# Solo buscar tracks adicionales para H.264
if codec != Video.Codec.AVC:
self.log.debug(f"Skipping video track search (codec: {codec.name}, only works for AVC/H.264)")
return
# Extraer el patrón del codec de la URL
url_pattern = tracks.videos[-1].url
codec_match = re.search(r'(avc1|h264)-(\d+)', url_pattern, re.IGNORECASE)
if not codec_match:
self.log.debug("Could not find codec pattern in URL for video track search")
return
codec_prefix = codec_match.group(1) # "avc1" o "h264"
self.log.info(f"Searching for additional H.264 video tracks (pattern: {codec_prefix})...")
# Usar el directorio temp de Unshackle
temp_file = os.path.join(str(config.directories.temp), "video_test.mp4")
tracks_found = 0
for n in range(100):
# Generar URL del siguiente track
current_number = len(tracks.videos) + 1
ismv = re.sub(
rf"{codec_prefix}-\d+",
rf"{codec_prefix}-{current_number}",
tracks.videos[-1].url,
)
# Verificar si existe
try:
response = self.session.head(ismv, timeout=5)
if response.status_code != 200:
self.log.debug(f"Video track search ended at index {current_number}")
break
except Exception as e:
self.log.debug(f"Video track search failed: {e}")
break
# Crear copia del último video track
video = copy(tracks.videos[-1])
video.url = ismv
video.id_ = hashlib.md5(ismv.encode()).hexdigest()[:16]
# Descargar chunk para obtener info con MediaInfo
try:
with open(temp_file, "wb") as chunkfile:
data = self.session.get(
url=ismv,
headers={"Range": "bytes=0-50000"},
timeout=10
)
chunkfile.write(data.content)
# Parsear con MediaInfo
info = MediaInfo.parse(temp_file)
if not info.video_tracks:
self.log.debug(f"No video info found for track {current_number}")
continue
video_info = info.video_tracks[0]
video.height = video_info.height
video.width = video_info.width
video.bitrate = video_info.maximum_bit_rate or video_info.bit_rate
# Agregar el track
tracks.videos.append(video)
tracks_found += 1
self.log.info(
f" + Added video track #{current_number}: "
f"{video.width}x{video.height} @ {video.bitrate} bps"
)
except Exception as e:
self.log.warning(f"Failed to process video track {current_number}: {e}")
break
finally:
if os.path.exists(temp_file):
os.remove(temp_file)
if tracks_found > 0:
self.log.info(f"Total additional video tracks found: {tracks_found}")
def _append_audio_tracks(self, tracks):
"""Busca y agrega tracks de audio adicionales para todos los idiomas seleccionados."""
if not tracks.audio:
self.log.warning("No audio tracks found to use as base")
return
if not hasattr(self, 'audio_languages') or not self.audio_languages:
self.log.debug("No audio languages configured")
return
self.log.info(f"Searching for additional audio tracks in languages: {self.audio_languages}")
# Codecs a probar (en orden de preferencia)
codecs_to_try = ["ec-3", "ac-3", "dts", "mp4a"]
# Usar el directorio temp de Unshackle
temp_file = os.path.join(str(config.directories.temp), "audio_test.mp4")
base_audio = tracks.audio[0]
base_url = base_audio.url
tracks_found = 0
for language in self.audio_languages:
for codec_name in codecs_to_try:
# Generar URL del track
# Patrón: audio-{LANG}-{CODEC}-{NUMBER}
isma = re.sub(
r"audio-[a-zA-Z]{2,3}-[a-z0-9\-]+-\d+",
f"audio-{language.lower()}-{codec_name}-1",
base_url,
)
# Verificar si existe
try:
response = self.session.head(isma, timeout=5)
if response.status_code != 200:
continue
except Exception:
continue
# Verificar si ya existe (evitar duplicados)
if any(audio.url == isma for audio in tracks.audio):
self.log.debug(f"Audio track already exists: {language}-{codec_name}")
continue
# Crear nuevo track de audio
audio = copy(base_audio)
audio.url = isma
audio.id_ = hashlib.md5(isma.encode()).hexdigest()[:16]
# Mapear idioma
mapped_lang = self.LANG_MAP.get(language, language)
audio.language = Language.get(mapped_lang)
# Determinar si es idioma original
if tracks.videos:
audio.is_original_lang = (
audio.language.language == tracks.videos[0].language.language
)
# Obtener información del track con MediaInfo
try:
with open(temp_file, "wb") as bytetest:
data = self.session.get(
url=isma,
headers={"Range": "bytes=0-50000"},
timeout=10
)
bytetest.write(data.content)
info = MediaInfo.parse(temp_file)
if not info.audio_tracks:
self.log.debug(f"No audio info found for {language}-{codec_name}")
continue
audio_info = info.audio_tracks[0]
audio.bitrate = audio_info.bit_rate
# Detectar canales basado en codec
if codec_name in ["ec-3", "ac-3", "dts"]:
audio.channels = audio_info.channel_s or "5.1"
else: # mp4a (AAC)
audio.channels = audio_info.channel_s or "2.0"
# Actualizar codec
# Para Unshackle, necesitas mantener el formato correcto
audio.codec = Audio.Codec.from_codecs(codec_name)
# Agregar el track
tracks.audio.append(audio)
tracks_found += 1
self.log.info(
f" + Added audio track: {audio.language.display_name()} "
f"[{codec_name.upper()}] - {audio.channels}ch @ {audio.bitrate} bps"
)
except Exception as e:
self.log.debug(f"Failed to process audio {language}-{codec_name}: {e}")
finally:
if os.path.exists(temp_file):
os.remove(temp_file)
if tracks_found > 0:
self.log.info(f"Total additional audio tracks found: {tracks_found}")
def get_widevine_service_certificate(self, **kwargs):
return self.config["certificate"]
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
res = self.session.post(
url=self.license_url,
data=challenge,
)
if "errors" in res.text:
res = res.json()
if res["errors"][0]["message"] == "HttpException: Forbidden":
self.log.error(
" x This CDM is not eligible to decrypt this\n"
" content or has been blacklisted by RakutenTV"
)
elif res["errors"][0]["message"] == "HttpException: An error happened":
self.log.error(
" x This CDM seems to be revoked and\n"
" therefore it can't decrypt this content",
)
sys.exit(1)
return res.content
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
res = self.session.post(
url=self.license_url,
data=challenge,
)
if "errors" in res.text:
res = res.json()
if res["errors"][0]["message"] == "HttpException: Forbidden":
self.log.error(
" x This CDM is not eligible to decrypt this\n"
" content or has been blacklisted by RakutenTV"
)
elif res["errors"][0]["message"] == "HttpException: An error happened":
self.log.error(
" x This CDM seems to be revoked and\n"
" therefore it can't decrypt this content",
)
sys.exit(1)
return res.content
def pair_device(self):
# TODO: Make this return the tokens, move print out of the func
# log.info_("Logging into RakutenTV as an Android device")
if not self.credential:
self.log.error(" - No credentials provided, unable to log in.")
sys.exit(1)
try:
res = self.session.post(
url=self.config["endpoints"]["auth"],
params={
"device_identifier": self.config["clients"][self.device][
"device_identifier"
]
},
data={
"app_version": self.config["clients"][self.device]["app_version"],
"device_metadata[uid]": self.config["clients"][self.device][
"device_serial"
],
"device_metadata[os]": self.config["clients"][self.device][
"device_os"
],
"device_metadata[model]": self.config["clients"][self.device][
"device_model"
],
"device_metadata[year]": self.config["clients"][self.device][
"device_year"
],
"device_serial": self.config["clients"][self.device][
"device_serial"
],
"device_metadata[trusted_uid]": False,
"device_metadata[brand]": self.config["clients"][self.device][
"device_brand"
],
"classification_id": 69,
"user[password]": self.credential.password,
"device_metadata[app_version]": self.config["clients"][self.device][
"app_version"
],
"user[username]": self.credential.username,
"device_metadata[serial_number]": self.config["clients"][
self.device
]["device_serial"],
},
).json()
except HTTPError as e:
if e.response.status_code == 403:
self.log.error(
" - Rakuten returned a 403 (FORBIDDEN) error. "
"This could be caused by your IP being detected as a proxy, or regional issues. Cannot continue."
)
if "errors" in res:
error = res["errors"][0]
if "exception.forbidden_vpn" in error["code"]:
self.log.error(" x RakutenTV is detecting this VPN or Proxy")
else:
self.log.error(f" - Login failed: {error['message']} [{error['code']}]")
self.access_token = res["data"]["user"]["access_token"]
self.ifa_subscriber_id = res["data"]["user"]["avod_profile"][
"ifa_subscriber_id"
]
self.session_uuid = res["data"]["user"]["session_uuid"]
self.classification_id = res["data"]["user"]["profile"]["classification"]["id"]
self.locale = res["data"]["market"]["locale"]
self.market_code = res["data"]["market"]["code"]
def get_info(self, title):
self.kind = title["labels"]["purchase_types"][0]["kind"]
# self.available_resolutions = [x for x in title["labels"]["video_qualities"]]
# if any(x["abbr"] == "UHD" for x in title["labels"]["video_qualities"]):
# self.resolution = "UHD"
# elif any(x["abbr"] == "FHD" for x in title["labels"]["video_qualities"]):
# self.resolution = "FHD"
# elif any(x["abbr"] == "HD" for x in title["labels"]["video_qualities"]):
# self.resolution = "HD"
# else:
# self.resolution = "SD"
self.available_hdr_types = [x for x in title["labels"]["hdr_types"]]
if any(x["abbr"] == "HDR10_PLUS" for x in self.available_hdr_types) and any(
x["abbr"] == "HDR10_PLUS"
for x in title["view_options"]["support"]["hdr_types"]
):
self.hdr_type = "HDR10_PLUS"
elif any(x["abbr"] == "DOLBY_VISION" for x in self.available_hdr_types) and any(
x["abbr"] == "DOLBY_VISION"
for x in title["view_options"]["support"]["hdr_types"]
):
self.hdr_type = "DOLBY_VISION"
elif any(x["abbr"] == "HDR10" for x in self.available_hdr_types) and any(
x["abbr"] == "HDR10" for x in title["view_options"]["support"]["hdr_types"]
):
self.hdr_type = "HDR10"
else:
self.hdr_type = "NONE"
# Obtener view_options desde title o episodes
view_options = title.get("episodes", [{}])[0].get("view_options") or title.get("view_options")
# FIJO: Obtener TODOS los idiomas de audio disponibles
if len(view_options["private"]["offline_streams"]) == 1:
# Caso 1: Un solo stream con múltiples idiomas
self.audio_languages = [
x["abbr"]
for x in view_options["private"]["streams"][0]["audio_languages"]
]
else:
# Caso 2: Múltiples streams, obtener todos los idiomas únicos
all_audio_languages = []
for stream in view_options["private"]["streams"]:
for audio_lang in stream["audio_languages"]:
if audio_lang["abbr"] not in all_audio_languages:
all_audio_languages.append(audio_lang["abbr"])
self.audio_languages = all_audio_languages
# # TODO: Look up only for languages chosen by the user
# print(f"\nAvailable audio languages: {', '.join(self.audio_languages)}")
# selected = input("Type your desired languages, maximum of 3, UPPER CASE (ex: ENG,SPA,FRA): ")
selected_langs = [lang.strip() for lang in self.desired_audio_language.split(",") if lang.strip() in self.audio_languages]
if not selected_langs:
self.log.error("No selected language. Exiting.")
self.audio_languages = selected_langs
# Log para debug
self.log.info(f"Selected audio languages: {self.audio_languages}")
return title