import base64
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urljoin

import click
import jsonpickle
import m3u8
import requests

from vinetrimmer.objects import Title, Tracks
from vinetrimmer.objects.tracks import AudioTrack, MenuTrack, TextTrack, VideoTrack
from vinetrimmer.services.BaseService import BaseService


class ParamountPlus(BaseService):
    """
    Service code for Paramount's Paramount+ streaming service (https://paramountplus.com).

    \b
    Authorization: Credentials
    Security: UHD@L3, doesn't care about releases.
    """

    ALIASES = ["PMTP", "paramountplus", "paramount+"]
    # Named groups: "type" is read in __init__ to detect movie URLs.
    # NOTE(review): "id" is presumably consumed by BaseService.parse_title to
    # populate self.title - confirm against the base class.
    TITLE_RE = [
        r"^https?://(?:www\.)?paramountplus\.com/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
        r"^https?://(?:www\.)?paramountplus\.com/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
        r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
        r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
        r"^(?P<id>\d+)$",
    ]

    # Codec prefixes (first four characters of a track's codec string) per CLI codec name.
    VIDEO_CODEC_MAP = {"H264": ["avc", "avc1"], "H265": ["hvc", "dvh", "hvc1", "hev1", "dvh1", "dvhe"]}
    AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"}

    @staticmethod
    @click.command(name="ParamountPlus", short_help="https://paramountplus.com")
    @click.argument("title", type=str, required=False)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
    @click.option(
        "-c", "--clips", is_flag=True, default=False, help="Download clips instead of episodes (for TV shows)"
    )
    @click.pass_context
    def cli(ctx: click.Context, **kwargs):
        return ParamountPlus(ctx, **kwargs)

    def __init__(self, ctx: click.Context, title: str, movie: bool, clips: bool):
        """
        Store the CLI options and run the region/login setup.

        Parameters:
            ctx: Click context; codec, range and wanted options are read from
                ``ctx.parent.params`` and the profile/CDM from ``ctx.obj``.
            title: Title URL or ID as given on the command line.
            movie: Treat the title as a movie even if the URL does not say so.
            clips: Stored on the instance; not consulted elsewhere in this class.
        """
        super().__init__(ctx)
        m = self.parse_title(ctx, title)
        self.movie = movie or m.get("type") == "movies"
        self.clips = clips

        self.vcodec = ctx.parent.params["vcodec"]
        self.acodec = ctx.parent.params["acodec"]
        self.range = ctx.parent.params["range_"]
        self.wanted = ctx.parent.params["wanted"]
        self.shorts = False

        self.profile = ctx.obj.profile
        # A CDM object exposing "certificate_chain" is treated as a PlayReady device.
        self.playready = "certificate_chain" in dir(ctx.obj.cdm)

        # Overrides the parent command's audio codec after self.acodec was captured above.
        ctx.parent.params["acodec"] = "EC3"

        if self.range != "SDR":
            # vcodec must be H265 for High Dynamic Range
            self.vcodec = "H265"

        self.configure()

    def get_titles(self):
        """
        Fetch title metadata.

        Returns:
            A single ``Title`` for movies, or a list of ``Title`` objects (one
            per episode across all available seasons) for shows.
        """
        if self.movie:
            return self._get_movie_title()
        return self._get_episode_titles()

    def _get_movie_title(self):
        """Query the movie endpoint and build the movie ``Title``."""
        res = self.session.get(
            url=self.config[self.region]["movie"].format(title_id=self.title),
            params={
                "includeTrailerInfo": "true",
                "includeContentInfo": "true",
                "locale": "en-us",
                "at": self.config[self.region]["at_token"],
            },
        ).json()
        if not res["success"]:
            if res["message"] == "No movie found for contentId.":
                raise self.log.exit(" - Unable to find movie. For TV shows, use the numeric ID.")
            raise self.log.exit(f" - Failed to get title information: {res['message']}")

        title = res["movie"]["movieContent"]
        return Title(
            id_=title["contentId"],
            type_=Title.Types.MOVIE,
            name=title["title"],
            year=title["_airDateISO"][:4],  # todo: find a way to get year, this api doesnt return it
            original_lang="en",  # TODO: Don't assume
            source=self.ALIASES[0],
            service_data=title,
        )

    def _get_episode_titles(self):
        """Resolve the show's "Full Episodes" section and build one ``Title`` per episode."""
        res = self.session.get(
            url=self.config[self.region]["shows"].format(title=self.title)
        ).json()

        links = next(
            (x.get("links") for x in res["showMenu"] if x.get("device_app_id") == "all_platforms"),
            None,
        )
        # links/title may be absent; treat them as empty instead of crashing.
        config = next(
            (
                x.get("videoConfigUniqueName")
                for x in (links or [])
                if (x.get("title") or "").strip() == "Episodes"
            ),
            None,
        )
        show = next((x for x in res["show"]["results"] if x.get("type") == "show"), None)
        if show is None:
            raise self.log.exit(" - Unable to find show information.")
        seasons = [x["seasonNum"] for x in res["available_video_seasons"]["itemList"] if x.get("seasonNum")]

        show_data = self.session.get(
            url=self.config[self.region]["section"].format(showId=show.get("show_id"), config=config),
            params={"platformType": "apps", "rows": "1", "begin": "0"},
        ).json()
        section = next(
            (x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"),
            None,
        )
        if section is None:
            raise self.log.exit(" - Unable to find the Full Episodes section for this show.")

        episodes = []
        for season in seasons:
            res = self.session.get(
                url=self.config[self.region]["seasons"].format(section=section),
                params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season},
            ).json()
            episodes.extend(res["sectionItems"].get("itemList") or [])

        # The API is read with both camelCase and snake_case key spellings.
        return [
            Title(
                id_=episode.get("contentId") or episode.get("content_id"),
                type_=Title.Types.TV,
                name=episode.get("seriesTitle") or episode.get("series_title"),
                season=episode.get("seasonNum") or episode.get("season_number") or 0,
                episode=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"],
                episode_name=episode["label"],
                original_lang="en",  # TODO: Don't assume
                source=self.ALIASES[0],
                service_data=episode,
            )
            for episode in episodes
        ]

    @staticmethod
    def _link_platform_params(asset_types, formats):
        """Build the query parameters shared by both manifest requests."""
        return {
            "format": "redirect",
            "formats": formats,
            "assetTypes": "|".join(asset_types),
            "manifest": "M3U",
            "Tracking": "true",
            "mbr": "true",
        }

    def get_tracks(self, title: Title):
        """
        Build the track list for a title.

        Video and audio tracks come from the DASH manifest; subtitles are
        replaced by those listed in the HLS manifest. The result is filtered by
        the selected video/audio codecs.

        Raises:
            ValueError: If no asset-type group yields a usable manifest redirect.
        """
        # Tried in order; the first group that gets an OK response wins.
        assets = (
            ["DASH_CENC_HDR10"],
            [
                "HLS_AES",
                "DASH_LIVE",
                "DASH_CENC_HDR10",
                "DASH_TA",
                "DASH_CENC",
                "DASH_CENC_PRECON",
                "DASH_CENC_PS4",
            ],
        )
        for asset in assets:
            r = requests.Request(
                "GET",
                url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
                params=self._link_platform_params(asset, "MPEG-DASH"),
            )
            # Redirects are not followed: the manifest URL is read from the Location header.
            req = self.session.send(self.session.prepare_request(r), allow_redirects=False)
            if req.ok:
                break
        else:
            raise ValueError(f"Manifest Error: {req.text}")

        mpd_url = req.headers.get("location")
        if not mpd_url:
            raise ValueError(f"Manifest Error: no redirect location returned (HTTP {req.status_code})")

        try:
            tracks: Tracks = Tracks.from_mpd(
                url=mpd_url.replace("cenc_precon_dash", "cenc_dash"),
                source=self.ALIASES[0],
                session=self.session,
            )
        except Exception:
            # Best effort: fall back to the manifest URL exactly as returned.
            tracks: Tracks = Tracks.from_mpd(
                url=mpd_url,
                source=self.ALIASES[0],
                session=self.session,
            )
        tracks.subtitles.clear()

        req = self.session.get(
            url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
            params=self._link_platform_params(["HLS_FPS_PRECON"], "M3U"),
        )
        hls_url = req.url  # final URL after redirects

        tracks_m3u8 = Tracks.from_m3u8(
            m3u8.load(hls_url),
            source=self.ALIASES[0],
        )
        tracks.subtitles = tracks_m3u8.subtitles

        for track in tracks:
            if isinstance(track, VideoTrack):
                codec_prefix = (track.codec or "")[:4]
                is_hevc = codec_prefix in ("hvc1", "hev1")
                # HDR10 is flagged when the 6th character of the manifest's "codecs"
                # attribute is "2", or when the URL mentions HDR10plus.
                track.hdr10 = is_hevc and (
                    track.extra[0].attrib.get("codecs")[5] == "2" or "HDR10plus" in track.url
                )
                track.dv = codec_prefix in ("dvh1", "dvhe")
            if isinstance(track, (VideoTrack, AudioTrack)):
                if self.shorts:
                    track.encrypted = False
            if isinstance(track, TextTrack):
                track.codec = "vtt"
                # TODO: don't assume SDH for English subtitles (currently not flagged at all)

        if self.vcodec:
            tracks.videos = [x for x in tracks.videos if (x.codec or "")[:4] in self.VIDEO_CODEC_MAP[self.vcodec]]

        if self.acodec:
            tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]]

        return tracks

    def get_chapters(self, title: Title):
        """
        Build chapter markers from the title's ``playbackEvents`` data.

        Only events with a numeric value are used. The value is divided by 1000
        before formatting, and chapters are ordered by that value.

        Returns:
            A list of ``MenuTrack`` objects; empty if there are no usable events.
        """
        events = title.service_data.get("playbackEvents") or {}
        # Keep numeric entries only, so sorting never compares mixed types.
        timed = sorted(
            ((name, time_) for name, time_ in events.items() if isinstance(time_, (int, float))),
            key=lambda item: item[1],
        )

        chapters_titles = {
            "endCreditChapterTimeMs": "Credits",
            "previewStartTimeMs": "Preview Start",
            "previewEndTimeMs": "Preview End",
            "openCreditEndTimeMs": "openCreditEnd",
            "openCreditStartTime": "openCreditStart",
        }

        return [
            MenuTrack(
                number=number,
                title=chapters_titles.get(name),
                timecode=MenuTrack.format_duration(time_ / 1000),
            )
            for number, (name, time_) in enumerate(timed, start=1)
        ]

    def certificate(self, **_):
        return None  # will use common privacy cert

    def license(self, challenge, title, **_):
        """
        Request a license for the given challenge.

        Parameters:
            challenge: License challenge, sent as the request body (expects bytes).
            title: Title whose ``service_data`` carries the content ID.

        Returns:
            The raw response body, or its base64 text when ``self.playready`` is set.
        """
        contentId = title.service_data.get("contentId") or title.service_data.get("content_id")
        if not contentId:
            raise self.log.exit("Error")

        r = self.session.post(
            url=self.config["license_pr"] if self.playready else self.config["license"],
            params={
                "CrmId": "cbsi",
                "AccountId": "cbsi",
                "SubContentType": "Default",
                "ContentId": contentId,
            },
            headers={"Authorization": f"Bearer {self.get_barrear(content_id=contentId)}"},
            data=challenge,  # expects bytes
        )

        # A JSON body is treated as an error report rather than a license.
        if r.headers.get("Content-Type", "").startswith("application/json"):
            res = r.json()
            raise self.log.exit(res["message"])

        return base64.b64encode(r.content).decode() if self.playready else r.content

    def configure(self):
        """
        Detect the region and prepare the session.

        The region comes from the country reported by ipinfo.io: "US" and "FR"
        are kept, everything else becomes "INTL". Outside the US the service
        logs in with the profile's credentials and sends the resulting cookie
        header; in the US only an Origin header and the ``at`` token are set.
        """
        self.region = self.session.get("https://ipinfo.io/json").json()["country"]
        if self.region != "US":
            if self.region != "FR":
                self.region = "INTL"

            # NOTE: cached device tokens are currently disabled; every run logs in afresh.
            if not self.credentials:
                raise self.log.exit(" - No credentials provided, unable to log in.")

            self.log.warning(" + Logging in")
            self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]})
            self.session.params.update({"at": self.config[self.region]["at_token"]})
            cookie = self.login(
                username=self.credentials.username,
                password=self.credentials.password,
            )
            self.session.headers.update({"cookie": cookie})
        else:
            self.session.headers.update(
                {
                    "Origin": "https://www.paramountplus.com",
                }
            )
            self.session.params.update({"at": self.config[self.region]["at_token"]})
            # NOTE: the is_logged_in() / is_subscribed() checks are not enforced here.

    # Service specific functions

    def get_prop(self, prop):
        """
        Read a JavaScript property assignment from the paramountplus.com home page.

        Raises:
            ValueError: If no ``<prop> = <value>`` assignment is found in the page.
        """
        res = self.session.get("https://www.paramountplus.com")
        prop_re = prop.replace(".", r"\.")
        search = re.search(rf"{prop_re} ?= ?[\"']?([^\"';]+)", res.text)
        if not search:
            raise ValueError("InvalidCookies")

        return search.group(1)

    def is_logged_in(self):
        return self.get_prop("CBS.UserAuthStatus") == "true"

    def is_subscribed(self):
        return self.get_prop("CBS.Registry.user.sub_status") == "SUBSCRIBER"

    def login(self, username, password):
        """
        Log in and return the response cookies as a ``key=value;key=value`` string.

        The login is considered failed when the status endpoint reports a falsy
        "success" value.
        """
        # NOTE(review): credentials are sent as URL query parameters - confirm the API requires this.
        login_params = {
            "j_username": username,
            "j_password": password
        }

        response = self.session.post(url=self.config[self.region]["login"], params=login_params)

        status_response = self.session.get(url=self.config[self.region]["status"]).json()
        self.log.debug(status_response)
        if not status_response["success"]:
            raise self.log.exit("InvalidCredentials")

        return ";".join(f"{key}={value}" for key, value in response.cookies.get_dict().items())

    def _request_session_token(self, content_id):
        """Send one session-token request for ``content_id`` and return the raw response."""
        url = self.config[self.region]["barrearUrl"]
        if self.playready:
            url = url.replace("iphone", "androidtv")
        return self.session.get(url=url, params={"contentId": content_id})

    def get_barrear(self, content_id):
        """
        Obtain the license session token (bearer) for ``content_id``.

        On a 401 the session is reset, ``configure()`` is run again and the
        request is retried once. Also stores the returned URL as
        ``self.license_url``.

        Returns:
            The ``ls_session`` value from the response.
        """
        res = self._request_session_token(content_id)
        if res.status_code == 401:
            self.log.warning("Received a 401 error, deleting cached cookies")
            # The cache path only exists when token caching is enabled.
            cache_path = getattr(self, "device_cache_path", None)
            if cache_path is not None and cache_path.exists():
                cache_path.unlink()
            self.session.headers.clear()
            self.session.params = {}
            self.configure()
            self.retrying = True
            res = self._request_session_token(content_id)

        try:
            data = res.json()
        except ValueError:
            raise self.log.exit(f"Unable to get license token: HTTP {res.status_code}")

        if not data.get("success"):
            raise self.log.exit(f"Unable to get license token: {data.get('errors')}")

        self.license_url = data["url"]
        return data["ls_session"]

    def parse_movie_year(self, url):
        """
        Scrape the air year from a movie page and store it in ``self.year``.

        Always returns ``None``; ``self.year`` is left untouched when the page
        cannot be fetched or contains no year.
        """
        html_raw = self.session.get(url)

        if html_raw.status_code != 200:
            return None

        match = re.search(r'"movie__air-year">([0-9]+)<', html_raw.text)
        if match is None:
            return None

        self.year = int(match.group(1))
        return None

    def parse_show_id(self, url):
        """
        Scrape the numeric show ID from the ``CBS.Registry.Show`` object on a show page.

        Returns:
            The show ID as a string.
        """
        html_raw = self.session.get(url)

        if html_raw.status_code != 200:
            raise self.log.exit("Could not parse Show Id.")

        match = re.search('CBS.Registry.Show = {"(.*)"}', html_raw.text)
        if match is None:
            raise self.log.exit("Could not parse Show Id.")

        # The regex strips the outer {" ... "} of the object, so it is restored before decoding.
        show = json.loads('{"' + match.group(1) + '"}')

        return str(show["id"])