diff --git a/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd b/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd index 557239d..8657091 100644 Binary files a/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd and b/vinetrimmer/devices/mtc_mtc_atv_atv_sl3000.prd differ diff --git a/vinetrimmer/key_store.db b/vinetrimmer/key_store.db index b46f4fc..da3bef9 100644 Binary files a/vinetrimmer/key_store.db and b/vinetrimmer/key_store.db differ diff --git a/vinetrimmer/services/hulu.py b/vinetrimmer/services/hulu.py index b532a19..e526c7b 100644 --- a/vinetrimmer/services/hulu.py +++ b/vinetrimmer/services/hulu.py @@ -9,7 +9,6 @@ from langcodes import Language from vinetrimmer.objects import TextTrack, Title, Tracks from vinetrimmer.services.BaseService import BaseService from vinetrimmer.utils.pyhulu import Device, HuluClient -from vinetrimmer.utils.widevine.device import LocalDevice class Hulu(BaseService): """ diff --git a/vinetrimmer/services/paramountplus.py b/vinetrimmer/services/paramountplus.py index ece80ce..688dd74 100644 --- a/vinetrimmer/services/paramountplus.py +++ b/vinetrimmer/services/paramountplus.py @@ -13,424 +13,425 @@ import jsonpickle from vinetrimmer.objects import Title, Tracks from vinetrimmer.objects.tracks import AudioTrack, MenuTrack, TextTrack, VideoTrack from vinetrimmer.services.BaseService import BaseService -from vinetrimmer.utils.widevine.device import LocalDevice + class ParamountPlus(BaseService): - """ - Service code for Paramount's Paramount+ streaming service (https://paramountplus.com). + """ + Service code for Paramount's Paramount+ streaming service (https://paramountplus.com). - \b - Authorization: Credentials - Security: UHD@L3, doesn't care about releases. - """ + \b + Authorization: Credentials + Security: UHD@L3, doesn't care about releases. + """ - ALIASES = ["PMTP", "paramountplus", "paramount+"] - TITLE_RE = [ - r"^https?://(?:www\.)?paramountplus\.com/(?Pmovies)/[a-z0-9_-]+/(?P\w+)", - r"^https?://(?:www\.)?paramountplus\.com/(?Pshows)/(?P[a-zA-Z0-9_-]+)(/)?", - r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?Pmovies)/[a-z0-9_-]+/(?P\w+)", - r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?Pshows)/(?P[a-zA-Z0-9_-]+)(/)?", - r"^(?P\d+)$", - ] - VIDEO_CODEC_MAP = {"H264": ["avc", "avc1"], "H265": ["hvc", "dvh", "hvc1", "hev1", "dvh1", "dvhe"]} - AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"} + ALIASES = ["PMTP", "paramountplus", "paramount+"] + TITLE_RE = [ + r"^https?://(?:www\.)?paramountplus\.com/(?Pmovies)/[a-z0-9_-]+/(?P\w+)", + r"^https?://(?:www\.)?paramountplus\.com/(?Pshows)/(?P[a-zA-Z0-9_-]+)(/)?", + r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?Pmovies)/[a-z0-9_-]+/(?P\w+)", + r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?Pshows)/(?P[a-zA-Z0-9_-]+)(/)?", + r"^(?P\d+)$", + ] + VIDEO_CODEC_MAP = {"H264": ["avc", "avc1"], "H265": ["hvc", "dvh", "hvc1", "hev1", "dvh1", "dvhe"]} + AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"} - @staticmethod - @click.command(name="ParamountPlus", short_help="https://paramountplus.com") - @click.argument("title", type=str, required=False) - @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.") - @click.option( - "-c", "--clips", is_flag=True, default=False, help="Download clips instead of episodes (for TV shows)" - ) - @click.pass_context - def cli(ctx: click.Context, **kwargs): - return ParamountPlus(ctx, **kwargs) + @staticmethod + @click.command(name="ParamountPlus", short_help="https://paramountplus.com") + @click.argument("title", type=str, required=False) + @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.") + @click.option( + "-c", "--clips", is_flag=True, default=False, help="Download clips instead of episodes (for TV shows)" + ) + @click.pass_context + def cli(ctx: click.Context, **kwargs): + return ParamountPlus(ctx, **kwargs) - def __init__(self, ctx: click.Context, title: str, movie: bool, clips: bool): - super().__init__(ctx) - m = self.parse_title(ctx, title) - self.movie = movie or m.get("type") == "movies" - self.clips = clips + def __init__(self, ctx: click.Context, title: str, movie: bool, clips: bool): + super().__init__(ctx) + m = self.parse_title(ctx, title) + self.movie = movie or m.get("type") == "movies" + self.clips = clips - self.vcodec = ctx.parent.params["vcodec"] - self.acodec = ctx.parent.params["acodec"] - self.range = ctx.parent.params["range_"] - self.wanted = ctx.parent.params["wanted"] - self.shorts = False - - self.profile = ctx.obj.profile - self.playready = ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY + self.vcodec = ctx.parent.params["vcodec"] + self.acodec = ctx.parent.params["acodec"] + self.range = ctx.parent.params["range_"] + self.wanted = ctx.parent.params["wanted"] + self.shorts = False + + self.profile = ctx.obj.profile + self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY - ctx.parent.params["acodec"] = "EC3" - if self.range != "SDR": - # vcodec must be H265 for High Dynamic Range - self.vcodec = "H265" + ctx.parent.params["acodec"] = "EC3" - self.configure() + if self.range != "SDR": + # vcodec must be H265 for High Dynamic Range + self.vcodec = "H265" - def get_titles(self): - if self.movie: - res = self.session.get( - url=self.config[self.region]["movie"].format(title_id=self.title), - params={ - "includeTrailerInfo": "true", - "includeContentInfo": "true", - "locale": "en-us", - "at": self.config[self.region]["at_token"], - }, - ).json() - if not res["success"]: - if res["message"] == "No movie found for contentId.": - raise self.log.exit(" - Unable to find movie. For TV shows, use the numeric ID.") - else: - raise self.log.exit(f" - Failed to get title information: {res['message']}") + self.configure() - title = res["movie"]["movieContent"] + def get_titles(self): + if self.movie: + res = self.session.get( + url=self.config[self.region]["movie"].format(title_id=self.title), + params={ + "includeTrailerInfo": "true", + "includeContentInfo": "true", + "locale": "en-us", + "at": self.config[self.region]["at_token"], + }, + ).json() + if not res["success"]: + if res["message"] == "No movie found for contentId.": + raise self.log.exit(" - Unable to find movie. For TV shows, use the numeric ID.") + else: + raise self.log.exit(f" - Failed to get title information: {res['message']}") - return Title( - id_=title["contentId"], - type_=Title.Types.MOVIE, - name=title["title"], - year=title["_airDateISO"][:4], # todo: find a way to get year, this api doesnt return it - original_lang="en", # TODO: Don't assume - source=self.ALIASES[0], - service_data=title, - ) - else: - res = self.session.get( - url=self.config[self.region]["shows"].format(title=self.title) - ).json() - links = next((x.get("links") for x in res["showMenu"] if x.get("device_app_id") == "all_platforms"), None) - config = next((x.get("videoConfigUniqueName") for x in links if x.get("title").strip() == "Episodes"), None) - show = next((x for x in res["show"]["results"] if x.get("type") == "show"), None) - seasons = [x["seasonNum"] for x in res["available_video_seasons"]["itemList"] if x.get("seasonNum")] - showId = show.get("show_id") + title = res["movie"]["movieContent"] - show_data = self.session.get( - url=self.config[self.region]["section"].format(showId=showId, config=config), - params={"platformType": "apps", "rows": "1", "begin": "0"}, - ).json() + return Title( + id_=title["contentId"], + type_=Title.Types.MOVIE, + name=title["title"], + year=title["_airDateISO"][:4], # todo: find a way to get year, this api doesnt return it + original_lang="en", # TODO: Don't assume + source=self.ALIASES[0], + service_data=title, + ) + else: + res = self.session.get( + url=self.config[self.region]["shows"].format(title=self.title) + ).json() + links = next((x.get("links") for x in res["showMenu"] if x.get("device_app_id") == "all_platforms"), None) + config = next((x.get("videoConfigUniqueName") for x in links if x.get("title").strip() == "Episodes"), None) + show = next((x for x in res["show"]["results"] if x.get("type") == "show"), None) + seasons = [x["seasonNum"] for x in res["available_video_seasons"]["itemList"] if x.get("seasonNum")] + showId = show.get("show_id") - section = next( - (x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"), None - ) + show_data = self.session.get( + url=self.config[self.region]["section"].format(showId=showId, config=config), + params={"platformType": "apps", "rows": "1", "begin": "0"}, + ).json() - episodes = [] - for season in seasons: - res = self.session.get( - url=self.config[self.region]["seasons"].format(section=section), - params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season}, - ).json() - episodes.extend(res["sectionItems"].get("itemList")) + section = next( + (x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"), None + ) - titles = [] - for episode in episodes: - titles.append( - Title( - id_=episode.get("contentId") or episode.get("content_id"), - type_=Title.Types.TV, - name=episode.get("seriesTitle") or episode.get("series_title"), - season=episode.get("seasonNum") or episode.get("season_number") or 0, - episode=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"], - episode_name=episode["label"], - original_lang="en", # TODO: Don't assume - source=self.ALIASES[0], - service_data=episode, - ) - ) + episodes = [] + for season in seasons: + res = self.session.get( + url=self.config[self.region]["seasons"].format(section=section), + params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season}, + ).json() + episodes.extend(res["sectionItems"].get("itemList")) - return titles + titles = [] + for episode in episodes: + titles.append( + Title( + id_=episode.get("contentId") or episode.get("content_id"), + type_=Title.Types.TV, + name=episode.get("seriesTitle") or episode.get("series_title"), + season=episode.get("seasonNum") or episode.get("season_number") or 0, + episode=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"], + episode_name=episode["label"], + original_lang="en", # TODO: Don't assume + source=self.ALIASES[0], + service_data=episode, + ) + ) - def get_tracks(self, title: Title): - assets = ( - ["DASH_CENC_HDR10"], - [ - "HLS_AES", - "DASH_LIVE", - "DASH_CENC_HDR10", - "DASH_TA", - "DASH_CENC", - "DASH_CENC_PRECON", - "DASH_CENC_PS4", - ], - ) - for asset in assets: - r = requests.Request( - "GET", - url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id), - params={ - "format": "redirect", - "formats": "MPEG-DASH", - "assetTypes": "|".join(asset), - "manifest": "M3U", - "Tracking": "true", - "mbr": "true", - }, - ) - req = self.session.send(self.session.prepare_request(r), allow_redirects=False) - if req.ok: - break - else: - raise ValueError(f"Manifest Error: {req.text}") + return titles - mpd_url = req.headers.get('location') + def get_tracks(self, title: Title): + assets = ( + ["DASH_CENC_HDR10"], + [ + "HLS_AES", + "DASH_LIVE", + "DASH_CENC_HDR10", + "DASH_TA", + "DASH_CENC", + "DASH_CENC_PRECON", + "DASH_CENC_PS4", + ], + ) + for asset in assets: + r = requests.Request( + "GET", + url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id), + params={ + "format": "redirect", + "formats": "MPEG-DASH", + "assetTypes": "|".join(asset), + "manifest": "M3U", + "Tracking": "true", + "mbr": "true", + }, + ) + req = self.session.send(self.session.prepare_request(r), allow_redirects=False) + if req.ok: + break + else: + raise ValueError(f"Manifest Error: {req.text}") - try: - tracks: Tracks = Tracks.from_mpd( - url=mpd_url.replace("cenc_precon_dash", "cenc_dash"), - source=self.ALIASES[0], - session=self.session, - ) - except: - tracks: Tracks = Tracks.from_mpd( - url=mpd_url, - source=self.ALIASES[0], - session=self.session, - ) - tracks.subtitles.clear() + mpd_url = req.headers.get('location') - req = self.session.get( - url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id), - params={ - "format": "redirect", - "formats": "M3U", - "assetTypes": "|".join(["HLS_FPS_PRECON"]), - "manifest": "M3U", - "Tracking": "true", - "mbr": "true", - }, - ) - hls_url = req.url + try: + tracks: Tracks = Tracks.from_mpd( + url=mpd_url.replace("cenc_precon_dash", "cenc_dash"), + source=self.ALIASES[0], + session=self.session, + ) + except: + tracks: Tracks = Tracks.from_mpd( + url=mpd_url, + source=self.ALIASES[0], + session=self.session, + ) + tracks.subtitles.clear() - tracks_m3u8 = Tracks.from_m3u8( - m3u8.load(hls_url), - source=self.ALIASES[0], - ) - tracks.subtitles = tracks_m3u8.subtitles + req = self.session.get( + url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id), + params={ + "format": "redirect", + "formats": "M3U", + "assetTypes": "|".join(["HLS_FPS_PRECON"]), + "manifest": "M3U", + "Tracking": "true", + "mbr": "true", + }, + ) + hls_url = req.url - for track in tracks: - # track.id = track.id - if isinstance(track, VideoTrack): - track.hdr10 = ( - track.codec[:4] in ("hvc1", "hev1") and track.extra[0].attrib.get("codecs")[5] == "2" - ) or (track.codec[:4] in ("hvc1", "hev1") and "HDR10plus" in track.url) + tracks_m3u8 = Tracks.from_m3u8( + m3u8.load(hls_url), + source=self.ALIASES[0], + ) + tracks.subtitles = tracks_m3u8.subtitles - track.dv = track.codec[:4] in ("dvh1", "dvhe") + for track in tracks: + # track.id = track.id + if isinstance(track, VideoTrack): + track.hdr10 = ( + track.codec[:4] in ("hvc1", "hev1") and track.extra[0].attrib.get("codecs")[5] == "2" + ) or (track.codec[:4] in ("hvc1", "hev1") and "HDR10plus" in track.url) - if isinstance(track, VideoTrack) or isinstance(track, AudioTrack): - if self.shorts: - track.encrypted = False + track.dv = track.codec[:4] in ("dvh1", "dvhe") - if isinstance(track, TextTrack): - track.codec = "vtt" - #if track.language.language == "en": - # track.sdh = True # TODO: don't assume SDH + if isinstance(track, VideoTrack) or isinstance(track, AudioTrack): + if self.shorts: + track.encrypted = False - if self.vcodec: - tracks.videos = [x for x in tracks.videos if (x.codec or "")[:4] in self.VIDEO_CODEC_MAP[self.vcodec]] + if isinstance(track, TextTrack): + track.codec = "vtt" + #if track.language.language == "en": + # track.sdh = True # TODO: don't assume SDH - if self.acodec: - tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]] + if self.vcodec: + tracks.videos = [x for x in tracks.videos if (x.codec or "")[:4] in self.VIDEO_CODEC_MAP[self.vcodec]] - return tracks + if self.acodec: + tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]] - def get_chapters(self, title: Title): - chapters = [] - events = title.service_data.get("playbackEvents") - events = {k: v for k, v in events.items() if v is not None} - events = dict(sorted(events.items(), key=lambda item: item[1])) - if not events: - return chapters + return tracks - chapters_titles = { - "endCreditChapterTimeMs": "Credits", - "previewStartTimeMs": "Preview Start", - "previewEndTimeMs": "Preview End", - "openCreditEndTimeMs": "openCreditEnd", - "openCreditStartTime": "openCreditStart", - } + def get_chapters(self, title: Title): + chapters = [] + events = title.service_data.get("playbackEvents") + events = {k: v for k, v in events.items() if v is not None} + events = dict(sorted(events.items(), key=lambda item: item[1])) + if not events: + return chapters - for name, time_ in events.items(): - if isinstance(time_, (int, float)): - chapters.append( - MenuTrack( - number=len(chapters) + 1, - title=chapters_titles.get(name), - timecode=MenuTrack.format_duration(time_ / 1000), - ) - ) + chapters_titles = { + "endCreditChapterTimeMs": "Credits", + "previewStartTimeMs": "Preview Start", + "previewEndTimeMs": "Preview End", + "openCreditEndTimeMs": "openCreditEnd", + "openCreditStartTime": "openCreditStart", + } - # chapters = sorted(chapters, key=self.converter_timecode) + for name, time_ in events.items(): + if isinstance(time_, (int, float)): + chapters.append( + MenuTrack( + number=len(chapters) + 1, + title=chapters_titles.get(name), + timecode=MenuTrack.format_duration(time_ / 1000), + ) + ) - return chapters + # chapters = sorted(chapters, key=self.converter_timecode) - def certificate(self, **_): - return None # will use common privacy cert + return chapters - def license(self, challenge, title, **_): - contentId = title.service_data.get("contentId") or title.service_data.get("content_id") - if not contentId: - raise self.log.exit("Error") + def certificate(self, **_): + return None # will use common privacy cert - r = self.session.post( - url=self.config["license_pr"] if self.playready else self.config["license"], - params={ - "CrmId": "cbsi", - "AccountId": "cbsi", - "SubContentType": "Default", - "ContentId": title.service_data.get("contentId") or title.service_data.get("content_id"), - }, - headers={"Authorization": f"Bearer {self.get_barrear(content_id=contentId)}"}, - data=challenge, # expects bytes - ) + def license(self, challenge, title, **_): + contentId = title.service_data.get("contentId") or title.service_data.get("content_id") + if not contentId: + raise self.log.exit("Error") - if r.headers["Content-Type"].startswith("application/json"): - res = r.json() - raise self.log.exit(res["message"]) + r = self.session.post( + url=self.config["license_pr"] if self.playready else self.config["license"], + params={ + "CrmId": "cbsi", + "AccountId": "cbsi", + "SubContentType": "Default", + "ContentId": title.service_data.get("contentId") or title.service_data.get("content_id"), + }, + headers={"Authorization": f"Bearer {self.get_barrear(content_id=contentId)}"}, + data=challenge, # expects bytes + ) - return base64.b64encode(r.content).decode() if self.playready else r.content + if r.headers["Content-Type"].startswith("application/json"): + res = r.json() + raise self.log.exit(res["message"]) - def configure(self): - self.region = self.session.get("https://ipinfo.io/json").json()["country"] - if self.region != "US": - if self.region != "FR": - self.region = "INTL" + return base64.b64encode(r.content).decode() if self.playready else r.content - #self.device_cache_path = Path(self.get_cache("device_tokens_{profile}.json".format( - #profile=self.profile, - #))) + def configure(self): + self.region = self.session.get("https://ipinfo.io/json").json()["country"] + if self.region != "US": + if self.region != "FR": + self.region = "INTL" - #if self.device_cache_path.exists(): - #with open(self.device_cache_path, encoding="utf-8") as fd: - #date = jsonpickle.decode(fd.read()) - #if "expiry" in date and datetime.fromisoformat(date["expiry"]) > datetime.now(): - #self.log.warning(" + Using cached device tokens") - #cache = date - #else: - #self.log.warning(" + Refreshing cookies") - #self.device_cache_path.unlink() - #if not self.credentials: - #raise self.log.exit(" - No credentials provided, unable to log in.") - #self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]}) - #self.session.params.update({"at": self.config[self.region]["at_token"]}) - #username = self.credentials.username - #password = self.credentials.password - #expiry = (datetime.now() + timedelta(minutes=3)).isoformat() - #cookie = self.login(username=username, password=password) - #cache = {"cookie": cookie, "expiry": expiry} - #self.device_cache_path.parent.mkdir(exist_ok=True, parents=True) - #with open(self.device_cache_path, "w", encoding="utf-8") as fd: - #fd.write(jsonpickle.encode(cache)) - #else: - if not self.credentials: - raise self.log.exit(" - No credentials provided, unable to log in.") - self.log.warning(" + Logging in") - self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]}) - self.session.params.update({"at": self.config[self.region]["at_token"]}) - username = self.credentials.username - password = self.credentials.password - #expiry = (datetime.now() + timedelta(minutes=3)).isoformat() - cookie = self.login(username=username, password=password) - #cache = {"cookie": cookie, "expiry": expiry} - #self.device_cache_path.parent.mkdir(exist_ok=True, parents=True) - #with open(self.device_cache_path, "w", encoding="utf-8") as fd: - #fd.write(jsonpickle.encode(cache)) - #cookie = cache["cookie"] - self.session.headers.update({"cookie": cookie}) - else: - self.session.headers.update( - { - "Origin": "https://www.paramountplus.com", - } - ) - self.session.params.update({"at": self.config[self.region]["at_token"]}) + #self.device_cache_path = Path(self.get_cache("device_tokens_{profile}.json".format( + #profile=self.profile, + #))) - #if not self.is_logged_in(): - #raise ValueError("InvalidCookies") + #if self.device_cache_path.exists(): + #with open(self.device_cache_path, encoding="utf-8") as fd: + #date = jsonpickle.decode(fd.read()) + #if "expiry" in date and datetime.fromisoformat(date["expiry"]) > datetime.now(): + #self.log.warning(" + Using cached device tokens") + #cache = date + #else: + #self.log.warning(" + Refreshing cookies") + #self.device_cache_path.unlink() + #if not self.credentials: + #raise self.log.exit(" - No credentials provided, unable to log in.") + #self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]}) + #self.session.params.update({"at": self.config[self.region]["at_token"]}) + #username = self.credentials.username + #password = self.credentials.password + #expiry = (datetime.now() + timedelta(minutes=3)).isoformat() + #cookie = self.login(username=username, password=password) + #cache = {"cookie": cookie, "expiry": expiry} + #self.device_cache_path.parent.mkdir(exist_ok=True, parents=True) + #with open(self.device_cache_path, "w", encoding="utf-8") as fd: + #fd.write(jsonpickle.encode(cache)) + #else: + if not self.credentials: + raise self.log.exit(" - No credentials provided, unable to log in.") + self.log.warning(" + Logging in") + self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]}) + self.session.params.update({"at": self.config[self.region]["at_token"]}) + username = self.credentials.username + password = self.credentials.password + #expiry = (datetime.now() + timedelta(minutes=3)).isoformat() + cookie = self.login(username=username, password=password) + #cache = {"cookie": cookie, "expiry": expiry} + #self.device_cache_path.parent.mkdir(exist_ok=True, parents=True) + #with open(self.device_cache_path, "w", encoding="utf-8") as fd: + #fd.write(jsonpickle.encode(cache)) + #cookie = cache["cookie"] + self.session.headers.update({"cookie": cookie}) + else: + self.session.headers.update( + { + "Origin": "https://www.paramountplus.com", + } + ) + self.session.params.update({"at": self.config[self.region]["at_token"]}) - #if not self.is_subscribed(): - #raise ValueError("NotEntitled") + #if not self.is_logged_in(): + #raise ValueError("InvalidCookies") - # Service specific functions + #if not self.is_subscribed(): + #raise ValueError("NotEntitled") - def get_prop(self, prop): - res = self.session.get("https://www.paramountplus.com") - prop_re = prop.replace(".", r"\.") - search = re.search(rf"{prop_re} ?= ?[\"']?([^\"';]+)", res.text) - if not search: - raise ValueError("InvalidCookies") + # Service specific functions - return search.group(1) + def get_prop(self, prop): + res = self.session.get("https://www.paramountplus.com") + prop_re = prop.replace(".", r"\.") + search = re.search(rf"{prop_re} ?= ?[\"']?([^\"';]+)", res.text) + if not search: + raise ValueError("InvalidCookies") - def is_logged_in(self): - return self.get_prop("CBS.UserAuthStatus") == "true" + return search.group(1) - def is_subscribed(self): - return self.get_prop("CBS.Registry.user.sub_status") == "SUBSCRIBER" - - def login(self, username, password): - login_params = { - "j_username": username, - "j_password": password - } + def is_logged_in(self): + return self.get_prop("CBS.UserAuthStatus") == "true" - response = self.session.post(url=self.config[self.region]["login"], params=login_params) + def is_subscribed(self): + return self.get_prop("CBS.Registry.user.sub_status") == "SUBSCRIBER" + + def login(self, username, password): + login_params = { + "j_username": username, + "j_password": password + } - status_response = self.session.get(url=self.config[self.region]["status"]).json() - self.log.debug(status_response) - if status_response["success"] == False: - raise self.log.exit("InvalidCredentials") - #if not status_response["userStatus"]["description"] == "SUBSCRIBER": - #raise ValueError("NotEntitled") - - cookies = ";".join([f"{key}={value}" for key, value in response.cookies.get_dict().items()]) + response = self.session.post(url=self.config[self.region]["login"], params=login_params) - return cookies - - def get_barrear(self, content_id): - #license_data = self.session.get(url="https://www.intl.paramountplus.com/apps-api/v3.0/androidphone/irdeto-control/session-token.json?contentId=%s&locale=en-us&at=ABATOpD5wXyjhjIMO0BaNh/gW0iCu0ISRy2U7/tyGiKZTQTlYDFL1NPD58CcuJLOQYY=" % (content_id)).json() - try: - res = self.session.get( - url=self.config[self.region]["barrearUrl"].replace("iphone", "androidtv") if self.playready else self.config[self.region]["barrearUrl"], - params={"contentId": content_id} - ) - res.raise_for_status() - except requests.HTTPError as e: - if e.response.status_code == 401: - self.log.warning("Received a 401 error, deleting cached cookies") - self.device_cache_path.unlink() - self.session.headers.clear() - self.session.params = {} - self.configure() - self.retrying = True - - res = res.json() + status_response = self.session.get(url=self.config[self.region]["status"]).json() + self.log.debug(status_response) + if status_response["success"] == False: + raise self.log.exit("InvalidCredentials") + #if not status_response["userStatus"]["description"] == "SUBSCRIBER": + #raise ValueError("NotEntitled") + + cookies = ";".join([f"{key}={value}" for key, value in response.cookies.get_dict().items()]) - if not res["success"]: - raise self.log.exit("Unable to get license token: %s" % (res["errors"])) + return cookies + + def get_barrear(self, content_id): + #license_data = self.session.get(url="https://www.intl.paramountplus.com/apps-api/v3.0/androidphone/irdeto-control/session-token.json?contentId=%s&locale=en-us&at=ABATOpD5wXyjhjIMO0BaNh/gW0iCu0ISRy2U7/tyGiKZTQTlYDFL1NPD58CcuJLOQYY=" % (content_id)).json() + try: + res = self.session.get( + url=self.config[self.region]["barrearUrl"].replace("iphone", "androidtv") if self.playready else self.config[self.region]["barrearUrl"], + params={"contentId": content_id} + ) + res.raise_for_status() + except requests.HTTPError as e: + if e.response.status_code == 401: + self.log.warning("Received a 401 error, deleting cached cookies") + self.device_cache_path.unlink() + self.session.headers.clear() + self.session.params = {} + self.configure() + self.retrying = True + + res = res.json() - self.license_url = res["url"] - ls_session = res["ls_session"] + if not res["success"]: + raise self.log.exit("Unable to get license token: %s" % (res["errors"])) - return ls_session + self.license_url = res["url"] + ls_session = res["ls_session"] - def parse_movie_year(self, url): - html_raw = self.session.get(url) + return ls_session - if html_raw.status_code != 200: - return None + def parse_movie_year(self, url): + html_raw = self.session.get(url) - self.year = int( - re.findall('"movie__air-year">[0-9]+<', html_raw.text)[0].replace('"movie__air-year">', "").replace("<", "") - ) + if html_raw.status_code != 200: + return None - def parse_show_id(self, url): - html_raw = self.session.get(url) + self.year = int( + re.findall('"movie__air-year">[0-9]+<', html_raw.text)[0].replace('"movie__air-year">', "").replace("<", "") + ) - if html_raw.status_code != 200: - self.log.exit("Could not parse Show Id.") + def parse_show_id(self, url): + html_raw = self.session.get(url) - show = json.loads('{"' + re.search('CBS.Registry.Show = {"(.*)"}', html_raw.text).group(1) + '"}') + if html_raw.status_code != 200: + self.log.exit("Could not parse Show Id.") - return str(show["id"]) + show = json.loads('{"' + re.search('CBS.Registry.Show = {"(.*)"}', html_raw.text).group(1) + '"}') + + return str(show["id"])