sp4rky-devine-services/services/PCOK/__init__.py
2025-04-09 15:07:35 -06:00

471 lines
18 KiB
Python

import base64
import hashlib
import hmac
import json
import os
import re
import time
from datetime import datetime
import click
import requests
from pathlib import Path
from click import Context
from typing import Any, Optional, Union
from devine.core.config import config
from devine.core.tracks.subtitle import Subtitle
from devine.core.credential import Credential
from devine.core.manifests import DASH, HLS
from devine.core.service import Service
from devine.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
from devine.core.tracks import Chapter, Tracks, Audio, Video
class PCOK(Service):
"""
Service code for NBC's Peacock streaming service (https://peacocktv.com).
\b
Authorization: Cookies
Security: UHD@-- FHD@L3, doesn't care about releases.
\b
Tips: - The library of contents can be viewed without logging in at https://www.peacocktv.com/stream/tv
See the footer for links to movies, news, etc. A US IP is required to view.
"""
ALIASES = ["PCOK", "peacock"]
GEOFENCE = ["us"]
TITLE_RE = [
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>movies/[a-z0-9/./-]+/[a-f0-9-]+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>tv/[a-z0-9-/.]+/\d+)",
r"(?:https?://(?:www\.)?peacocktv\.com/watch/asset/|/?)(?P<id>-/[a-z0-9-/.]+/\d+)",
r"(?:https?://(?:www\.)?peacocktv\.com/stream-tv/)?(?P<id>[a-z0-9-/.]+)",
]
@staticmethod
@click.command(name="PCOK", short_help="https://peacocktv.com")
@click.argument("title", type=str, required=False)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.")
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> "PCOK":
return PCOK(ctx, **kwargs)
def __init__(self, ctx, title, movie):
super().__init__(ctx)
self.title = title
self.movie = movie
self.profile = ctx.obj.profile
self.service_config = None
self.hmac_key = None
self.tokens = None
self.license_api = None
self.license_bt = None
self.vcodec = ctx.parent.params["vcodec"]
self.range = {"SDR": "SDR", "HDR10": "HDR10", "DV": "DOLBYVISION"}.get(ctx.parent.params["range_"][0])
self.configure()
title = self.title.split("/")[-1]
def get_titles(self) -> Union[Movies, Series]:
# Title is a slug, e.g. `/tv/the-office/4902514835143843112` or just `the-office`
if "/" not in self.title:
r = self.session.get(self.config["endpoints"]["stream_tv"].format(title_id=self.title))
self.title = self.find("/watch/asset(/[^']+)", r.text)
if not self.title:
raise self.log.error(" - Title ID not found or invalid")
if not self.title.startswith("/"):
self.title = f"/{self.title}"
if self.title.startswith("/movies/"):
self.movie = True
if self.title.startswith("/stream-tv"):
self.tv = True
res = self.session.get(
url=self.config["endpoints"]["node"],
params={"slug": self.title, "represent": "(items(items))"},
headers={
"Accept": "*",
"Referer": f"https://www.peacocktv.com/watch/asset{self.title}",
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-Language": "en",
},
).json()
if self.movie:
return Movies(
[
Movie(
id_=self.title,
service=self.__class__,
name=res["attributes"]["title"],
year=res["attributes"]["year"],
data=res,
)
]
)
else:
titles = []
for season in res["relationships"]["items"]["data"]:
for episode in season["relationships"]["items"]["data"]:
titles.append(episode)
titles = []
for season in res["relationships"]["items"]["data"]:
for episode in season["relationships"]["items"]["data"]:
titles.append(
Episode(
id_=self.title,
service=self.__class__,
title=res["attributes"]["title"],
year=episode["attributes"].get("year"),
season=episode["attributes"].get("seasonNumber"),
number=episode["attributes"].get("episodeNumber"),
name=episode["attributes"].get("title"),
data=episode,
)
)
return Series(titles)
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
tracks = Tracks()
supported_colour_spaces = Video.Range.HDR10
if self.range == "HDR10":
self.log.info("Switched dynamic range to HDR10")
supported_colour_spaces = ["HDR10"]
if self.range == "DV":
self.log.info("Switched dynamic range to DV")
supported_colour_spaces = ["DolbyVision"]
content_id = title.data["attributes"]["formats"]["HD"]["contentId"]
variant_id = title.data["attributes"]["providerVariantId"]
sky_headers = {
# order of these matter!
"X-SkyOTT-Agent": ".".join(
[
self.config["client"]["proposition"].lower(),
self.config["client"]["device"].lower(),
self.config["client"]["platform"].lower(),
]
),
"X-SkyOTT-PinOverride": "false",
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-UserToken": self.tokens["userToken"],
}
body = json.dumps(
{
"device": {
# maybe get these from the config endpoint?
"capabilities": [
{
"protection": "WIDEVINE",
"container": "ISOBMFF",
"transport": "DASH",
"acodec": "AAC",
"vcodec": "H265",
},
{
"protection": "NONE",
"container": "ISOBMFF",
"transport": "DASH",
"acodec": "AAC",
"vcodec": "H265",
},
],
"maxVideoFormat": "UHD",
"supportedColourSpaces": supported_colour_spaces,
"model": self.config["client"]["platform"],
"hdcpEnabled": "true",
},
"client": {
"thirdParties": ["FREEWHEEL", "YOSPACE"] # CONVIVA
},
"contentId": content_id,
"providerVariantId": variant_id,
"parentalControlPin": "null",
},
separators=(",", ":"),
)
manifest = self.session.post(
url=self.config["endpoints"]["vod"],
data=body,
headers=dict(
**sky_headers,
**{
"Accept": "application/vnd.playvod.v1+json",
"Content-Type": "application/vnd.playvod.v1+json",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/video/playouts/vod",
sky_headers=sky_headers,
body=body,
timestamp=int(time.time()),
),
},
),
).json()
if "errorCode" in manifest:
raise self.log.error(f" - An error occurred: {manifest['description']} [{manifest['errorCode']}]")
self.license_api = manifest["protection"]["licenceAcquisitionUrl"]
self.license_bt = manifest["protection"]["licenceToken"]
tracks = DASH.from_url(url=manifest["asset"]["endpoints"][0]["url"], session=self.session).to_tracks(
language="en"
)
if manifest["asset"]["format"]["colourSpace"] == 'HDR10':
for track in tracks:
track.range = Video.Range.HDR10
elif manifest["asset"]["format"]["colourSpace"] == ["DolbyVision"]:
track.range = Video.Range.DV
# for track in tracks.videos:
# if isinstance(track, Video):
# if 'UHDDV' in url:
# track.range_ = Video.Range.HDR10
for track in tracks:
track.needs_proxy = True
for track in tracks.audio:
if track.language.territory == "AD":
# This is supposed to be Audio Description, not Andorra
track.language.territory = None
return tracks
def get_chapters(self, title):
return []
def get_widevine_service_certificate(self, *, challenge, title, track):
return super().get_widevine_service_certificate(challenge=challenge, title=title, track=track)
def get_widevine_license(self, title, challenge, **_) -> bytes:
return self.session.post(
url=self.license_api,
headers={
"Accept": "*",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/" + self.license_api.split("://", 1)[1].split("/", 1)[1],
sky_headers={},
body="",
timestamp=int(time.time()),
),
},
data=challenge, # expects bytes
).content
# Service specific functions
def configure(self):
self.session.headers.update({"Origin": "https://www.peacocktv.com"})
self.log.info("Getting Peacock Client configuration")
if self.config["client"]["platform"] != "PC":
self.service_config = self.session.get(
url=self.config["endpoints"]["config"].format(
territory=self.config["client"]["territory"],
provider=self.config["client"]["provider"],
proposition=self.config["client"]["proposition"],
device=self.config["client"]["platform"],
version=self.config["client"]["config_version"],
)
).json()
self.hmac_key = bytes(self.config["security"]["signature_hmac_key_v4"], "utf-8")
self.log.info("Getting Authorization Tokens")
self.tokens = self.get_tokens()
self.log.info("Verifying Authorization Tokens")
if not self.verify_tokens():
raise self.log.error(" - Failed! Cookies might be outdated.")
@staticmethod
def calculate_sky_header_md5(headers):
if len(headers.items()) > 0:
headers_str = "\n".join(f"{x[0].lower()}: {x[1]}" for x in headers.items()) + "\n"
else:
headers_str = "{}"
return str(hashlib.md5(headers_str.encode()).hexdigest())
@staticmethod
def calculate_body_md5(body):
return str(hashlib.md5(body.encode()).hexdigest())
def calculate_signature(self, msg):
digest = hmac.new(self.hmac_key, bytes(msg, "utf-8"), hashlib.sha1).digest()
return str(base64.b64encode(digest), "utf-8")
def create_signature_header(self, method, path, sky_headers, body, timestamp):
data = (
"\n".join(
[
method.upper(),
path,
"", # important!
self.config["client"]["client_sdk"],
"1.0",
self.calculate_sky_header_md5(sky_headers),
str(timestamp),
self.calculate_body_md5(body),
]
)
+ "\n"
)
signature_hmac = self.calculate_signature(data)
return self.config["security"]["signature_format"].format(
client=self.config["client"]["client_sdk"], signature=signature_hmac, timestamp=timestamp
)
def get_tokens(self):
# Try to get cached tokens
cache_path = Path(
config.directories.cache
/ self.__class__.__name__
/ f"token.json".format(profile=self.profile, id=self.config["client"]["id"])
)
if os.path.isfile(cache_path):
with open(cache_path, encoding="utf-8") as fd:
tokens = json.load(fd)
tokens_expiration = tokens.get("tokenExpiryTime", None)
if tokens_expiration and datetime.strptime(tokens_expiration, "%Y-%m-%dT%H:%M:%S.%fZ") > datetime.now():
return tokens
# Get all SkyOTT headers
sky_headers = {
# Order of these matters!
"X-SkyOTT-Agent": ".".join(
[
self.config["client"]["proposition"],
self.config["client"]["device"],
self.config["client"]["platform"],
]
).lower(),
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
}
try:
# Call personas endpoint to get the accounts personaId
personas = self.session.get(
url=self.config["endpoints"]["personas"],
headers=dict(
**sky_headers,
**{
"Accept": "application/vnd.persona.v1+json",
"Content-Type": "application/vnd.persona.v1+json",
"X-SkyOTT-TokenType": self.config["client"]["auth_scheme"],
},
),
).json()
except requests.HTTPError as e:
error = e.response.json()
if "message" in error and "code" in error:
error = f"{error['message']} [{error['code']}]"
if "bad credentials" in error.lower():
error += ". Cookies may be expired or invalid."
raise self.log.exit(f" - Unable to get persona ID: {error}")
raise self.log.exit(f" - HTTP Error {e.response.status_code}: {e.response.reason}")
persona = personas["personas"][0]["personaId"]
# Craft the body data that will be sent to the tokens endpoint, being minified and order matters!
body = json.dumps(
{
"auth": {
"authScheme": self.config["client"]["auth_scheme"],
"authIssuer": self.config["client"]["auth_issuer"],
"provider": self.config["client"]["provider"],
"providerTerritory": self.config["client"]["territory"],
"proposition": self.config["client"]["proposition"],
"personaId": persona,
},
"device": {
"type": self.config["client"]["device"],
"platform": self.config["client"]["platform"],
"id": self.config["client"]["id"],
"drmDeviceId": self.config["client"]["drm_device_id"],
},
},
separators=(",", ":"),
)
# Get the tokens
tokens = self.session.post(
url=self.config["endpoints"]["tokens"],
headers=dict(
**sky_headers,
**{
"Accept": "application/vnd.tokens.v1+json",
"Content-Type": "application/vnd.tokens.v1+json",
"X-Sky-Signature": self.create_signature_header(
method="POST",
path="/auth/tokens",
sky_headers=sky_headers,
body=body,
timestamp=int(time.time()),
),
},
),
data=body,
).json()
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as fd:
json.dump(tokens, fd)
return tokens
def verify_tokens(self):
"""Verify the tokens by calling the /auth/users/me endpoint and seeing if it works"""
sky_headers = {
# order of these matter!
"X-SkyOTT-Device": self.config["client"]["device"],
"X-SkyOTT-Platform": self.config["client"]["platform"],
"X-SkyOTT-Proposition": self.config["client"]["proposition"],
"X-SkyOTT-Provider": self.config["client"]["provider"],
"X-SkyOTT-Territory": self.config["client"]["territory"],
"X-SkyOTT-UserToken": self.tokens["userToken"],
}
try:
self.session.get(
url=self.config["endpoints"]["me"],
headers=dict(
**sky_headers,
**{
"Accept": "application/vnd.userinfo.v2+json",
"Content-Type": "application/vnd.userinfo.v2+json",
"X-Sky-Signature": self.create_signature_header(
method="GET",
path="/auth/users/me",
sky_headers=sky_headers,
body="",
timestamp=int(time.time()),
),
},
),
)
except requests.HTTPError:
return False
else:
return True