Proper fix for DSNP

DSNP service is currently stable
This commit is contained in:
chu23465 2025-04-16 21:47:14 +05:30
parent b01bbdb53f
commit 1867fb9e5e
6 changed files with 112 additions and 26 deletions

View File

@ -307,7 +307,6 @@ def dl(ctx, profile, cdm, *_, **__):
credentials=credentials, credentials=credentials,
) )
@dl.result_callback() @dl.result_callback()
@click.pass_context @click.pass_context
def result(ctx, service, quality, closest_resolution, range_, wanted, alang, slang, audio_only, subs_only, chapters_only, audio_description, def result(ctx, service, quality, closest_resolution, range_, wanted, alang, slang, audio_only, subs_only, chapters_only, audio_description,
@ -333,10 +332,15 @@ def result(ctx, service, quality, closest_resolution, range_, wanted, alang, sla
else: else:
log.info(" + No captions found") log.info(" + No captions found")
global content_keys
log = service.log log = service.log
service_name = service.__class__.__name__ service_name = service.__class__.__name__
if service_name == "DisneyPlus": # Always retrieve fresh keys for DSNP so that content_keys variable has 2 kid:key pairs
no_cache = True
log.info("Retrieving Titles") log.info("Retrieving Titles")
try: try:
titles = Titles(as_list(service.get_titles())) titles = Titles(as_list(service.get_titles()))
@ -451,7 +455,7 @@ def result(ctx, service, quality, closest_resolution, range_, wanted, alang, sla
continue # only wanted to see what tracks were available and chosen continue # only wanted to see what tracks were available and chosen
skip_title = False skip_title = False
#Download might fail as auth token expires quickly for Hotstar. This is a problem for big downloads like a 4k track. So we reverse tracks and download audio first and large video later. #Download might fail as auth token expires quickly for Hotstar. This is a problem for big downloads like a 4k track. So we reverse tracks and download audio first and large video later.
for track in (list(title.tracks)[::-1] if service_name == "Hotstar" else title.tracks): for track in (list(title.tracks)[::-1] if service_name == "Hotstar" else title.tracks):
if not keys: if not keys:
@ -477,6 +481,7 @@ def result(ctx, service, quality, closest_resolution, range_, wanted, alang, sla
proxy = None proxy = None
track.download(directories.temp, headers=service.session.headers, proxy=proxy) track.download(directories.temp, headers=service.session.headers, proxy=proxy)
log.info(" + Downloaded") log.info(" + Downloaded")
# To-Do: For DSNP KID add -> mp4info (Bento4) with --verbose option show default kid from init.mp4 file
if isinstance(track, VideoTrack) and track.needs_ccextractor_first and not no_subs: if isinstance(track, VideoTrack) and track.needs_ccextractor_first and not no_subs:
ccextractor() ccextractor()
if track.encrypted: if track.encrypted:
@ -617,7 +622,18 @@ def result(ctx, service, quality, closest_resolution, range_, wanted, alang, sla
f"label=0:key_id={track.kid.lower()}:key={track.key.lower()}", f"label=0:key_id={track.kid.lower()}:key={track.key.lower()}",
# Apple TV+ needs this as shaka pulls the incorrect KID, idk why # Apple TV+ needs this as shaka pulls the incorrect KID, idk why
f"label=1:key_id=00000000000000000000000000000000:key={track.key.lower()}", f"label=1:key_id=00000000000000000000000000000000:key={track.key.lower()}",
]) if service_name != "DisneyPlus" else ",".join([f"label={index}:key_id={key}:key={content_keys[key]}" for index, key in enumerate(content_keys)]), # This right here is a hack as DSNP has 2 kids and returns 2 keys. FFS. ]) if service_name != "DisneyPlus" else
",".join(
[# This right here is a hack as DSNP sometimes has 2 kids and returns 2 keys. FFS.
"label={}:key_id={}:key={}".format(
content_keys.index(pair),
pair[0],
pair[1]
)
for pair
in content_keys
]
),
"--temp_dir", directories.temp "--temp_dir", directories.temp
], check=True) ], check=True)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:

Binary file not shown.

View File

