This commit is contained in:
chu23465 2025-04-13 07:52:20 +05:30
parent c9e04e3499
commit 4eb7ddedb8
4 changed files with 359 additions and 359 deletions

Binary file not shown.

View File

@ -9,7 +9,6 @@ from langcodes import Language
from vinetrimmer.objects import TextTrack, Title, Tracks
from vinetrimmer.services.BaseService import BaseService
from vinetrimmer.utils.pyhulu import Device, HuluClient
from vinetrimmer.utils.widevine.device import LocalDevice
class Hulu(BaseService):
"""

View File

@ -13,424 +13,425 @@ import jsonpickle
from vinetrimmer.objects import Title, Tracks
from vinetrimmer.objects.tracks import AudioTrack, MenuTrack, TextTrack, VideoTrack
from vinetrimmer.services.BaseService import BaseService
from vinetrimmer.utils.widevine.device import LocalDevice
class ParamountPlus(BaseService):
"""
Service code for Paramount's Paramount+ streaming service (https://paramountplus.com).
\b
Authorization: Credentials
Security: UHD@L3, doesn't care about releases.
"""
ALIASES = ["PMTP", "paramountplus", "paramount+"]
TITLE_RE = [
r"^https?://(?:www\.)?paramountplus\.com/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
r"^https?://(?:www\.)?paramountplus\.com/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
r"^(?P<id>\d+)$",
]
VIDEO_CODEC_MAP = {"H264": ["avc", "avc1"], "H265": ["hvc", "dvh", "hvc1", "hev1", "dvh1", "dvhe"]}
AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"}
@staticmethod
@click.command(name="ParamountPlus", short_help="https://paramountplus.com")
@click.argument("title", type=str, required=False)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
@click.option(
"-c", "--clips", is_flag=True, default=False, help="Download clips instead of episodes (for TV shows)"
)
@click.pass_context
def cli(ctx: click.Context, **kwargs):
return ParamountPlus(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, movie: bool, clips: bool):
super().__init__(ctx)
m = self.parse_title(ctx, title)
self.movie = movie or m.get("type") == "movies"
self.clips = clips
self.vcodec = ctx.parent.params["vcodec"]
self.acodec = ctx.parent.params["acodec"]
self.range = ctx.parent.params["range_"]
self.wanted = ctx.parent.params["wanted"]
self.shorts = False
self.profile = ctx.obj.profile
self.playready = ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY
ctx.parent.params["acodec"] = "EC3"
if self.range != "SDR":
# vcodec must be H265 for High Dynamic Range
self.vcodec = "H265"
self.configure()
def get_titles(self):
if self.movie:
res = self.session.get(
url=self.config[self.region]["movie"].format(title_id=self.title),
params={
"includeTrailerInfo": "true",
"includeContentInfo": "true",
"locale": "en-us",
"at": self.config[self.region]["at_token"],
},
).json()
if not res["success"]:
if res["message"] == "No movie found for contentId.":
raise self.log.exit(" - Unable to find movie. For TV shows, use the numeric ID.")
else:
raise self.log.exit(f" - Failed to get title information: {res['message']}")
title = res["movie"]["movieContent"]
return Title(
id_=title["contentId"],
type_=Title.Types.MOVIE,
name=title["title"],
year=title["_airDateISO"][:4], # todo: find a way to get year, this api doesnt return it
original_lang="en", # TODO: Don't assume
source=self.ALIASES[0],
service_data=title,
)
else:
res = self.session.get(
url=self.config[self.region]["shows"].format(title=self.title)
).json()
links = next((x.get("links") for x in res["showMenu"] if x.get("device_app_id") == "all_platforms"), None)
config = next((x.get("videoConfigUniqueName") for x in links if x.get("title").strip() == "Episodes"), None)
show = next((x for x in res["show"]["results"] if x.get("type") == "show"), None)
seasons = [x["seasonNum"] for x in res["available_video_seasons"]["itemList"] if x.get("seasonNum")]
showId = show.get("show_id")
show_data = self.session.get(
url=self.config[self.region]["section"].format(showId=showId, config=config),
params={"platformType": "apps", "rows": "1", "begin": "0"},
).json()
section = next(
(x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"), None
)
episodes = []
for season in seasons:
res = self.session.get(
url=self.config[self.region]["seasons"].format(section=section),
params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season},
).json()
episodes.extend(res["sectionItems"].get("itemList"))
titles = []
for episode in episodes:
titles.append(
Title(
id_=episode.get("contentId") or episode.get("content_id"),
type_=Title.Types.TV,
name=episode.get("seriesTitle") or episode.get("series_title"),
season=episode.get("seasonNum") or episode.get("season_number") or 0,
episode=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"],
episode_name=episode["label"],
original_lang="en", # TODO: Don't assume
source=self.ALIASES[0],
service_data=episode,
)
)
return titles
def get_tracks(self, title: Title):
assets = (
["DASH_CENC_HDR10"],
[
"HLS_AES",
"DASH_LIVE",
"DASH_CENC_HDR10",
"DASH_TA",
"DASH_CENC",
"DASH_CENC_PRECON",
"DASH_CENC_PS4",
],
)
for asset in assets:
r = requests.Request(
"GET",
url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
params={
"format": "redirect",
"formats": "MPEG-DASH",
"assetTypes": "|".join(asset),
"manifest": "M3U",
"Tracking": "true",
"mbr": "true",
},
)
req = self.session.send(self.session.prepare_request(r), allow_redirects=False)
if req.ok:
break
else:
raise ValueError(f"Manifest Error: {req.text}")
mpd_url = req.headers.get('location')
try:
tracks: Tracks = Tracks.from_mpd(
url=mpd_url.replace("cenc_precon_dash", "cenc_dash"),
source=self.ALIASES[0],
session=self.session,
)
except:
tracks: Tracks = Tracks.from_mpd(
url=mpd_url,
source=self.ALIASES[0],
session=self.session,
)
tracks.subtitles.clear()
req = self.session.get(
url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
params={
"format": "redirect",
"formats": "M3U",
"assetTypes": "|".join(["HLS_FPS_PRECON"]),
"manifest": "M3U",
"Tracking": "true",
"mbr": "true",
},
)
hls_url = req.url
tracks_m3u8 = Tracks.from_m3u8(
m3u8.load(hls_url),
source=self.ALIASES[0],
)
tracks.subtitles = tracks_m3u8.subtitles
for track in tracks:
# track.id = track.id
if isinstance(track, VideoTrack):
track.hdr10 = (
track.codec[:4] in ("hvc1", "hev1") and track.extra[0].attrib.get("codecs")[5] == "2"
) or (track.codec[:4] in ("hvc1", "hev1") and "HDR10plus" in track.url)
track.dv = track.codec[:4] in ("dvh1", "dvhe")
if isinstance(track, VideoTrack) or isinstance(track, AudioTrack):
if self.shorts:
track.encrypted = False
if isinstance(track, TextTrack):
track.codec = "vtt"
#if track.language.language == "en":
# track.sdh = True # TODO: don't assume SDH
if self.vcodec:
tracks.videos = [x for x in tracks.videos if (x.codec or "")[:4] in self.VIDEO_CODEC_MAP[self.vcodec]]
if self.acodec:
tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]]
return tracks
def get_chapters(self, title: Title):
chapters = []
events = title.service_data.get("playbackEvents")
events = {k: v for k, v in events.items() if v is not None}
events = dict(sorted(events.items(), key=lambda item: item[1]))
if not events:
return chapters
chapters_titles = {
"endCreditChapterTimeMs": "Credits",
"previewStartTimeMs": "Preview Start",
"previewEndTimeMs": "Preview End",
"openCreditEndTimeMs": "openCreditEnd",
"openCreditStartTime": "openCreditStart",
}
for name, time_ in events.items():
if isinstance(time_, (int, float)):
chapters.append(
MenuTrack(
number=len(chapters) + 1,
title=chapters_titles.get(name),
timecode=MenuTrack.format_duration(time_ / 1000),
)
)
# chapters = sorted(chapters, key=self.converter_timecode)
return chapters
def certificate(self, **_):
return None # will use common privacy cert
def license(self, challenge, title, **_):
contentId = title.service_data.get("contentId") or title.service_data.get("content_id")
if not contentId:
raise self.log.exit("Error")
r = self.session.post(
url=self.config["license_pr"] if self.playready else self.config["license"],
params={
"CrmId": "cbsi",
"AccountId": "cbsi",
"SubContentType": "Default",
"ContentId": title.service_data.get("contentId") or title.service_data.get("content_id"),
},
headers={"Authorization": f"Bearer {self.get_barrear(content_id=contentId)}"},
data=challenge, # expects bytes
)
if r.headers["Content-Type"].startswith("application/json"):
res = r.json()
raise self.log.exit(res["message"])
return base64.b64encode(r.content).decode() if self.playready else r.content
def configure(self):
self.region = self.session.get("https://ipinfo.io/json").json()["country"]
if self.region != "US":
if self.region != "FR":
self.region = "INTL"
#self.device_cache_path = Path(self.get_cache("device_tokens_{profile}.json".format(
#profile=self.profile,
#)))
#if self.device_cache_path.exists():
#with open(self.device_cache_path, encoding="utf-8") as fd:
#date = jsonpickle.decode(fd.read())
#if "expiry" in date and datetime.fromisoformat(date["expiry"]) > datetime.now():
#self.log.warning(" + Using cached device tokens")
#cache = date
#else:
#self.log.warning(" + Refreshing cookies")
#self.device_cache_path.unlink()
#if not self.credentials:
#raise self.log.exit(" - No credentials provided, unable to log in.")
#self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]})
#self.session.params.update({"at": self.config[self.region]["at_token"]})
#username = self.credentials.username
#password = self.credentials.password
#expiry = (datetime.now() + timedelta(minutes=3)).isoformat()
#cookie = self.login(username=username, password=password)
#cache = {"cookie": cookie, "expiry": expiry}
#self.device_cache_path.parent.mkdir(exist_ok=True, parents=True)
#with open(self.device_cache_path, "w", encoding="utf-8") as fd:
#fd.write(jsonpickle.encode(cache))
#else:
if not self.credentials:
raise self.log.exit(" - No credentials provided, unable to log in.")
self.log.warning(" + Logging in")
self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]})
self.session.params.update({"at": self.config[self.region]["at_token"]})
username = self.credentials.username
password = self.credentials.password
#expiry = (datetime.now() + timedelta(minutes=3)).isoformat()
cookie = self.login(username=username, password=password)
#cache = {"cookie": cookie, "expiry": expiry}
#self.device_cache_path.parent.mkdir(exist_ok=True, parents=True)
#with open(self.device_cache_path, "w", encoding="utf-8") as fd:
#fd.write(jsonpickle.encode(cache))
#cookie = cache["cookie"]
self.session.headers.update({"cookie": cookie})
else:
self.session.headers.update(
{
"Origin": "https://www.paramountplus.com",
}
)
self.session.params.update({"at": self.config[self.region]["at_token"]})
#if not self.is_logged_in():
#raise ValueError("InvalidCookies")
#if not self.is_subscribed():
#raise ValueError("NotEntitled")
# Service specific functions
def get_prop(self, prop):
res = self.session.get("https://www.paramountplus.com")
prop_re = prop.replace(".", r"\.")
search = re.search(rf"{prop_re} ?= ?[\"']?([^\"';]+)", res.text)
if not search:
raise ValueError("InvalidCookies")
return search.group(1)
def is_logged_in(self):
return self.get_prop("CBS.UserAuthStatus") == "true"
def is_subscribed(self):
return self.get_prop("CBS.Registry.user.sub_status") == "SUBSCRIBER"
def login(self, username, password):
login_params = {
"j_username": username,
"j_password": password
}
response = self.session.post(url=self.config[self.region]["login"], params=login_params)
status_response = self.session.get(url=self.config[self.region]["status"]).json()
self.log.debug(status_response)
if status_response["success"] == False:
raise self.log.exit("InvalidCredentials")
#if not status_response["userStatus"]["description"] == "SUBSCRIBER":
#raise ValueError("NotEntitled")
cookies = ";".join([f"{key}={value}" for key, value in response.cookies.get_dict().items()])
return cookies
def get_barrear(self, content_id):
#license_data = self.session.get(url="https://www.intl.paramountplus.com/apps-api/v3.0/androidphone/irdeto-control/session-token.json?contentId=%s&locale=en-us&at=ABATOpD5wXyjhjIMO0BaNh/gW0iCu0ISRy2U7/tyGiKZTQTlYDFL1NPD58CcuJLOQYY=" % (content_id)).json()
try:
res = self.session.get(
url=self.config[self.region]["barrearUrl"].replace("iphone", "androidtv") if self.playready else self.config[self.region]["barrearUrl"],
params={"contentId": content_id}
)
res.raise_for_status()
except requests.HTTPError as e:
if e.response.status_code == 401:
self.log.warning("Received a 401 error, deleting cached cookies")
self.device_cache_path.unlink()
self.session.headers.clear()
self.session.params = {}
self.configure()
self.retrying = True
res = res.json()
if not res["success"]:
raise self.log.exit("Unable to get license token: %s" % (res["errors"]))
self.license_url = res["url"]
ls_session = res["ls_session"]
return ls_session
def parse_movie_year(self, url):
html_raw = self.session.get(url)
if html_raw.status_code != 200:
return None
self.year = int(
re.findall('"movie__air-year">[0-9]+<', html_raw.text)[0].replace('"movie__air-year">', "").replace("<", "")
)
def parse_show_id(self, url):
html_raw = self.session.get(url)
if html_raw.status_code != 200:
self.log.exit("Could not parse Show Id.")
show = json.loads('{"' + re.search('CBS.Registry.Show = {"(.*)"}', html_raw.text).group(1) + '"}')
return str(show["id"])
"""
Service code for Paramount's Paramount+ streaming service (https://paramountplus.com).
\b
Authorization: Credentials
Security: UHD@L3, doesn't care about releases.
"""
ALIASES = ["PMTP", "paramountplus", "paramount+"]
TITLE_RE = [
r"^https?://(?:www\.)?paramountplus\.com/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
r"^https?://(?:www\.)?paramountplus\.com/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>movies)/[a-z0-9_-]+/(?P<id>\w+)",
r"^https?://(?:www\.)?paramountplus\.com(?:/[a-z]{2})?/(?P<type>shows)/(?P<id>[a-zA-Z0-9_-]+)(/)?",
r"^(?P<id>\d+)$",
]
VIDEO_CODEC_MAP = {"H264": ["avc", "avc1"], "H265": ["hvc", "dvh", "hvc1", "hev1", "dvh1", "dvhe"]}
AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"}
@staticmethod
@click.command(name="ParamountPlus", short_help="https://paramountplus.com")
@click.argument("title", type=str, required=False)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a Movie.")
@click.option(
"-c", "--clips", is_flag=True, default=False, help="Download clips instead of episodes (for TV shows)"
)
@click.pass_context
def cli(ctx: click.Context, **kwargs):
return ParamountPlus(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, movie: bool, clips: bool):
super().__init__(ctx)
m = self.parse_title(ctx, title)
self.movie = movie or m.get("type") == "movies"
self.clips = clips
self.vcodec = ctx.parent.params["vcodec"]
self.acodec = ctx.parent.params["acodec"]
self.range = ctx.parent.params["range_"]
self.wanted = ctx.parent.params["wanted"]
self.shorts = False
self.profile = ctx.obj.profile
self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY
ctx.parent.params["acodec"] = "EC3"
if self.range != "SDR":
# vcodec must be H265 for High Dynamic Range
self.vcodec = "H265"
self.configure()
def get_titles(self):
if self.movie:
res = self.session.get(
url=self.config[self.region]["movie"].format(title_id=self.title),
params={
"includeTrailerInfo": "true",
"includeContentInfo": "true",
"locale": "en-us",
"at": self.config[self.region]["at_token"],
},
).json()
if not res["success"]:
if res["message"] == "No movie found for contentId.":
raise self.log.exit(" - Unable to find movie. For TV shows, use the numeric ID.")
else:
raise self.log.exit(f" - Failed to get title information: {res['message']}")
title = res["movie"]["movieContent"]
return Title(
id_=title["contentId"],
type_=Title.Types.MOVIE,
name=title["title"],
year=title["_airDateISO"][:4], # todo: find a way to get year, this api doesnt return it
original_lang="en", # TODO: Don't assume
source=self.ALIASES[0],
service_data=title,
)
else:
res = self.session.get(
url=self.config[self.region]["shows"].format(title=self.title)
).json()
links = next((x.get("links") for x in res["showMenu"] if x.get("device_app_id") == "all_platforms"), None)
config = next((x.get("videoConfigUniqueName") for x in links if x.get("title").strip() == "Episodes"), None)
show = next((x for x in res["show"]["results"] if x.get("type") == "show"), None)
seasons = [x["seasonNum"] for x in res["available_video_seasons"]["itemList"] if x.get("seasonNum")]
showId = show.get("show_id")
show_data = self.session.get(
url=self.config[self.region]["section"].format(showId=showId, config=config),
params={"platformType": "apps", "rows": "1", "begin": "0"},
).json()
section = next(
(x["sectionId"] for x in show_data["videoSectionMetadata"] if x["title"] == "Full Episodes"), None
)
episodes = []
for season in seasons:
res = self.session.get(
url=self.config[self.region]["seasons"].format(section=section),
params={"begin": "0", "rows": "999", "params": f"seasonNum={season}", "seasonNum": season},
).json()
episodes.extend(res["sectionItems"].get("itemList"))
titles = []
for episode in episodes:
titles.append(
Title(
id_=episode.get("contentId") or episode.get("content_id"),
type_=Title.Types.TV,
name=episode.get("seriesTitle") or episode.get("series_title"),
season=episode.get("seasonNum") or episode.get("season_number") or 0,
episode=episode["episodeNum"] if episode["fullEpisode"] else episode["positionNum"],
episode_name=episode["label"],
original_lang="en", # TODO: Don't assume
source=self.ALIASES[0],
service_data=episode,
)
)
return titles
def get_tracks(self, title: Title):
assets = (
["DASH_CENC_HDR10"],
[
"HLS_AES",
"DASH_LIVE",
"DASH_CENC_HDR10",
"DASH_TA",
"DASH_CENC",
"DASH_CENC_PRECON",
"DASH_CENC_PS4",
],
)
for asset in assets:
r = requests.Request(
"GET",
url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
params={
"format": "redirect",
"formats": "MPEG-DASH",
"assetTypes": "|".join(asset),
"manifest": "M3U",
"Tracking": "true",
"mbr": "true",
},
)
req = self.session.send(self.session.prepare_request(r), allow_redirects=False)
if req.ok:
break
else:
raise ValueError(f"Manifest Error: {req.text}")
mpd_url = req.headers.get('location')
try:
tracks: Tracks = Tracks.from_mpd(
url=mpd_url.replace("cenc_precon_dash", "cenc_dash"),
source=self.ALIASES[0],
session=self.session,
)
except:
tracks: Tracks = Tracks.from_mpd(
url=mpd_url,
source=self.ALIASES[0],
session=self.session,
)
tracks.subtitles.clear()
req = self.session.get(
url=self.config["LINK_PLATFORM_URL"].format(video_id=title.id),
params={
"format": "redirect",
"formats": "M3U",
"assetTypes": "|".join(["HLS_FPS_PRECON"]),
"manifest": "M3U",
"Tracking": "true",
"mbr": "true",
},
)
hls_url = req.url
tracks_m3u8 = Tracks.from_m3u8(
m3u8.load(hls_url),
source=self.ALIASES[0],
)
tracks.subtitles = tracks_m3u8.subtitles
for track in tracks:
# track.id = track.id
if isinstance(track, VideoTrack):
track.hdr10 = (
track.codec[:4] in ("hvc1", "hev1") and track.extra[0].attrib.get("codecs")[5] == "2"
) or (track.codec[:4] in ("hvc1", "hev1") and "HDR10plus" in track.url)
track.dv = track.codec[:4] in ("dvh1", "dvhe")
if isinstance(track, VideoTrack) or isinstance(track, AudioTrack):
if self.shorts:
track.encrypted = False
if isinstance(track, TextTrack):
track.codec = "vtt"
#if track.language.language == "en":
# track.sdh = True # TODO: don't assume SDH
if self.vcodec:
tracks.videos = [x for x in tracks.videos if (x.codec or "")[:4] in self.VIDEO_CODEC_MAP[self.vcodec]]
if self.acodec:
tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]]
return tracks
def get_chapters(self, title: Title):
chapters = []
events = title.service_data.get("playbackEvents")
events = {k: v for k, v in events.items() if v is not None}
events = dict(sorted(events.items(), key=lambda item: item[1]))
if not events:
return chapters
chapters_titles = {
"endCreditChapterTimeMs": "Credits",
"previewStartTimeMs": "Preview Start",
"previewEndTimeMs": "Preview End",
"openCreditEndTimeMs": "openCreditEnd",
"openCreditStartTime": "openCreditStart",
}
for name, time_ in events.items():
if isinstance(time_, (int, float)):
chapters.append(
MenuTrack(
number=len(chapters) + 1,
title=chapters_titles.get(name),
timecode=MenuTrack.format_duration(time_ / 1000),
)
)
# chapters = sorted(chapters, key=self.converter_timecode)
return chapters
def certificate(self, **_):
return None # will use common privacy cert
def license(self, challenge, title, **_):
contentId = title.service_data.get("contentId") or title.service_data.get("content_id")
if not contentId:
raise self.log.exit("Error")
r = self.session.post(
url=self.config["license_pr"] if self.playready else self.config["license"],
params={
"CrmId": "cbsi",
"AccountId": "cbsi",
"SubContentType": "Default",
"ContentId": title.service_data.get("contentId") or title.service_data.get("content_id"),
},
headers={"Authorization": f"Bearer {self.get_barrear(content_id=contentId)}"},
data=challenge, # expects bytes
)
if r.headers["Content-Type"].startswith("application/json"):
res = r.json()
raise self.log.exit(res["message"])
return base64.b64encode(r.content).decode() if self.playready else r.content
def configure(self):
self.region = self.session.get("https://ipinfo.io/json").json()["country"]
if self.region != "US":
if self.region != "FR":
self.region = "INTL"
#self.device_cache_path = Path(self.get_cache("device_tokens_{profile}.json".format(
#profile=self.profile,
#)))
#if self.device_cache_path.exists():
#with open(self.device_cache_path, encoding="utf-8") as fd:
#date = jsonpickle.decode(fd.read())
#if "expiry" in date and datetime.fromisoformat(date["expiry"]) > datetime.now():
#self.log.warning(" + Using cached device tokens")
#cache = date
#else:
#self.log.warning(" + Refreshing cookies")
#self.device_cache_path.unlink()
#if not self.credentials:
#raise self.log.exit(" - No credentials provided, unable to log in.")
#self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]})
#self.session.params.update({"at": self.config[self.region]["at_token"]})
#username = self.credentials.username
#password = self.credentials.password
#expiry = (datetime.now() + timedelta(minutes=3)).isoformat()
#cookie = self.login(username=username, password=password)
#cache = {"cookie": cookie, "expiry": expiry}
#self.device_cache_path.parent.mkdir(exist_ok=True, parents=True)
#with open(self.device_cache_path, "w", encoding="utf-8") as fd:
#fd.write(jsonpickle.encode(cache))
#else:
if not self.credentials:
raise self.log.exit(" - No credentials provided, unable to log in.")
self.log.warning(" + Logging in")
self.session.headers.update({"user-agent": self.config["Android"]["UserAgent"]})
self.session.params.update({"at": self.config[self.region]["at_token"]})
username = self.credentials.username
password = self.credentials.password
#expiry = (datetime.now() + timedelta(minutes=3)).isoformat()
cookie = self.login(username=username, password=password)
#cache = {"cookie": cookie, "expiry": expiry}
#self.device_cache_path.parent.mkdir(exist_ok=True, parents=True)
#with open(self.device_cache_path, "w", encoding="utf-8") as fd:
#fd.write(jsonpickle.encode(cache))
#cookie = cache["cookie"]
self.session.headers.update({"cookie": cookie})
else:
self.session.headers.update(
{
"Origin": "https://www.paramountplus.com",
}
)
self.session.params.update({"at": self.config[self.region]["at_token"]})
#if not self.is_logged_in():
#raise ValueError("InvalidCookies")
#if not self.is_subscribed():
#raise ValueError("NotEntitled")
# Service specific functions
def get_prop(self, prop):
res = self.session.get("https://www.paramountplus.com")
prop_re = prop.replace(".", r"\.")
search = re.search(rf"{prop_re} ?= ?[\"']?([^\"';]+)", res.text)
if not search:
raise ValueError("InvalidCookies")
return search.group(1)
def is_logged_in(self):
return self.get_prop("CBS.UserAuthStatus") == "true"
def is_subscribed(self):
return self.get_prop("CBS.Registry.user.sub_status") == "SUBSCRIBER"
def login(self, username, password):
login_params = {
"j_username": username,
"j_password": password
}
response = self.session.post(url=self.config[self.region]["login"], params=login_params)
status_response = self.session.get(url=self.config[self.region]["status"]).json()
self.log.debug(status_response)
if status_response["success"] == False:
raise self.log.exit("InvalidCredentials")
#if not status_response["userStatus"]["description"] == "SUBSCRIBER":
#raise ValueError("NotEntitled")
cookies = ";".join([f"{key}={value}" for key, value in response.cookies.get_dict().items()])
return cookies
def get_barrear(self, content_id):
#license_data = self.session.get(url="https://www.intl.paramountplus.com/apps-api/v3.0/androidphone/irdeto-control/session-token.json?contentId=%s&locale=en-us&at=ABATOpD5wXyjhjIMO0BaNh/gW0iCu0ISRy2U7/tyGiKZTQTlYDFL1NPD58CcuJLOQYY=" % (content_id)).json()
try:
res = self.session.get(
url=self.config[self.region]["barrearUrl"].replace("iphone", "androidtv") if self.playready else self.config[self.region]["barrearUrl"],
params={"contentId": content_id}
)
res.raise_for_status()
except requests.HTTPError as e:
if e.response.status_code == 401:
self.log.warning("Received a 401 error, deleting cached cookies")
self.device_cache_path.unlink()
self.session.headers.clear()
self.session.params = {}
self.configure()
self.retrying = True
res = res.json()
if not res["success"]:
raise self.log.exit("Unable to get license token: %s" % (res["errors"]))
self.license_url = res["url"]
ls_session = res["ls_session"]
return ls_session
def parse_movie_year(self, url):
html_raw = self.session.get(url)
if html_raw.status_code != 200:
return None
self.year = int(
re.findall('"movie__air-year">[0-9]+<', html_raw.text)[0].replace('"movie__air-year">', "").replace("<", "")
)
def parse_show_id(self, url):
html_raw = self.session.get(url)
if html_raw.status_code != 200:
self.log.exit("Could not parse Show Id.")
show = json.loads('{"' + re.search('CBS.Registry.Show = {"(.*)"}', html_raw.text).group(1) + '"}')
return str(show["id"])