Merge pull request #39 from MrHulk02/dev

SUNNXT service by MrHulk02
This commit is contained in:
chu23465 2025-04-24 10:45:07 +05:30 committed by GitHub
commit f42208cce0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 373 additions and 57 deletions

View File

@ -0,0 +1,13 @@
secret_key: A3s68aORSgHs$71P
client_secret: apalyaSonyTV
UA: okhttp/2.5.0
endpoints:
contentDetail: https://pwaapi.sunnxt.com/content/v3/contentDetail/{titleid}/?level=devicemax&fields=contents,user/currentdata,generalInfo,subtitles,globalServiceName,globalServiceId
playback: https://api.sunnxt.com/content/v3/media/{titleid}/
tv_content: https://pwaapi.sunnxt.com/content/v3/vods/GID_{_id}/?fields=contents,generalInfo,globalServiceName&startIndex={index}&count=100
#login
register_device_url: https://api.sunnxt.com/user/v7/registerDevice
code_url: https://api.sunnxt.com/user/v4/device/code
link_url: https://api.sunnxt.com/user/v4/device/validate

View File

@ -1,57 +1,58 @@
import os import os
import re import re
from copy import copy from copy import copy
from vinetrimmer.services.BaseService import BaseService from vinetrimmer.services.BaseService import BaseService
SERVICE_MAP = {} SERVICE_MAP = {}
from vinetrimmer.services.amazon import Amazon from vinetrimmer.services.amazon import Amazon
from vinetrimmer.services.appletvplus import AppleTVPlus from vinetrimmer.services.appletvplus import AppleTVPlus
from vinetrimmer.services.max import Max from vinetrimmer.services.max import Max
from vinetrimmer.services.netflix import Netflix from vinetrimmer.services.netflix import Netflix
from vinetrimmer.services.peacock import Peacock from vinetrimmer.services.peacock import Peacock
from vinetrimmer.services.hotstar import Hotstar from vinetrimmer.services.hotstar import Hotstar
from vinetrimmer.services.jio import Jio from vinetrimmer.services.jio import Jio
from vinetrimmer.services.moviesanywhere import MoviesAnywhere from vinetrimmer.services.moviesanywhere import MoviesAnywhere
from vinetrimmer.services.sonyliv import Sonyliv from vinetrimmer.services.sonyliv import Sonyliv
from vinetrimmer.services.disneyplus import DisneyPlus from vinetrimmer.services.sunnxt import Sunnxt
from vinetrimmer.services.hulu import Hulu from vinetrimmer.services.disneyplus import DisneyPlus
from vinetrimmer.services.paramountplus import ParamountPlus from vinetrimmer.services.hulu import Hulu
from vinetrimmer.services.paramountplus import ParamountPlus
# Above is necessary since dynamic imports like below fuck up nuitak
# Above is necessary since dynamic imports like below fuck up nuitak
# Below dynamic imports fuck with compiling when using Nuitka - exec() call is the problem
#for service in os.listdir(os.path.dirname(__file__)): # Below dynamic imports fuck with compiling when using Nuitka - exec() call is the problem
# if service.startswith("_") or not service.endswith(".py"): #for service in os.listdir(os.path.dirname(__file__)):
# continue # if service.startswith("_") or not service.endswith(".py"):
# continue
# service = os.path.splitext(service)[0]
# service = os.path.splitext(service)[0]
# if service in ("__init__", "BaseService"):
# continue # if service in ("__init__", "BaseService"):
# continue
# with open(os.path.join(os.path.dirname(__file__), f"{service}.py"), encoding="utf-8") as fd:
# code = "" # with open(os.path.join(os.path.dirname(__file__), f"{service}.py"), encoding="utf-8") as fd:
# for line in fd.readlines(): # code = ""
# if re.match(r"\s*(?:import(?! click)|from)\s", line): # for line in fd.readlines():
# continue # if re.match(r"\s*(?:import(?! click)|from)\s", line):
# code += line # continue
# if re.match(r"\s*super\(\)\.__init__\(", line): # code += line
# break # if re.match(r"\s*super\(\)\.__init__\(", line):
# exec(code) # break
# exec(code)
for x in copy(globals()).values():
if isinstance(x, type) and issubclass(x, BaseService) and x != BaseService: for x in copy(globals()).values():
SERVICE_MAP[x.__name__] = x.ALIASES if isinstance(x, type) and issubclass(x, BaseService) and x != BaseService:
SERVICE_MAP[x.__name__] = x.ALIASES
def get_service_key(value):
""" def get_service_key(value):
Get the Service Key name (e.g. DisneyPlus, not dsnp, disney+, etc.) from the SERVICE_MAP. """
Input value can be of any case-sensitivity and can be either the key itself or an alias. Get the Service Key name (e.g. DisneyPlus, not dsnp, disney+, etc.) from the SERVICE_MAP.
""" Input value can be of any case-sensitivity and can be either the key itself or an alias.
value = value.lower() """
for key, aliases in SERVICE_MAP.items(): value = value.lower()
if value in map(str.lower, aliases) or value == key.lower(): for key, aliases in SERVICE_MAP.items():
return key if value in map(str.lower, aliases) or value == key.lower():
return key

