Unshackle-Services/PCOK/__init__.py
2026-01-03 17:53:34 +02:00

455 lines
17 KiB
Python

import base64
import hashlib
import hmac
import json
import time
from datetime import datetime
from http.cookiejar import CookieJar
from typing import Optional
import click
from langcodes import Language
from pyplayready.cdm import Cdm as PlayReadyCdm
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.manifests import DASH
from unshackle.core.service import Service
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from unshackle.core.tracks import Chapters, Tracks, Video
class PCOK(Service):
"""
Service code for NBC's Peacock streaming service (https://peacocktv.com).
Version: 1.0.0
Authorization: Cookies
Security: UHD@-- FHD@SL*
Tips: - The library of contents can be viewed without logging in at https://www.peacocktv.com/stream/tv
See the footer for links to movies, news, etc. A US IP is required to view.
"""
ALIASES = ("PCOK", "peacock")
GEOFENCE = ("US",)
TITLE_RE = [
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>movies/[a-z0-9/./-]+/[a-f0-9-]+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>tv/[a-z0-9/./-]+/[a-f0-9-]+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>tv/[a-z0-9-/.]+/\d+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>news/[a-z0-9/./-]+/[a-f0-9-]+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>news/[a-z0-9-/.]+/\d+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>-/[a-z0-9-/.]+/\d+)",
r"(?:https?://(?:www\.)?peacocktv\.com/stream-tv/)?(?P<id>[a-z0-9-/.]+)",
]
@staticmethod
@click.command(name="PCOK", short_help="https://peacocktv.com")
@click.argument("title", type=str)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.")
@click.pass_context
def cli(ctx, **kwargs):
return PCOK(ctx, **kwargs)
def __init__(self, ctx, title, movie):
super().__init__(ctx)
self.title = title
self.movie = movie
self.cdm = ctx.obj.cdm
if not isinstance(self.cdm, PlayReadyCdm):
self.log.warning("PlayReady CDM not provided, exiting")
raise SystemExit(1)
range_param = ctx.parent.params.get("range_")
self.range = range_param[0].name if range_param else "SDR"
vcodec_param = ctx.parent.params.get("vcodec")
self.vcodec = vcodec_param if vcodec_param else "H264"
if self.config is None:
raise Exception("Config is missing!")
profile_name = ctx.parent.params.get("profile")
if profile_name is None:
profile_name = "default"
self.profile = profile_name
self.hmac_key = None
self.tokens = None
self.license_api = None
self.license_bt = None
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
if not cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.session.headers.update({"Origin": "https://www.peacocktv.com"})
self.log.info("Getting Peacock Client configuration")
if self.config["client"]["platform"] != "PC":
self.service_config = self.session.get(
url=self.config["endpoints"]["config"].format(
territory=self.config["client"]["territory"],
provider=self.config["client"]["provider"],
proposition=self.config["client"]["proposition"],
device=self.config["client"]["platform"],
version=self.config["client"]["config_version"],
)
).json()
self.hmac_key = bytes(self.config["security"]["signature_hmac_key_v4"], "utf-8")
self.log.info("Getting Authorization Tokens")
self.tokens = self.get_tokens()
self.log.info("Verifying Authorization Tokens")
if not self.verify_tokens():
raise EnvironmentError("Failed! Cookies might be outdated.")
def get_titles(self) -> Titles_T:
# Parse title from various URL formats
import re
title_id = self.title
for pattern in self.TITLE_RE:
match = re.search(pattern, self.title)
if match:
title_id = match.group("id")
break
# Handle stream-tv redirects
if "/" not in title_id:
r = self.session.get(self.config["endpoints"]["stream_tv"].format(title_id=title_id))
match = re.search(r"/watch/asset(/[^']+)", r.text)
if match:
title_id = match.group(1)
else:
raise ValueError("Title ID not found or invalid")
if not title_id.startswith("/"):
title_id = f"/{title_id}"
if title_id.startswith("/movies/"):
self.movie = True
res = self.session.get(
url=self.config["endpoints"]["node"],
params={
"slug": title_id,
"represent": "(items(items))"
},
headers={
"Accept": "*",
"Referer": f"https://www.peacocktv.com/watch/asset{title_id}",
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-Language": "en"
}
).json()
if self.movie:
return Movies([
Movie(
id_=title_id,
service=self.__class__,
name=res["attributes"]["title"],
year=res["attributes"]["year"],
data=res,
)
])
else:
episodes = []
for season in res["relationships"]["items"]["data"]:
for episode in season["relationships"]["items"]["data"]:
episodes.append(episode)
episode_titles = []
for x in episodes:
episode_titles.append(
Episode(
id_=title_id,
service=self.__class__,
title=res["attributes"]["title"],
season=x["attributes"].get("seasonNumber"),
number=x["attributes"].get("episodeNumber"),
name=x["attributes"].get("title"),
year=x["attributes"].get("year"),
data=x
)
)
return Series(episode_titles)
def get_tracks(self, title: Title_T) -> Tracks:
supported_colour_spaces = ["SDR"]
if self.range == "HDR10":
self.log.info("Switched dynamic range to HDR10")
supported_colour_spaces = ["HDR10"]
elif self.range == "DV":
self.log.info("Switched dynamic range to DV")
supported_colour_spaces = ["DolbyVision"]
content_id = title.data["attributes"]["formats"]["HD"]["contentId"]
variant_id = title.data["attributes"]["providerVariantId"]
sky_headers = {
"X-SkyOTT-Agent": ".".join([
self.config["client"]["proposition"].lower(),
self.config["client"]["device"].lower(),
self.config["client"]["platform"].lower()
]),
"X-SkyOTT-PinOverride": "false",
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-UserToken": self.tokens["userToken"]
}
body = json.dumps({
"device": {
"capabilities": [
{
"protection": "PLAYREADY",
"container": "ISOBMFF",
"transport": "DASH",
"acodec": "AAC",
"vcodec": "H265" if self.vcodec == "H265" else "H264",
},
{
"protection": "PLAYREADY",
"container": "ISOBMFF",
"transport": "DASH",
"acodec": "AAC",
"vcodec": "H265" if self.vcodec == "H265" else "H264",
}
],
"maxVideoFormat": "UHD" if self.vcodec == "H265" else "HD",
"supportedColourSpaces": supported_colour_spaces,
"model": self.config["client"]["platform"],
"hdcpEnabled": "true"
},
"client": {
"thirdParties": ["FREEWHEEL", "YOSPACE"]
},
"contentId": content_id,
"providerVariantId": variant_id,
"parentalControlPin": "null"
}, separators=(",", ":"))
manifest = self.session.post(
url=self.config["endpoints"]["vod"],
data=body,
headers=dict(**sky_headers, **{
"Accept": "application/vnd.playvod.v1+json",
"Content-Type": "application/vnd.playvod.v1+json",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/video/playouts/vod",
sky_headers=sky_headers,
body=body,
timestamp=int(time.time())
)
})
).json()
if "errorCode" in manifest:
raise ValueError(f"An error occurred: {manifest['description']} [{manifest['errorCode']}]")
self.license_api = manifest["protection"]["licenceAcquisitionUrl"]
self.license_bt = manifest["protection"]["licenceToken"]
tracks = DASH.from_url(
url=manifest["asset"]["endpoints"][0]["url"],
session=self.session
).to_tracks(language=Language.get("en"))
# Set HDR attributes
for video in tracks.videos:
if supported_colour_spaces == ["HDR10"]:
video.range = Video.Range.HDR10
elif supported_colour_spaces == ["DolbyVision"]:
video.range = Video.Range.DV
else:
video.range = Video.Range.SDR
# Fix audio description language
for track in tracks.audio:
if track.language.territory == "AD":
track.language.territory = None
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
"""Get chapters for the title. Peacock doesn't typically provide chapter data."""
return Chapters([])
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
"""Retrieve a PlayReady license for a given track."""
if not self.license_api:
return None
response = self.session.post(
url=self.license_api,
headers={
"Accept": "*",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/" + self.license_api.split("://", 2)[1].split("/", 1)[1],
sky_headers={},
body="",
timestamp=int(time.time())
)
},
data=challenge
)
response.raise_for_status()
return response.content
@staticmethod
def calculate_sky_header_md5(headers):
if len(headers.items()) > 0:
headers_str = "\n".join(f"{x[0].lower()}: {x[1]}" for x in headers.items()) + "\n"
else:
headers_str = "{}"
return str(hashlib.md5(headers_str.encode()).hexdigest())
@staticmethod
def calculate_body_md5(body):
return str(hashlib.md5(body.encode()).hexdigest())
def calculate_signature(self, msg):
digest = hmac.new(self.hmac_key, bytes(msg, "utf-8"), hashlib.sha1).digest()
return str(base64.b64encode(digest), "utf-8")
def create_signature_header(self, method, path, sky_headers, body, timestamp):
data = "\n".join([
method.upper(),
path,
"",
self.config["client"]["client_sdk"],
"1.0",
self.calculate_sky_header_md5(sky_headers),
str(timestamp),
self.calculate_body_md5(body)
]) + "\n"
signature_hmac = self.calculate_signature(data)
return self.config["security"]["signature_format"].format(
client=self.config["client"]["client_sdk"],
signature=signature_hmac,
timestamp=timestamp
)
def get_tokens(self):
# Try to get cached tokens
cache = self.cache.get(f"tokens_{self.profile}_{self.config['client']['id']}")
if cache and cache.data.get("tokenExpiryTime"):
tokens_expiration = cache.data.get("tokenExpiryTime")
if datetime.strptime(tokens_expiration, "%Y-%m-%dT%H:%M:%S.%fZ") > datetime.now():
return cache.data
# Get all SkyOTT headers
sky_headers = {
"X-SkyOTT-Agent": ".".join([
self.config["client"]["proposition"],
self.config["client"]["device"],
self.config["client"]["platform"]
]).lower(),
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"]
}
try:
# Call personas endpoint to get the accounts personaId
personas = self.session.get(
url=self.config["endpoints"]["personas"],
headers=dict(**sky_headers, **{
"Accept": "application/vnd.persona.v1+json",
"Content-Type": "application/vnd.persona.v1+json",
"X-SkyOTT-TokenType": self.config["client"]["auth_scheme"]
})
).json()
except Exception as e:
raise EnvironmentError(f"Unable to get persona ID: {e}")
persona = personas["personas"][0]["personaId"]
# Craft the body data
body = json.dumps({
"auth": {
"authScheme": self.config["client"]["auth_scheme"],
"authIssuer": self.config["client"]["auth_issuer"],
"provider": self.config["client"]["provider"],
"providerTerritory": self.config["client"]["territory"],
"proposition": self.config["client"]["proposition"],
"personaId": persona
},
"device": {
"type": self.config["client"]["device"],
"platform": self.config["client"]["platform"],
"id": self.config["client"]["id"],
"drmDeviceId": self.config["client"]["drm_device_id"]
}
}, separators=(",", ":"))
# Get the tokens
tokens = self.session.post(
url=self.config["endpoints"]["tokens"],
headers=dict(**sky_headers, **{
"Accept": "application/vnd.tokens.v1+json",
"Content-Type": "application/vnd.tokens.v1+json",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/auth/tokens",
sky_headers=sky_headers,
body=body,
timestamp=int(time.time())
)
}),
data=body
).json()
# Cache the tokens
if not cache:
cache = self.cache.get(f"tokens_{self.profile}_{self.config['client']['id']}")
cache.set(data=tokens)
return tokens
def verify_tokens(self):
"""Verify the tokens by calling the /auth/users/me endpoint"""
sky_headers = {
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-UserToken": self.tokens["userToken"]
}
try:
self.session.get(
url=self.config["endpoints"]["me"],
headers=dict(**sky_headers, **{
"Accept": "application/vnd.userinfo.v2+json",
"Content-Type": "application/vnd.userinfo.v2+json",
"X-Sky-Signature": self.create_signature_header(
method="GET",
path="/auth/users/me",
sky_headers=sky_headers,
body="",
timestamp=int(time.time())
)
})
)
return True
except Exception:
return False