import base64
import hashlib
import hmac
import json
import os
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional, Union

import click
import requests
from click import Context

from devine.core.config import config
from devine.core.credential import Credential
from devine.core.manifests import DASH, HLS
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Audio, Chapter, Tracks, Video
from devine.core.tracks.subtitle import Subtitle


class PCOK(Service):
    """
    Service code for NBC's Peacock streaming service (https://peacocktv.com).

    \b
    Authorization: Cookies
    Security: UHD@-- FHD@L3, doesn't care about releases.

    \b
    Tips: - The library of contents can be viewed without logging in at https://www.peacocktv.com/stream/tv
            See the footer for links to movies, news, etc. A US IP is required to view.
    """

    ALIASES = ["PCOK", "peacock"]
    GEOFENCE = ["us"]
    # NOTE(review): the capture group name was missing from these patterns (a bare `(?P`
    # does not compile); `id` is restored here as the conventional name - confirm against
    # whatever consumes TITLE_RE.
    TITLE_RE = [
        r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>movies/[a-z0-9/./-]+/[a-f0-9-]+)",
        r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>tv/[a-z0-9-/.]+/\d+)",
        r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>-/[a-z0-9-/.]+/\d+)",
        r"(?:https?://(?:www\.)?peacocktv\.com/stream-tv/)?(?P<id>[a-z0-9-/.]+)",
    ]

    @staticmethod
    @click.command(name="PCOK", short_help="https://peacocktv.com")
    @click.argument("title", type=str, required=False)
    @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.")
    @click.pass_context
    def cli(ctx: click.Context, **kwargs: Any) -> "PCOK":
        return PCOK(ctx, **kwargs)

    def __init__(self, ctx, title, movie):
        """
        Store the CLI arguments and immediately authenticate via `configure()`.

        Parameters:
            ctx: Click context; `ctx.obj.profile` and `ctx.parent.params` are read from it.
            title: Title slug or path as given on the command line (may be None).
            movie: True when the user flagged the title as a movie.
        """
        super().__init__(ctx)

        self.title = title
        self.movie = movie
        self.tv = False  # set by get_titles() for `/stream-tv` slugs
        self.profile = ctx.obj.profile

        # Populated by configure() / get_tracks()
        self.service_config = None
        self.hmac_key = None
        self.tokens = None
        self.license_api = None
        self.license_bt = None

        self.vcodec = ctx.parent.params["vcodec"]
        # Only the first requested range is considered; unknown ranges map to None.
        self.range = {
            "SDR": "SDR",
            "HDR10": "HDR10",
            "DV": "DOLBYVISION",
        }.get(ctx.parent.params["range_"][0])

        self.configure()

    def get_titles(self) -> Union[Movies, Series]:
        """
        Resolve `self.title` to a node slug and return its movie or episodes.

        A bare slug (no "/", e.g. `the-office`) is first resolved through the
        `stream_tv` endpoint page; a full slug (e.g. `/tv/the-office/4902514835143843112`)
        is used as-is.

        Returns:
            `Movies` with a single entry when the title is a movie, otherwise a
            `Series` holding every episode of every season in the node response.

        Raises:
            SystemExit: if no title was given or the slug could not be resolved.
        """
        if not self.title:
            self._fail(" - Title ID not found or invalid")

        if "/" not in self.title:
            page = self.session.get(self.config["endpoints"]["stream_tv"].format(title_id=self.title))
            match = re.search(r"/watch/asset(/[^']+)", page.text)
            if not match:
                self._fail(" - Title ID not found or invalid")
            self.title = match.group(1)

        if not self.title.startswith("/"):
            self.title = f"/{self.title}"

        if self.title.startswith("/movies/"):
            self.movie = True
        if self.title.startswith("/stream-tv"):
            self.tv = True

        res = self.session.get(
            url=self.config["endpoints"]["node"],
            params={"slug": self.title, "represent": "(items(items))"},
            headers={
                "Accept": "*",
                "Referer": f"https://www.peacocktv.com/watch/asset{self.title}",
                "X-SkyOTT-Device": self.config["client"]["device"],
                "X-SkyOTT-Platform": self.config["client"]["platform"],
                "X-SkyOTT-Proposition": self.config["client"]["proposition"],
                "X-SkyOTT-Provider": self.config["client"]["provider"],
                "X-SkyOTT-Territory": self.config["client"]["territory"],
                "X-SkyOTT-Language": "en",
            },
        ).json()

        if self.movie:
            return Movies(
                [
                    Movie(
                        id_=self.title,
                        service=self.__class__,
                        name=res["attributes"]["title"],
                        year=res["attributes"]["year"],
                        data=res,
                    )
                ]
            )

        # NOTE(review): every episode shares the series slug as its id_ - confirm that
        # this is what downstream code expects.
        return Series(
            [
                Episode(
                    id_=self.title,
                    service=self.__class__,
                    title=res["attributes"]["title"],
                    year=episode["attributes"].get("year"),
                    season=episode["attributes"].get("seasonNumber"),
                    number=episode["attributes"].get("episodeNumber"),
                    name=episode["attributes"].get("title"),
                    data=episode,
                )
                for season in res["relationships"]["items"]["data"]
                for episode in season["relationships"]["items"]["data"]
            ]
        )

    def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
        """
        Request a VOD playout for `title` and build tracks from its DASH manifest.

        Also stores the licence URL and licence token from the playout response on
        `self.license_api` / `self.license_bt` for the later licence request.

        Parameters:
            title: A title produced by `get_titles()`; its `data` node is read for the
                content and variant identifiers.

        Returns:
            The tracks parsed from the first asset endpoint of the playout response.

        Raises:
            SystemExit: if the playout response carries an `errorCode`.
        """
        # self.range holds the mapped value from __init__ ("DOLBYVISION", not "DV").
        if self.range == "HDR10":
            self.log.info("Switched dynamic range to HDR10")
            supported_colour_spaces = ["HDR10"]
        elif self.range == "DOLBYVISION":
            self.log.info("Switched dynamic range to DV")
            supported_colour_spaces = ["DolbyVision"]
        else:
            supported_colour_spaces = ["SDR"]

        content_id = title.data["attributes"]["formats"]["HD"]["contentId"]
        variant_id = title.data["attributes"]["providerVariantId"]

        sky_headers = {
            # order of these matter! (they are hashed in insertion order for the signature)
            "X-SkyOTT-Agent": ".".join(
                [
                    self.config["client"]["proposition"].lower(),
                    self.config["client"]["device"].lower(),
                    self.config["client"]["platform"].lower(),
                ]
            ),
            "X-SkyOTT-PinOverride": "false",
            "X-SkyOTT-Provider": self.config["client"]["provider"],
            "X-SkyOTT-Territory": self.config["client"]["territory"],
            "X-SkyOTT-UserToken": self.tokens["userToken"],
        }

        # The body is minified because its exact bytes are hashed into the signature.
        body = json.dumps(
            {
                "device": {
                    # maybe get these from the config endpoint?
                    "capabilities": [
                        {
                            "protection": "WIDEVINE",
                            "container": "ISOBMFF",
                            "transport": "DASH",
                            "acodec": "AAC",
                            "vcodec": "H265",
                        },
                        {
                            "protection": "NONE",
                            "container": "ISOBMFF",
                            "transport": "DASH",
                            "acodec": "AAC",
                            "vcodec": "H265",
                        },
                    ],
                    "maxVideoFormat": "UHD",
                    "supportedColourSpaces": supported_colour_spaces,
                    "model": self.config["client"]["platform"],
                    "hdcpEnabled": "true",
                },
                "client": {
                    "thirdParties": ["FREEWHEEL", "YOSPACE"]  # CONVIVA
                },
                "contentId": content_id,
                "providerVariantId": variant_id,
                "parentalControlPin": "null",
            },
            separators=(",", ":"),
        )

        manifest = self.session.post(
            url=self.config["endpoints"]["vod"],
            data=body,
            headers={
                **sky_headers,
                "Accept": "application/vnd.playvod.v1+json",
                "Content-Type": "application/vnd.playvod.v1+json",
                "X-Sky-Signature": self.create_signature_header(
                    method="POST",
                    path="/video/playouts/vod",
                    sky_headers=sky_headers,
                    body=body,
                    timestamp=int(time.time()),
                ),
            },
        ).json()

        if "errorCode" in manifest:
            self._fail(f" - An error occurred: {manifest['description']} [{manifest['errorCode']}]")

        self.license_api = manifest["protection"]["licenceAcquisitionUrl"]
        self.license_bt = manifest["protection"]["licenceToken"]

        tracks = DASH.from_url(url=manifest["asset"]["endpoints"][0]["url"], session=self.session).to_tracks(
            language="en"
        )

        # NOTE(review): "DolbyVision" mirrors the spelling sent in the request above;
        # confirm the playout response uses the same spelling.
        video_range = {
            "HDR10": Video.Range.HDR10,
            "DolbyVision": Video.Range.DV,
        }.get(manifest["asset"]["format"]["colourSpace"])

        for track in tracks:
            track.needs_proxy = True
            if video_range is not None and isinstance(track, Video):
                track.range = video_range

        for track in tracks.audio:
            if track.language.territory == "AD":
                # This is supposed to be Audio Description, not Andorra
                track.language.territory = None

        return tracks

    def get_chapters(self, title) -> list:
        """Return chapters for `title`; this service provides none."""
        return []

    def get_widevine_service_certificate(self, *, challenge, title, track):
        """Delegate the service-certificate request to the base `Service` implementation."""
        return super().get_widevine_service_certificate(challenge=challenge, title=title, track=track)

    def get_widevine_license(self, title, challenge, **_) -> bytes:
        """
        Post the Widevine challenge to the licence URL stored by `get_tracks()`.

        Parameters:
            title: Unused; accepted for interface compatibility.
            challenge: Raw challenge bytes sent as the request body.

        Returns:
            The raw response body of the licence request.
        """
        # Sign everything after the host: "scheme://host/<rest>" -> "/<rest>".
        licence_path = "/" + self.license_api.split("://", 1)[1].split("/", 1)[1]
        return self.session.post(
            url=self.license_api,
            headers={
                "Accept": "*",
                "X-Sky-Signature": self.create_signature_header(
                    method="POST",
                    path=licence_path,
                    sky_headers={},
                    body="",
                    timestamp=int(time.time()),
                ),
            },
            data=challenge,  # expects bytes
        ).content

    # Service specific functions

    def _fail(self, message: str):
        """
        Log `message` as an error and abort.

        Raises:
            SystemExit: always, with exit status 1.
        """
        self.log.error(message)
        raise SystemExit(1)

    def configure(self):
        """
        Prepare the session and authenticate.

        Sets the `Origin` header, fetches the client configuration (skipped when the
        configured platform is "PC"), loads the HMAC signing key from the service
        config, then obtains and verifies the authorization tokens.

        Raises:
            SystemExit: if the tokens fail verification.
        """
        self.session.headers.update({"Origin": "https://www.peacocktv.com"})

        self.log.info("Getting Peacock Client configuration")
        client = self.config["client"]
        if client["platform"] != "PC":
            self.service_config = self.session.get(
                url=self.config["endpoints"]["config"].format(
                    territory=client["territory"],
                    provider=client["provider"],
                    proposition=client["proposition"],
                    device=client["platform"],
                    version=client["config_version"],
                )
            ).json()

        self.hmac_key = bytes(self.config["security"]["signature_hmac_key_v4"], "utf-8")

        self.log.info("Getting Authorization Tokens")
        self.tokens = self.get_tokens()

        self.log.info("Verifying Authorization Tokens")
        if not self.verify_tokens():
            self._fail(" - Failed! Cookies might be outdated.")

    @staticmethod
    def calculate_sky_header_md5(headers):
        """
        Return the MD5 hex digest of the SkyOTT headers.

        Each header is rendered as `name: value` with the name lowercased, one per
        line in the mapping's iteration order, with a trailing newline. An empty
        mapping is hashed as the literal string "{}".
        """
        if headers:
            headers_str = "\n".join(f"{name.lower()}: {value}" for name, value in headers.items()) + "\n"
        else:
            headers_str = "{}"
        return hashlib.md5(headers_str.encode()).hexdigest()

    @staticmethod
    def calculate_body_md5(body):
        """Return the MD5 hex digest of the request body string."""
        return hashlib.md5(body.encode()).hexdigest()

    def calculate_signature(self, msg):
        """Return the base64-encoded HMAC-SHA1 of `msg`, keyed with `self.hmac_key`."""
        digest = hmac.new(self.hmac_key, msg.encode("utf-8"), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("utf-8")

    def create_signature_header(self, method, path, sky_headers, body, timestamp):
        """
        Build the value of the `X-Sky-Signature` header for a request.

        Parameters:
            method: HTTP method; upper-cased before signing.
            path: Request path to sign.
            sky_headers: The SkyOTT headers sent with the request (order matters).
            body: Request body string exactly as sent ("" when there is none).
            timestamp: Unix timestamp in seconds.

        Returns:
            The configured `signature_format` filled with the client SDK name, the
            signature and the timestamp.
        """
        client_sdk = self.config["client"]["client_sdk"]
        fields = [
            method.upper(),
            path,
            "",  # important!
            client_sdk,
            "1.0",
            self.calculate_sky_header_md5(sky_headers),
            str(timestamp),
            self.calculate_body_md5(body),
        ]
        signature_hmac = self.calculate_signature("\n".join(fields) + "\n")

        return self.config["security"]["signature_format"].format(
            client=client_sdk, signature=signature_hmac, timestamp=timestamp
        )

    @staticmethod
    def _load_cached_tokens(cache_path: Path) -> Optional[dict]:
        """Return unexpired tokens from `cache_path`, or None if missing, unreadable or expired."""
        if not cache_path.is_file():
            return None
        try:
            with cache_path.open(encoding="utf-8") as fd:
                tokens = json.load(fd)
            expiry = tokens.get("tokenExpiryTime")
            if not expiry:
                return None
            # The trailing "Z" marks the timestamp as UTC, so compare against UTC now.
            expires_at = datetime.strptime(expiry, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
        except (OSError, ValueError, AttributeError):
            # Corrupt or unexpected cache content is treated the same as no cache.
            return None
        return tokens if expires_at > datetime.now(timezone.utc) else None

    def get_tokens(self):
        """
        Return authorization tokens, using the on-disk cache when still valid.

        On a cache miss the persona ID is fetched from the `personas` endpoint, a
        signed request is posted to the `tokens` endpoint, and the response is cached
        per profile and client id.

        Returns:
            The token mapping (contains at least `userToken`).

        Raises:
            SystemExit: if the persona lookup fails or no `userToken` is returned.
        """
        client = self.config["client"]

        # Try to get cached tokens
        cache_path = Path(
            config.directories.cache
            / self.__class__.__name__
            / "tokens_{profile}_{id}.json".format(profile=self.profile, id=client["id"])
        )
        cached = self._load_cached_tokens(cache_path)
        if cached:
            return cached

        # Get all SkyOTT headers
        sky_headers = {
            # Order of these matters!
            "X-SkyOTT-Agent": ".".join([client["proposition"], client["device"], client["platform"]]).lower(),
            "X-SkyOTT-Device": client["device"],
            "X-SkyOTT-Platform": client["platform"],
            "X-SkyOTT-Proposition": client["proposition"],
            "X-SkyOTT-Provider": client["provider"],
            "X-SkyOTT-Territory": client["territory"],
        }

        try:
            # Call personas endpoint to get the accounts personaId
            response = self.session.get(
                url=self.config["endpoints"]["personas"],
                headers={
                    **sky_headers,
                    "Accept": "application/vnd.persona.v1+json",
                    "Content-Type": "application/vnd.persona.v1+json",
                    "X-SkyOTT-TokenType": client["auth_scheme"],
                },
            )
            # requests only raises HTTPError when asked to.
            response.raise_for_status()
            personas = response.json()
        except requests.HTTPError as e:
            try:
                error = e.response.json()
            except ValueError:
                error = None
            if isinstance(error, dict) and "message" in error and "code" in error:
                error = f"{error['message']} [{error['code']}]"
                if "bad credentials" in error.lower():
                    error += ". Cookies may be expired or invalid."
                self._fail(f" - Unable to get persona ID: {error}")
            self._fail(f" - HTTP Error {e.response.status_code}: {e.response.reason}")

        persona = personas["personas"][0]["personaId"]

        # Craft the body data that will be sent to the tokens endpoint, being minified and order matters!
        body = json.dumps(
            {
                "auth": {
                    "authScheme": client["auth_scheme"],
                    "authIssuer": client["auth_issuer"],
                    "provider": client["provider"],
                    "providerTerritory": client["territory"],
                    "proposition": client["proposition"],
                    "personaId": persona,
                },
                "device": {
                    "type": client["device"],
                    "platform": client["platform"],
                    "id": client["id"],
                    "drmDeviceId": client["drm_device_id"],
                },
            },
            separators=(",", ":"),
        )

        # Get the tokens
        tokens = self.session.post(
            url=self.config["endpoints"]["tokens"],
            headers={
                **sky_headers,
                "Accept": "application/vnd.tokens.v1+json",
                "Content-Type": "application/vnd.tokens.v1+json",
                "X-Sky-Signature": self.create_signature_header(
                    method="POST",
                    path="/auth/tokens",
                    sky_headers=sky_headers,
                    body=body,
                    timestamp=int(time.time()),
                ),
            },
            data=body,
        ).json()

        # Never cache an error payload: the rest of the class requires `userToken`.
        if not isinstance(tokens, dict) or "userToken" not in tokens:
            self._fail(f" - Unable to get tokens: {tokens}")

        cache_path.parent.mkdir(parents=True, exist_ok=True)
        with cache_path.open("w", encoding="utf-8") as fd:
            json.dump(tokens, fd)

        return tokens

    def verify_tokens(self):
        """
        Verify the tokens by calling the /auth/users/me endpoint and seeing if it works.

        Returns:
            True if the endpoint answers with a success status, False on an HTTP error.
        """
        client = self.config["client"]
        sky_headers = {
            # order of these matter!
            "X-SkyOTT-Device": client["device"],
            "X-SkyOTT-Platform": client["platform"],
            "X-SkyOTT-Proposition": client["proposition"],
            "X-SkyOTT-Provider": client["provider"],
            "X-SkyOTT-Territory": client["territory"],
            "X-SkyOTT-UserToken": self.tokens["userToken"],
        }
        try:
            response = self.session.get(
                url=self.config["endpoints"]["me"],
                headers={
                    **sky_headers,
                    "Accept": "application/vnd.userinfo.v2+json",
                    "Content-Type": "application/vnd.userinfo.v2+json",
                    "X-Sky-Signature": self.create_signature_header(
                        method="GET",
                        path="/auth/users/me",
                        sky_headers=sky_headers,
                        body="",
                        timestamp=int(time.time()),
                    ),
                },
            )
            # Without this an error status would never surface as HTTPError.
            response.raise_for_status()
        except requests.HTTPError:
            return False
        return True