View File

@ -0,0 +1,302 @@
import base64
import os
from pathlib import Path
import click
import hashlib
import json
import re
import requests
from Crypto.Cipher import AES
from Cryptodome.Util import Padding
import uuid
from langcodes import Language
from vinetrimmer.objects import Title, Tracks
from vinetrimmer.objects.tracks import TextTrack
from vinetrimmer.services.BaseService import BaseService
class Sunnxt(BaseService):
"""
Service Code for Sunnxt Streaming Service (https://www.sunnxt.com)
### Authorization
- Requires Login
### Security
- Supports UHD,FHD @ L3.
### Tips
- The content library can be browsed without an account at: https://www.sunnxt.com
Made by: MrHulk
"""
ALIASES = ["SNXT", "SUNNXT"]
GEOFENCE = [""]
TITLE_RE = r"https:\/\/www\.sunnxt\.com\/(?P<slug>[a-zA-Z0-9\-]+)\/(?P<type>[a-zA-Z]+)\/(?P<id>[0-9]+)"
@staticmethod
@click.command(name="Sunnxt", short_help="https://www.sunnxt.com")
@click.argument("url", type=str)
@click.option("--login", is_flag=True, default=False, help="Login to get Token")
@click.option("-nt", "--notitle", is_flag=True, default=False, help="Don't grab episode title...")
@click.pass_context
def cli(ctx, **kwargs):
return Sunnxt(ctx, **kwargs)
def __init__(self, ctx, url: str, login: bool, notitle: bool):
super().__init__(ctx)
m = self.parse_title(ctx, url)
self.slug = m.get("slug")
self.type = m.get("type")
self.id = m.get("id")
self.login = login
self.notitle = notitle
self.licenseUrl = None
self.token_cache_path = Path(self.get_cache("token.json"))
if self.login:
self._login()
if self.token_cache_path.is_file():
try:
with open(self.token_cache_path, "r", encoding="utf-8") as file:
data = json.load(file)
self.device_id = data.get("device_id")
self.client_key = data.get("client_key")
self.secret_key = (
self.config["secret_key"][-4:] + self.device_id[-8:] + self.config["secret_key"][:4]
)
except (json.JSONDecodeError, KeyError) as e:
self.log.error(f"Error reading token file: {e}")
self.log.exit("Token file is invalid or corrupted. Please log in again using the --login command.")
else:
self.log.exit("No valid token found. Please log in using the --login command.")
self.configure()
def get_titles(self):
self.log.info(f"+ Content Id: {self.id}")
res = self.session.get(self.config["endpoints"]["contentDetail"].format(titleid=self.id))
data = self.is_valid(res, "Content")
if data["results"][0]["generalInfo"]["type"] in ["movie", "musicvideo"]:
return [
Title(
id_=self.id,
type_=Title.Types.MOVIE,
name=data["results"][0]["generalInfo"]["title"],
year=data["results"][0]["content"]["releaseDate"][:4],
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
service_data=data,
)
]
elif data["results"][0]["generalInfo"]["type"] in ["vodchannel", "vod"]:
episodes = []
index = 1
while index < 50:
tv_res = self.session.get(
self.config["endpoints"]["tv_content"].format(_id=self.id, index=index)
)
tv_res = self.is_valid(tv_res, "tv content")
if tv_res["results"] == []:
break
for episode in tv_res["results"]:
ep_number = self.get_episode_number(episode["generalInfo"]["displayTitle"])
if ep_number is None:
continue
episodes.append(
Title(
id_=episode["_id"],
type_=Title.Types.TV,
name=episode["globalServiceName"],
season=1, # TODO
episode=self.get_episode_number(episode["generalInfo"]["displayTitle"]),
episode_name=None if self.notitle else episode["generalInfo"]["displayTitle"],
year=None,
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
)
)
index += 1
return episodes
elif data["results"][0]["generalInfo"]["type"] == "videoalbum":
videos = []
video_res = self.session.get(
self.config["endpoints"]["tv_content"].format(_id=self.id, index=1)
)
video_res = self.is_valid(video_res, "Video res")
for video in video_res["results"]:
if video["_id"] is None:
continue
videos.append(
Title(
id_=video["_id"],
type_=Title.Types.MOVIE,
name=video["title"],
year=video["releaseDate"][:4],
original_lang=Language.find(data["results"][0]["content"]["language"][0]).to_alpha3(),
source=self.ALIASES[0],
service_data=data,
)
)
return videos
def get_tracks(self, title):
tracks = Tracks()
res = self.session.get(
url=self.config["endpoints"]["playback"].format(titleid=title.id),
headers={
"user-agent": self.config["UA"],
"clientKey": self.client_key,
},
)
data = self.decrypt(res.json()["response"], self.secret_key)
if data["status"] != "SUCCESS":
self.log.exit(f" - Got error: {data['message']}")
if data["results"][0]["videos"]["status"] != "SUCCESS":
self.log.exit(f" - Got error: {data['results'][0]['videos']['message']}")
for value in data["results"][0]["videos"]["values"]:
if not value["link"].startswith("https"):
continue
if "dash-cenc" in value["format"]:
self.licenseUrl = value["licenseUrl"]
tracks.add(
Tracks.from_mpd(url=value["link"], session=self.session, source=self.ALIASES[0]),
warn_only=True,
)
if "subtitles" in data["results"][0] and len(data["results"][0]["subtitles"]["values"]) > 0:
tracks.add(
TextTrack(
id_=hashlib.md5(data["results"][0]["subtitles"]["values"][0]["link_sub"].encode()).hexdigest(),
url=data["results"][0]["subtitles"]["values"][0]["link_sub"] + ".vtt",
codec="vtt",
language=Language.find(data["results"][0]["subtitles"]["values"][0]["language"]).to_alpha3(),
source=self.ALIASES[0],
)
)
return tracks
def get_chapters(self, title):
return []
def certificate(self, challenge, **_):
return self.license(challenge)
def license(self, challenge, **_):
return self.session.post(url=self.licenseUrl, data=challenge).content
def configure(self):
self.session.headers.update(
{
"contentlanguage": "tamil,telugu,malayalam,kannada,hindi,bengali,marathi",
"origin": "https://www.sunnxt.com",
"referer": "https://www.sunnxt.com/",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"x-myplex-maturity-level": "",
"x-myplex-platform": "AndroidTV",
}
)
def decrypt(self, data: str, secret_key: bytes):
cipher = AES.new(secret_key.encode("utf-8"), AES.MODE_CBC, iv=bytes([0] * 16))
return json.loads(Padding.unpad(cipher.decrypt(base64.b64decode(data)), 16).decode())
def encrypt(self, data: dict) -> str:
cipher = AES.new(self.config["secret_key"].encode("utf-8"), AES.MODE_CBC, iv=bytes([0] * 16))
encrypted = cipher.encrypt(
Padding.pad(json.dumps(data, separators=(",", ":")).encode(), 16)
)
return base64.b64encode(encrypted).decode()
def get_episode_number(self, title):
match = re.search(r"\b(?:EP?-?)(\d+)", title, re.IGNORECASE)
if match:
return int(match.group(1))
return None
def is_valid(self, res, stage):
try:
data = res.json()
except ValueError as e:
self.log.exit(f"Failed to get {stage} response. {e}")
if data.get("status") != "SUCCESS":
self.log.exit(f" - Got error: status - {data.get('status')}. msg - {data.get('message')}")
return data
def _login(self):
reg_data = {
"serialNo": str(uuid.uuid4()),
"os": "AndroidSony",
"osVersion": "9",
"make": "NVIDIA",
"model": "SHIELD Android TV",
"resolution": "3840x2160",
"profile": "work",
"deviceType": "Android",
"clientSecret": self.config["client_secret"],
}
self.log.info("Registering device...")
reg_res = requests.post(
self.config["endpoints"]["register_device_url"],
data={"payload": self.encrypt(reg_data), "version": 1},
headers={
"User-Agent": self.config["UA"],
"X-myplex-platform": "AndroidTV",
"ContentLanguage": "telugu",
"Accept-Language": "en",
},
)
reg_data_decrypted = self.decrypt(reg_res.json().get("response", {}), secret_key=self.config["secret_key"])
client_key = reg_data_decrypted.get("clientKey")
device_id = reg_data_decrypted.get("deviceId")
if not client_key or not device_id:
self.log.error("Failed to get client key or device ID from registration response.")
return
os.makedirs(os.path.dirname(self.token_cache_path), exist_ok=True)
with open(self.token_cache_path, "w", encoding="utf-8") as f:
json.dump({"client_key": client_key, "device_id": device_id}, f)
self.log.info(f"Successfully saved client key to: {self.token_cache_path}")
self.session.headers["clientKey"] = client_key
self.log.info("Fetching pairing code...")
pairing_resp = self.session.get(self.config["endpoints"]["code_url"])
pairing_resp.raise_for_status()
pairing_data = pairing_resp.json().get("results", {})
confirmation_url = pairing_data.get("confirmation_url")
auth_code = pairing_data.get("auth_code")
if not confirmation_url or not auth_code:
self.log.exit("Failed to obtain pairing code or confirmation URL.")
self.log.info(f"Go to https://www.{confirmation_url} and enter: {auth_code}")
input("Press Enter after completing the authentication...")
self.log.info("Linking device...")
link_resp = self.session.post(
self.config["endpoints"]["link_url"], data={"device_code": pairing_data.get("device_code")}
)
link_resp.raise_for_status()
self.log.info(f"Device linked successfully: {link_resp.json()}")