sunnxt service

This commit is contained in:
MrHulk 2025-04-24 10:30:39 +05:30 committed by GitHub
parent d8b223b184
commit 3eab5807a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 360 additions and 57 deletions

View File

@ -15,6 +15,7 @@ from vinetrimmer.services.hotstar import Hotstar
from vinetrimmer.services.jio import Jio
from vinetrimmer.services.moviesanywhere import MoviesAnywhere
from vinetrimmer.services.sonyliv import Sonyliv
from vinetrimmer.services.sunnxt import Sunnxt
from vinetrimmer.services.disneyplus import DisneyPlus
from vinetrimmer.services.hulu import Hulu
from vinetrimmer.services.paramountplus import ParamountPlus

View File

@ -0,0 +1,302 @@
import base64
import os
from pathlib import Path
import click
import hashlib
import json
import re
import requests
from Crypto.Cipher import AES
from Cryptodome.Util import Padding
import uuid
from langcodes import Language
from vinetrimmer.objects import Title, Tracks
from vinetrimmer.objects.tracks import TextTrack
from vinetrimmer.services.BaseService import BaseService
class Sunnxt(BaseService):
"""
Service Code for Sunnxt Streaming Service (https://www.sunnxt.com)
### Authorization
- Requires Login
### Security
- Supports UHD,FHD @ L3.
### Tips
- The content library can be browsed without an account at: https://www.sunnxt.com
Made by: MrHulk
"""
ALIASES = ["SNXT", "SUNNXT"]
GEOFENCE = [""]
TITLE_RE = r"https:\/\/www\.sunnxt\.com\/(?P<slug>[a-zA-Z0-9\-]+)\/(?P<type>[a-zA-Z]+)\/(?P<id>[0-9]+)"
@staticmethod
@click.command(name="Sunnxt", short_help="https://www.sunnxt.com")
@click.argument("url", type=str)
@click.option("--login", is_flag=True, default=False, help="Login to get Token")
@click.option("-nt", "--notitle", is_flag=True, default=False, help="Don't grab episode title...")
@click.pass_context
def cli(ctx, **kwargs):
return Sunnxt(ctx, **kwargs)
def __init__(self, ctx, url: str, login: bool, notitle: bool):
super().__init__(ctx)
m = self.parse_title(ctx, url)
self.slug = m.get("slug")
self.type = m.get("type")
self.id = m.get("id")
self.login = login
self.notitle = notitle
self.licenseUrl = None
self.token_cache_path = Path(self.get_cache("token.json"))
if self.login:
self._login()
if self.token_cache_path.is_file():
try:
with open(self.token_cache_path, "r", encoding="utf-8") as file:
data = json.load(file)
self.device_id = data.get("device_id")
self.client_key = data.get("client_key")
self.secret_key = (
self.config["secret_key"][-4:] + self.device_id[-8:] + self.config["secret_key"][:4]
)
except (json.JSONDecodeError, KeyError) as e:
self.log.error(f"Error reading token file: {e}")
self.log.exit("Token file is invalid or corrupted. Please log in again using the --login command.")
else:
self.log.exit("No valid token found. Please log in using the --login command.")
self.configure()
def get_titles(self):
self.log.info(f"+ Content Id: {self.id}")
res = self.session.get(self.config["endpoints"]["contentDetail"].format(titleid=self.id))
data = self.is_valid(res, "Content")
if data["results"][0]["generalInfo"]["type"] in ["movie", "musicvideo"]:
return [
Title(
id_=self.id,
type_=Title.Types.MOVIE,
name=data["results"][0]["generalInfo"]["title"],
year=data["results"][0]["content"]["releaseDate"][:4],
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
service_data=data,
)
]
elif data["results"][0]["generalInfo"]["type"] in ["vodchannel", "vod"]:
episodes = []
index = 1
while index < 50:
tv_res = self.session.get(
self.config["endpoints"]["tv_content"].format(_id=self.id, index=index)
)
tv_res = self.is_valid(tv_res, "tv content")
if tv_res["results"] == []:
break
for episode in tv_res["results"]:
ep_number = self.get_episode_number(episode["generalInfo"]["displayTitle"])
if ep_number is None:
continue
episodes.append(
Title(
id_=episode["_id"],
type_=Title.Types.TV,
name=episode["globalServiceName"],
season=1, # TODO
episode=self.get_episode_number(episode["generalInfo"]["displayTitle"]),
episode_name=None if self.notitle else episode["generalInfo"]["displayTitle"],
year=None,
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
)
)
index += 1
return episodes
elif data["results"][0]["generalInfo"]["type"] == "videoalbum":
videos = []
video_res = self.session.get(
self.config["endpoints"]["tv_content"].format(_id=self.id, index=1)
)
video_res = self.is_valid(video_res, "Video res")
for video in video_res["results"]:
if video["_id"] is None:
continue
videos.append(
Title(
id_=video["_id"],
type_=Title.Types.MOVIE,
name=video["title"],
year=video["releaseDate"][:4],
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
service_data=data,
)
)
return videos
def get_tracks(self, title):
tracks = Tracks()
res = self.session.get(
url=self.config["endpoints"]["playback"].format(titleid=title.id),
headers={
"user-agent": self.config["UA"],
"clientKey": self.client_key,
},
)
data = self.decrypt(res.json()["response"], self.secret_key)
if data["status"] != "SUCCESS":
self.log.exit(f" - Got error: {data['message']}")
if data["results"][0]["videos"]["status"] != "SUCCESS":
self.log.exit(f" - Got error: {data['results'][0]['videos']['message']}")
for value in data["results"][0]["videos"]["values"]:
if not value["link"].startswith("https"):
continue
if "dash-cenc" in value["format"]:
self.licenseUrl = value["licenseUrl"]
tracks.add(
Tracks.from_mpd(url=value["link"], session=self.session, source=self.ALIASES[0]),
warn_only=True,
)
if "subtitles" in data["results"][0] and len(data["results"][0]["subtitles"]["values"]) > 0:
tracks.add(
TextTrack(
id_=hashlib.md5(data["results"][0]["subtitles"]["values"][0]["link_sub"].encode()).hexdigest(),
url=data["results"][0]["subtitles"]["values"][0]["link_sub"] + ".vtt",
codec="vtt",
language=Language.find(data["results"][0]["subtitles"]["values"][0]["language"]).to_alpha3(),
source=self.ALIASES[0],
)
)
return tracks
def get_chapters(self, title):
return []
def certificate(self, challenge, **_):
return self.license(challenge)
def license(self, challenge, **_):
return self.session.post(url=self.licenseUrl, data=challenge).content
def configure(self):
self.session.headers.update(
{
"contentlanguage": "tamil,telugu,malayalam,kannada,hindi,bengali,marathi",
"origin": "https://www.sunnxt.com",
"referer": "https://www.sunnxt.com/",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"x-myplex-maturity-level": "",
"x-myplex-platform": "AndroidTV",
}
)
def decrypt(self, data: str, secret_key: bytes):
cipher = AES.new(secret_key.encode("utf-8"), AES.MODE_CBC, iv=bytes([0] * 16))
return json.loads(Padding.unpad(cipher.decrypt(base64.b64decode(data)), 16).decode())
def encrypt(self, data: dict) -> str:
cipher = AES.new(self.config["secret_key"].encode("utf-8"), AES.MODE_CBC, iv=bytes([0] * 16))
encrypted = cipher.encrypt(
Padding.pad(json.dumps(data, separators=(",", ":")).encode(), 16)
)
return base64.b64encode(encrypted).decode()
def get_episode_number(self, title):
match = re.search(r"\b(?:EP?-?)(\d+)", title, re.IGNORECASE)
if match:
return int(match.group(1))
return None
def is_valid(self, res, stage):
try:
data = res.json()
except ValueError as e:
self.log.exit(f"Failed to get {stage} response. {e}")
if data.get("status") != "SUCCESS":
self.log.exit(f" - Got error: status - {data.get('status')}. msg - {data.get('message')}")
return data
def _login(self):
reg_data = {
"serialNo": str(uuid.uuid4()),
"os": "AndroidSony",
"osVersion": "9",
"make": "NVIDIA",
"model": "SHIELD Android TV",
"resolution": "3840x2160",
"profile": "work",
"deviceType": "Android",
"clientSecret": self.config["client_secret"],
}
self.log.info("Registering device...")
reg_res = requests.post(
self.config["endpoints"]["register_device_url"],
data={"payload": self.encrypt(reg_data), "version": 1},
headers={
"User-Agent": self.config["UA"],
"X-myplex-platform": "AndroidTV",
"ContentLanguage": "telugu",
"Accept-Language": "en",
},
)
reg_data_decrypted = self.decrypt(reg_res.json().get("response", {}), secret_key=self.config["secret_key"])
client_key = reg_data_decrypted.get("clientKey")
device_id = reg_data_decrypted.get("deviceId")
if not client_key or not device_id:
self.log.error("Failed to get client key or device ID from registration response.")
return
os.makedirs(os.path.dirname(self.token_cache_path), exist_ok=True)
with open(self.token_cache_path, "w", encoding="utf-8") as f:
json.dump({"client_key": client_key, "device_id": device_id}, f)
self.log.info(f"Successfully saved client key to: {self.token_cache_path}")
self.session.headers["clientKey"] = client_key
self.log.info("Fetching pairing code...")
pairing_resp = self.session.get(self.config["endpoints"]["code_url"])
pairing_resp.raise_for_status()
pairing_data = pairing_resp.json().get("results", {})
confirmation_url = pairing_data.get("confirmation_url")
auth_code = pairing_data.get("auth_code")
if not confirmation_url or not auth_code:
self.log.exit("Failed to obtain pairing code or confirmation URL.")
self.log.info(f"Go to https://www.{confirmation_url} and enter: {auth_code}")
input("Press Enter after completing the authentication...")
self.log.info("Linking device...")
link_resp = self.session.post(
self.config["endpoints"]["link_url"], data={"device_code": pairing_data.get("device_code")}
)
link_resp.raise_for_status()
self.log.info(f"Device linked successfully: {link_resp.json()}")