@ -226,20 +226,7 @@ class Track:
True if KID is now available, False otherwise. KID will be stored in Track.kid True if KID is now available, False otherwise. KID will be stored in Track.kid
automatically. automatically.
""" """
if self.encrypted and self.source == "DSNP": if self.encrypted and self.psshPR:
try:
log = logging.getLogger("Tracks")
log.info("+ Replacing KID with correct track KID (DSNP workaround)")
xml_str = base64.b64decode(self.psshPR).decode("utf-16-le", "ignore")
xml_str = xml_str[xml_str.index("<"):]
kids = [uuid.UUID(base64.b64decode(kid_xml['@VALUE']).hex()).bytes_le.hex() for kid_xml in xmltodict.parse(xml_str)['WRMHEADER']['DATA']['CUSTOMATTRIBUTES']['KIDS']['KID']]
if self.kid:
kids.remove(self.kid)
self.kid = kids[-1]
except:
raise log.exit("Failed To Replace Correct KID for DSNP")
elif self.source != "DSNP" and self.psshPR:
xml_str = base64.b64decode(self.psshPR).decode("utf-16-le", "ignore") xml_str = base64.b64decode(self.psshPR).decode("utf-16-le", "ignore")
xml_str = xml_str[xml_str.index("<"):] xml_str = xml_str[xml_str.index("<"):]
xml = load_xml(xml_str).find("DATA") # root: WRMHEADER xml = load_xml(xml_str).find("DATA") # root: WRMHEADER
@ -249,8 +236,11 @@ class Track:
self.kid = next(iter(xml.xpath("PROTECTINFO/KID/@VALUE")), None) self.kid = next(iter(xml.xpath("PROTECTINFO/KID/@VALUE")), None)
if not self.kid: # v4.3.0.0 if not self.kid: # v4.3.0.0
self.kid = next(iter(xml.xpath("PROTECTINFO/KIDS/KID/@VALUE")), None) # can be multiple? self.kid = next(iter(xml.xpath("PROTECTINFO/KIDS/KID/@VALUE")), None) # can be multiple?
if not self.kid: if not self.kid and self.source == "DSNP":
raise xml_str = base64.b64decode(self.psshPR).decode("utf-16-le", "ignore")
xml_str = xml_str[xml_str.index("<"):]
kids = [uuid.UUID(base64.b64decode(kid_xml['@VALUE']).hex()).bytes_le.hex() for kid_xml in xmltodict.parse(xml_str)['WRMHEADER']['DATA']['CUSTOMATTRIBUTES']['KIDS']['KID']]
self.kid = kids[-1]
self.kid = uuid.UUID(base64.b64decode(self.kid).hex()).bytes_le.hex() self.kid = uuid.UUID(base64.b64decode(self.kid).hex()).bytes_le.hex()
@ -347,6 +337,12 @@ class Track:
) )
self.url = segments self.url = segments
if Path(save_path).is_file() and not (os.stat(save_path).st_size <= 3):
log = logging.getLogger("Tracks")
log.info("File already exists, assuming it's from previous unfinished download")
self._location = save_path
return save_path
if self.source == "CORE": if self.source == "CORE":
asyncio.run(saldl( asyncio.run(saldl(
self.url, self.url,
@ -1025,10 +1021,19 @@ class Tracks:
language = next((x.language for x in self.audios if x.is_original_lang), "") language = next((x.language for x in self.audios if x.is_original_lang), "")
if not language: if not language:
continue continue
self.audios = sorted( try:
self.audios, self.audios = sorted(
key=str(x.language) if str(x.language) != "" else "und" #lambda x: "" if is_close_match(language, [x.language]) else str(x.language) self.audios,
) key=lambda x: "" if is_close_match(language, [x.language]) else str(x.language)
)
except:
self.audios = sorted(
self.audios,
key=lambda x: "und" if str(x.language) == "" else str(x.language)
)
def sort_subtitles(self, by_language=None): def sort_subtitles(self, by_language=None):
"""Sort subtitle tracks by sdh, cc, forced, and optionally language.""" """Sort subtitle tracks by sdh, cc, forced, and optionally language."""

View File

@ -44,8 +44,68 @@ class DisneyPlus(BaseService):
@click.command(name="DisneyPlus", short_help="https://disneyplus.com") @click.command(name="DisneyPlus", short_help="https://disneyplus.com")
@click.argument("title", type=str, required=False) @click.argument("title", type=str, required=False)
@click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.") @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.")
@click.option("-s", "--scenario", default="tv-drm-ctr", type=str, @click.option("-s", "--scenario", default="tv-drm-ctr", help="Capability profile that specifies compatible codecs, streams, bit-rates, resolutions and such.",
help="Capability profile that specifies compatible codecs, streams, bit-rates, resolutions and such.") type=click.Choice([
"android",
"android-mobile",
"android-mobile-drm",
"android-mobile-drm-ctr",
"android-tablet",
"android-tablet-drm",
"android-tablet-drm-ctr",
"android-tv",
"android-tv-drm",
"android-tv-drm-ctr",
"android~unlimited",
"apple",
"apple-mobile",
"apple-mobile-drm",
"apple-mobile-drm-cbcs",
"apple-tablet",
"apple-tablet-drm",
"apple-tablet-drm-cbcs",
"apple-tv",
"apple-tv-drm",
"apple-tv-drm-cbcs",
"apple~unlimited",
"browser",
"browser-cbcs",
"browser-drm-cbcs",
"browser-drm-ctr",
"browser~unlimited",
"cbcs-high",
"cbcs-regular",
"chromecast",
"chromecast-drm",
"chromecast-drm-cbcs",
"ctr-high",
"ctr-regular",
"handset-drm-cbcs",
"handset-drm-cbcs-multi",
"handset-drm-ctr",
"handset-drm-ctr-lfr",
"playstation",
"playstation-drm",
"playstation-drm-cbcs",
"restricted-drm-ctr-sw",
"roku",
"roku-drm",
"roku-drm-cbcs",
"roku-drm-ctr",
"tizen",
"tizen-drm",
"tizen-drm-ctr",
"tv-drm-cbcs",
"tv-drm-ctr",
"tvs-drm-cbcs",
"tvs-drm-ctr",
"webos",
"webos-drm",
"webos-drm-ctr",
# 1080p bypass for downgraded l3 devices, keep private!
"drm-cbcs-multi"
])
)
@click.pass_context @click.pass_context
def cli(ctx, **kwargs): def cli(ctx, **kwargs):
return DisneyPlus(ctx, **kwargs) return DisneyPlus(ctx, **kwargs)
@ -61,6 +121,7 @@ class DisneyPlus(BaseService):
self.acodec = ctx.parent.params["acodec"] self.acodec = ctx.parent.params["acodec"]
self.range = ctx.parent.params["range_"] self.range = ctx.parent.params["range_"]
self.wanted = ctx.parent.params["wanted"] self.wanted = ctx.parent.params["wanted"]
self.quality = ctx.parent.params["quality"] or 1080
self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY self.playready = True if "certificate_chain" in dir(ctx.obj.cdm) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY
@ -178,6 +239,8 @@ class DisneyPlus(BaseService):
tracks.audios.extend(atmos_scenario.audios) tracks.audios.extend(atmos_scenario.audios)
tracks.subtitles.extend(atmos_scenario.subtitles) tracks.subtitles.extend(atmos_scenario.subtitles)
for track in tracks:
track.needs_proxy = False
return tracks return tracks
def get_chapters(self, title): def get_chapters(self, title):
@ -216,7 +279,8 @@ class DisneyPlus(BaseService):
}) })
self.log.info("Preparing") self.log.info("Preparing")
if self.range != "SDR" and self.vcodec != "H265":
if (self.range != "SDR" or self.quality > 1080) and self.vcodec != "H265":
# vcodec must be H265 for High Dynamic Range # vcodec must be H265 for High Dynamic Range
self.vcodec = "H265" self.vcodec = "H265"
self.log.info(f" + Switched video codec to H265 to be able to get {self.range} dynamic range") self.log.info(f" + Switched video codec to H265 to be able to get {self.range} dynamic range")

View File

@ -23,6 +23,7 @@ cdm_api:
credentials: credentials:
iTunes: '' iTunes: ''
Hotstar: 'username:password' Hotstar: 'username:password'
DisneyPlus: 'email:password'
directories: directories:
temp: '' temp: ''