diff --git a/vinetrimmer/services/disneyplus.py b/vinetrimmer/services/disneyplus.py index aca1cc0..90d36b1 100644 --- a/vinetrimmer/services/disneyplus.py +++ b/vinetrimmer/services/disneyplus.py @@ -17,550 +17,550 @@ from vinetrimmer.utils.io import get_ip_info class DisneyPlus(BaseService): - """ - Service code for Disney's Disney+ streaming service (https://disneyplus.com). + """ + Service code for Disney's Disney+ streaming service (https://disneyplus.com). - \b - Authorization: Credentials - Security: UHD@L1 FHD@L1 HD@L3, HEAVILY monitors high-profit and newly released titles!! + \b + Authorization: Credentials + Security: UHD@L1 FHD@L1 HD@L3, HEAVILY monitors high-profit and newly released titles!! - \b - Tips: - Some titles offer a setting in its Details tab to prefer "Remastered" or Original format - - You can specify which profile is used for its preferences and such in the config file - """ + \b + Tips: - Some titles offer a setting in its Details tab to prefer "Remastered" or Original format + - You can specify which profile is used for its preferences and such in the config file + """ - ALIASES = ["DSNP", "disneyplus", "disney+"] - TITLE_RE = [ - r"^https?://(?:www\.)?disneyplus\.com(?:/[a-z0-9-]+)?(?:/[a-z0-9-]+)?/(?Pbrowse)/(?Pentity-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" + ALIASES = ["DSNP", "disneyplus", "disney+"] + TITLE_RE = [ + r"^https?://(?:www\.)?disneyplus\.com(?:/[a-z0-9-]+)?(?:/[a-z0-9-]+)?/(?Pbrowse)/(?Pentity-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" - ] + ] - AUDIO_CODEC_MAP = { - "AAC": ["aac"], - "EC3": ["eac", "atmos"] - } + AUDIO_CODEC_MAP = { + "AAC": ["aac"], + "EC3": ["eac", "atmos"] + } - @staticmethod - @click.command(name="DisneyPlus", short_help="https://disneyplus.com") - @click.argument("title", type=str, required=False) - @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.") - @click.option("-s", "--scenario", default="tv-drm-ctr", type=str, - help="Capability profile that specifies compatible codecs, streams, bit-rates, resolutions and such.") - @click.pass_context - def cli(ctx, **kwargs): - return DisneyPlus(ctx, **kwargs) + @staticmethod + @click.command(name="DisneyPlus", short_help="https://disneyplus.com") + @click.argument("title", type=str, required=False) + @click.option("-m", "--movie", is_flag=True, default=False, help="Title is a movie.") + @click.option("-s", "--scenario", default="tv-drm-ctr", type=str, + help="Capability profile that specifies compatible codecs, streams, bit-rates, resolutions and such.") + @click.pass_context + def cli(ctx, **kwargs): + return DisneyPlus(ctx, **kwargs) - def __init__(self, ctx, title, movie, scenario): - super().__init__(ctx) - m = self.parse_title(ctx, title) - self.movie = movie #or m.get("type") == "movies" - #self.type = m.get("type") - self.scenario = scenario + def __init__(self, ctx, title, movie, scenario): + super().__init__(ctx) + m = self.parse_title(ctx, title) + self.movie = movie #or m.get("type") == "movies" + #self.type = m.get("type") + self.scenario = scenario - self.vcodec = ctx.parent.params["vcodec"] - self.acodec = ctx.parent.params["acodec"] - self.range = ctx.parent.params["range_"] - self.wanted = ctx.parent.params["wanted"] + self.vcodec = ctx.parent.params["vcodec"] + self.acodec = ctx.parent.params["acodec"] + self.range = ctx.parent.params["range_"] + self.wanted = ctx.parent.params["wanted"] - self.playready = True if "group_certificate" in dir(ctx.obj.cdm.device) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY + self.playready = True if "group_certificate" in dir(ctx.obj.cdm.device) else False # ctx.obj.cdm.device.type == LocalDevice.Types.PLAYREADY - self.region = None - self.bamsdk = None - self.device_token = None - self.account_tokens = {} + self.region = None + self.bamsdk = None + self.device_token = None + self.account_tokens = {} - self.configure() + self.configure() - def get_titles(self): + def get_titles(self): - if self.movie: - #original_lang = self.get_hulu_series(self.title)['originalLanguage'] - data = self.get_hulu_series(self.title)["data"]["page"] - movie = Title( - id_=self.title, - type_=Title.Types.MOVIE, - name=data['visuals']['title'], - year=data['visuals']['metastringParts']['releaseYearRange']['startYear'], - source=self.ALIASES[0], - original_lang="en", - service_data=data["containers"][-1]["visuals"] - ) + if self.movie: + #original_lang = self.get_hulu_series(self.title)['originalLanguage'] + data = self.get_hulu_series(self.title)["data"]["page"] + movie = Title( + id_=self.title, + type_=Title.Types.MOVIE, + name=data['visuals']['title'], + year=data['visuals']['metastringParts']['releaseYearRange']['startYear'], + source=self.ALIASES[0], + original_lang="en", + service_data=data["containers"][-1]["visuals"] + ) - movie.service_data = data["actions"][0]["resourceId"] - - return movie + movie.service_data = data["actions"][0]["resourceId"] + + return movie - else: - data = self.get_hulu_series(self.title)["data"].get("page") - if not data: - raise self.log.exit(" - No data returned") + else: + data = self.get_hulu_series(self.title)["data"].get("page") + if not data: + raise self.log.exit(" - No data returned") - season_len = len(data["containers"][0]["seasons"]) - if data["containers"][0].get("type") == "episodes": - if season_len == 0: - raise self.log.exit(" - No seasons available") + season_len = len(data["containers"][0]["seasons"]) + if data["containers"][0].get("type") == "episodes": + if season_len == 0: + raise self.log.exit(" - No seasons available") - seasons = list() - for x, season in enumerate( - reversed(data["containers"][0]["seasons"]), start=1 - ): - episodes = self.get_hulu_season(season["id"])["data"]["season"]["items"] - self.log.debug(episodes) - seasons += [ - Title( - id_=t2["id"], - type_=Title.Types.TV, - name=t2["visuals"]["title"], - season=t2["visuals"].get("seasonNumber"), - episode=t2["visuals"].get("episodeNumber"), - episode_name=t2["visuals"] - .get("episodeTitle") - .replace("(Sub) ", ""), - original_lang="en", - source=self.ALIASES[0], - service_data=t2, - ) - for t2 in episodes - ] + seasons = list() + for x, season in enumerate( + reversed(data["containers"][0]["seasons"]), start=1 + ): + episodes = self.get_hulu_season(season["id"])["data"]["season"]["items"] + self.log.debug(episodes) + seasons += [ + Title( + id_=t2["id"], + type_=Title.Types.TV, + name=t2["visuals"]["title"], + season=t2["visuals"].get("seasonNumber"), + episode=t2["visuals"].get("episodeNumber"), + episode_name=t2["visuals"] + .get("episodeTitle") + .replace("(Sub) ", ""), + original_lang="en", + source=self.ALIASES[0], + service_data=t2, + ) + for t2 in episodes + ] - # Get mediaId from decoded resourceId - for x in seasons: - x.service_data["mediaMetadata"] = {} - x.service_data["mediaMetadata"]["mediaId"] = x.service_data["actions"][0]["resourceId"] + # Get mediaId from decoded resourceId + for x in seasons: + x.service_data["mediaMetadata"] = {} + x.service_data["mediaMetadata"]["mediaId"] = x.service_data["actions"][0]["resourceId"] - return seasons + return seasons - # get data for every episode in every season via looping due to the fact - # that the api doesn't provide ALL episodes in the initial bundle api call. - # TODO: The season info returned might also be paged/limited - + # get data for every episode in every season via looping due to the fact + # that the api doesn't provide ALL episodes in the initial bundle api call. + # TODO: The season info returned might also be paged/limited + - def get_tracks(self, title): - # Refresh token in case it expired - self.account_tokens = self.get_account_token( - credential=self.credentials, - device_family=self.config["bamsdk"]["family"], - device_token=self.device_token, - ) - if self.movie: - tracks = self.get_manifest_tracks( - self.get_manifest_url( - media_id=title.service_data, - scenario=self.scenario - ) - ) - - else: - tracks = self.get_manifest_tracks( - self.get_manifest_url( - media_id=title.service_data["mediaMetadata"]["mediaId"], - scenario=self.scenario - ) - ) + def get_tracks(self, title): + # Refresh token in case it expired + self.account_tokens = self.get_account_token( + credential=self.credentials, + device_family=self.config["bamsdk"]["family"], + device_token=self.device_token, + ) + if self.movie: + tracks = self.get_manifest_tracks( + self.get_manifest_url( + media_id=title.service_data, + scenario=self.scenario + ) + ) + + else: + tracks = self.get_manifest_tracks( + self.get_manifest_url( + media_id=title.service_data["mediaMetadata"]["mediaId"], + scenario=self.scenario + ) + ) - if (not any((x.codec or "").startswith("atmos") for x in tracks.audios) - and not self.scenario.endswith(("-atmos", "~unlimited"))): - self.log.info(" + Attempting to get Atmos audio from H265 manifest") - try: - atmos_scenario = self.get_manifest_tracks( - self.get_manifest_url( - media_id=title.service_data["mediaMetadata"]["mediaId"], - scenario="tv-drm-ctr-h265-atmos" - ) - ) - except: - atmos_scenario = self.get_manifest_tracks( - self.get_manifest_url( - media_id=title.service_data, - scenario="tv-drm-ctr-h265-atmos" - ) - ) - tracks.audios.extend(atmos_scenario.audios) - tracks.subtitles.extend(atmos_scenario.subtitles) + if (not any((x.codec or "").startswith("atmos") for x in tracks.audios) + and not self.scenario.endswith(("-atmos", "~unlimited"))): + self.log.info(" + Attempting to get Atmos audio from H265 manifest") + try: + atmos_scenario = self.get_manifest_tracks( + self.get_manifest_url( + media_id=title.service_data["mediaMetadata"]["mediaId"], + scenario="tv-drm-ctr-h265-atmos" + ) + ) + except: + atmos_scenario = self.get_manifest_tracks( + self.get_manifest_url( + media_id=title.service_data, + scenario="tv-drm-ctr-h265-atmos" + ) + ) + tracks.audios.extend(atmos_scenario.audios) + tracks.subtitles.extend(atmos_scenario.subtitles) - return tracks + return tracks - def get_chapters(self, title): - return [] + def get_chapters(self, title): + return [] - def certificate(self, **_): - return None if self.playready else self.config["certificate"] + def certificate(self, **_): + return None if self.playready else self.config["certificate"] - def license(self, challenge, **_): - # Refresh token in case it expired - self.account_tokens = self.get_account_token( - credential=self.credentials, - device_family=self.config["bamsdk"]["family"], - device_token=self.device_token, - ) + def license(self, challenge, **_): + # Refresh token in case it expired + self.account_tokens = self.get_account_token( + credential=self.credentials, + device_family=self.config["bamsdk"]["family"], + device_token=self.device_token, + ) - if self.playready: - res = self.bamsdk.drm.playreadyLicense( - licence=challenge.decode(), # expects XML - access_token=self.account_tokens["access_token"] - ) - res = base64.b64encode(res).decode() - else: - res = self.bamsdk.drm.widevineLicense( - licence=challenge, # expects bytes - access_token=self.account_tokens["access_token"] - ) + if self.playready: + res = self.bamsdk.drm.playreadyLicense( + licence=challenge.decode(), # expects XML + access_token=self.account_tokens["access_token"] + ) + res = base64.b64encode(res).decode() + else: + res = self.bamsdk.drm.widevineLicense( + licence=challenge, # expects bytes + access_token=self.account_tokens["access_token"] + ) - return res + return res - # Service specific functions + # Service specific functions - def configure(self): - self.session.headers.update({ - "Accept-Language": "en-US,en;q=0.5", - "User-Agent": self.config["bamsdk"]["user_agent"], - "Origin": "https://www.disneyplus.com" - }) + def configure(self): + self.session.headers.update({ + "Accept-Language": "en-US,en;q=0.5", + "User-Agent": self.config["bamsdk"]["user_agent"], + "Origin": "https://www.disneyplus.com" + }) - self.log.info("Preparing") - if self.range != "SDR" and self.vcodec != "H265": - # vcodec must be H265 for High Dynamic Range - self.vcodec = "H265" - self.log.info(f" + Switched video codec to H265 to be able to get {self.range} dynamic range") - self.scenario = self.prepare_scenario(self.scenario, self.vcodec, self.range) - self.log.info(f" + Scenario: {self.scenario}") + self.log.info("Preparing") + if self.range != "SDR" and self.vcodec != "H265": + # vcodec must be H265 for High Dynamic Range + self.vcodec = "H265" + self.log.info(f" + Switched video codec to H265 to be able to get {self.range} dynamic range") + self.scenario = self.prepare_scenario(self.scenario, self.vcodec, self.range) + self.log.info(f" + Scenario: {self.scenario}") - self.log.info("Getting BAMSDK Configuration") + self.log.info("Getting BAMSDK Configuration") - ip_info = get_ip_info(self.session, fresh=True) - self.region = ip_info["country_code"].upper() - self.config["location_x"] = ip_info["latitude"] - self.config["location_y"] = ip_info["longitude"] - self.log.info(f" + IP Location: {self.config['location_x']},{self.config['location_y']}") + ip_info = get_ip_info(self.session, fresh=True) + self.region = ip_info["country_code"].upper() + self.config["location_x"] = ip_info["latitude"] + self.config["location_y"] = ip_info["longitude"] + self.log.info(f" + IP Location: {self.config['location_x']},{self.config['location_y']}") - self.bamsdk = BamSdk(self.config["bamsdk"]["config"], self.session) - self.session.headers.update(dict(**{ - k.lower(): v.replace( - "{SDKPlatform}", self.config["bamsdk"]["platform"] - ).replace( - "{SDKVersion}", self.config["bamsdk"]["version"] - ) for k, v in self.bamsdk.commonHeaders.items() - }, **{ - "user-agent": self.config["bamsdk"]["user_agent"] - })) + self.bamsdk = BamSdk(self.config["bamsdk"]["config"], self.session) + self.session.headers.update(dict(**{ + k.lower(): v.replace( + "{SDKPlatform}", self.config["bamsdk"]["platform"] + ).replace( + "{SDKVersion}", self.config["bamsdk"]["version"] + ) for k, v in self.bamsdk.commonHeaders.items() + }, **{ + "user-agent": self.config["bamsdk"]["user_agent"] + })) - self.log.debug(" + Capabilities:") - for k, v in self.bamsdk.media.extras.items(): - self.log.debug(f" {k}: {v}") + self.log.debug(" + Capabilities:") + for k, v in self.bamsdk.media.extras.items(): + self.log.debug(f" {k}: {v}") - self.log.info("Logging into Disney+") - self.device_token, self.account_tokens = self.login(self.credentials) + self.log.info("Logging into Disney+") + self.device_token, self.account_tokens = self.login(self.credentials) - session_info = self.bamsdk.session.getInfo(self.account_tokens["access_token"]) - self.log.info(f" + Account ID: {session_info['account']['id']}") - self.log.info(f" + Profile ID: {session_info['profile']['id']}") - self.log.info(f" + Subscribed: {session_info['isSubscriber']}") - self.log.info(f" + Account Region: {session_info['home_location']['country_code']}") - self.log.info(f" + Detected Location: {session_info['location']['country_code']}") - self.log.info(f" + Supported Location: {session_info['inSupportedLocation']}") - self.log.info(f" + Device: {session_info['device']['platform']}") + session_info = self.bamsdk.session.getInfo(self.account_tokens["access_token"]) + self.log.info(f" + Account ID: {session_info['account']['id']}") + self.log.info(f" + Profile ID: {session_info['profile']['id']}") + self.log.info(f" + Subscribed: {session_info['isSubscriber']}") + self.log.info(f" + Account Region: {session_info['home_location']['country_code']}") + self.log.info(f" + Detected Location: {session_info['location']['country_code']}") + self.log.info(f" + Supported Location: {session_info['inSupportedLocation']}") + self.log.info(f" + Device: {session_info['device']['platform']}") - if not session_info["isSubscriber"]: - raise self.log.exit(" - Cannot continue, account is not subscribed to Disney+.") + if not session_info["isSubscriber"]: + raise self.log.exit(" - Cannot continue, account is not subscribed to Disney+.") - @staticmethod - def prepare_scenario(scenario, vcodec, range_): - """Prepare Disney+'s scenario based on other arguments and settings.""" - if scenario.endswith("~unlimited"): - # if unlimited scenario, nothing needs to be appended or changed. - # the scenario will return basically all streams it can. - return scenario - if vcodec == "H265": - scenario += "-h265" - if range_ == "HDR10": - scenario += "-hdr10" - elif range_ == "DV": - scenario += "-dovi" - return scenario + @staticmethod + def prepare_scenario(scenario, vcodec, range_): + """Prepare Disney+'s scenario based on other arguments and settings.""" + if scenario.endswith("~unlimited"): + # if unlimited scenario, nothing needs to be appended or changed. + # the scenario will return basically all streams it can. + return scenario + if vcodec == "H265": + scenario += "-h265" + if range_ == "HDR10": + scenario += "-hdr10" + elif range_ == "DV": + scenario += "-dovi" + return scenario - def login(self, credential): - """Log into Disney+ and retrieve various authorisation keys.""" - device_token = self.create_device_token( - family=self.config["bamsdk"]["family"], - profile=self.config["bamsdk"]["profile"], - application=self.config["bamsdk"]["applicationRuntime"], - api_key=self.config["device_api_key"] - ) - self.log.info(" + Obtained Device Token") - account_tokens = self.get_account_token( - credential=credential, - device_family=self.config["bamsdk"]["family"], - device_token=device_token, - ) - self.log.info(" + Obtained Account Token") - return device_token, account_tokens + def login(self, credential): + """Log into Disney+ and retrieve various authorisation keys.""" + device_token = self.create_device_token( + family=self.config["bamsdk"]["family"], + profile=self.config["bamsdk"]["profile"], + application=self.config["bamsdk"]["applicationRuntime"], + api_key=self.config["device_api_key"] + ) + self.log.info(" + Obtained Device Token") + account_tokens = self.get_account_token( + credential=credential, + device_family=self.config["bamsdk"]["family"], + device_token=device_token, + ) + self.log.info(" + Obtained Account Token") + return device_token, account_tokens - def create_device_token(self, family, profile, application, api_key): - """ - Create a Device Token for a specified device type. - This tells the API's what is possible for your device. - :param family: Device Family. - :param profile: Device Profile. - :param application: Device Runtime, the use case of the device. - :param api_key: Device API Key. - :returns: Device Exchange Token. - """ - # create an initial assertion grant used to identify the kind of device profile-level. - # TODO: cache this, it doesn't need to be obtained unless the values change - device_grant = self.bamsdk.device.createDeviceGrant( - json={ - "deviceFamily": family, - "applicationRuntime": application, - "deviceProfile": profile, - "attributes": {} - }, - api_key=api_key - ) - if "errors" in device_grant: - raise self.log.exit( - " - Failed to obtain the device assertion grant: " - f"{device_grant['errors']}" - ) - # exchange the assertion grant for a usable device token. - device_token = self.bamsdk.token.exchange( - data={ - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "platform": family, - "subject_token": device_grant["assertion"], - "subject_token_type": self.bamsdk.token.subject_tokens["device"] - }, - api_key=api_key - ) - if "error" in device_token: - raise self.log.exit( - " - Failed to exchange the assertion grant for a device token: " - f"{device_token['error_description']} [{device_token['error']}]" - ) - return device_token["access_token"] + def create_device_token(self, family, profile, application, api_key): + """ + Create a Device Token for a specified device type. + This tells the API's what is possible for your device. + :param family: Device Family. + :param profile: Device Profile. + :param application: Device Runtime, the use case of the device. + :param api_key: Device API Key. + :returns: Device Exchange Token. + """ + # create an initial assertion grant used to identify the kind of device profile-level. + # TODO: cache this, it doesn't need to be obtained unless the values change + device_grant = self.bamsdk.device.createDeviceGrant( + json={ + "deviceFamily": family, + "applicationRuntime": application, + "deviceProfile": profile, + "attributes": {} + }, + api_key=api_key + ) + if "errors" in device_grant: + raise self.log.exit( + " - Failed to obtain the device assertion grant: " + f"{device_grant['errors']}" + ) + # exchange the assertion grant for a usable device token. + device_token = self.bamsdk.token.exchange( + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "platform": family, + "subject_token": device_grant["assertion"], + "subject_token_type": self.bamsdk.token.subject_tokens["device"] + }, + api_key=api_key + ) + if "error" in device_token: + raise self.log.exit( + " - Failed to exchange the assertion grant for a device token: " + f"{device_token['error_description']} [{device_token['error']}]" + ) + return device_token["access_token"] - def get_account_token(self, credential, device_family, device_token): - """ - Get an Account Token using Account Credentials and a Device Token, using a Cache store. - It also refreshes the token if needed. - """ - if not credential: - raise self.log.exit(" - No credentials provided, unable to log in.") - tokens_cache_path = self.get_cache(f"tokens_{self.region}_{credential.sha1}.json") - if os.path.isfile(tokens_cache_path): - self.log.info(" + Using cached tokens...") - with open(tokens_cache_path, encoding="utf-8") as fd: - tokens = json.load(fd) - if os.stat(tokens_cache_path).st_ctime > (time.time() - tokens["expires_in"]): - return tokens - # expired - self.log.info(" + Refreshing...") - tokens = self.refresh_token( - device_family=device_family, - refresh_token=tokens["refresh_token"], - api_key=self.config["device_api_key"] - ) - else: - # first time - self.log.info(" + Getting new tokens...") - tokens = self.create_account_token( - device_family=self.config["bamsdk"]["family"], - email=credential.username, - password=credential.password, - device_token=device_token, - api_key=self.config["device_api_key"] - ) + def get_account_token(self, credential, device_family, device_token): + """ + Get an Account Token using Account Credentials and a Device Token, using a Cache store. + It also refreshes the token if needed. + """ + if not credential: + raise self.log.exit(" - No credentials provided, unable to log in.") + tokens_cache_path = self.get_cache(f"tokens_{self.region}_{credential.sha1}.json") + if os.path.isfile(tokens_cache_path): + self.log.info(" + Using cached tokens...") + with open(tokens_cache_path, encoding="utf-8") as fd: + tokens = json.load(fd) + if os.stat(tokens_cache_path).st_ctime > (time.time() - tokens["expires_in"]): + return tokens + # expired + self.log.info(" + Refreshing...") + tokens = self.refresh_token( + device_family=device_family, + refresh_token=tokens["refresh_token"], + api_key=self.config["device_api_key"] + ) + else: + # first time + self.log.info(" + Getting new tokens...") + tokens = self.create_account_token( + device_family=self.config["bamsdk"]["family"], + email=credential.username, + password=credential.password, + device_token=device_token, + api_key=self.config["device_api_key"] + ) - os.makedirs(os.path.dirname(tokens_cache_path), exist_ok=True) - with open(tokens_cache_path, "w", encoding="utf-8") as fd: - json.dump(tokens, fd) + os.makedirs(os.path.dirname(tokens_cache_path), exist_ok=True) + with open(tokens_cache_path, "w", encoding="utf-8") as fd: + json.dump(tokens, fd) - return tokens + return tokens - def create_account_token(self, device_family, email, password, device_token, api_key): - """ - Create an Account Token using Account Credentials and a Device Token. - :param device_family: Device Family. - :param email: Account Email. - :param password: Account Password. - :param device_token: Device Token. - :param api_key: Device API Key. - :returns: Account Exchange Tokens. - """ - # log in to the account via bamsdk using the device token - identity_token = self.bamsdk.bamIdentity.identityLogin( - email=email, - password=password, - access_token=device_token - ) - if "errors" in identity_token: - raise self.log.exit( - " - Failed to obtain the identity token: " - f"{identity_token['errors']}" - ) - # create an initial assertion grant used to identify the account - # this seems to tie the account to the device token - account_grant = self.bamsdk.account.createAccountGrant( - json={"id_token": identity_token["id_token"]}, - access_token=device_token - ) - # exchange the assertion grant for a usable account token. - account_tokens = self.bamsdk.token.exchange( - data={ - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "platform": device_family, - "subject_token": account_grant["assertion"], - "subject_token_type": self.bamsdk.token.subject_tokens["account"] - }, - api_key=api_key - ) - # change profile and re-exchange if provided - if self.config.get("profile"): - profile_grant = self.change_profile(self.config["profile"], account_tokens["access_token"]) - account_tokens = self.bamsdk.token.exchange( - data={ - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "platform": device_family, - "subject_token": profile_grant["assertion"], - "subject_token_type": self.bamsdk.token.subject_tokens["account"] - }, - api_key=api_key - ) - return account_tokens + def create_account_token(self, device_family, email, password, device_token, api_key): + """ + Create an Account Token using Account Credentials and a Device Token. + :param device_family: Device Family. + :param email: Account Email. + :param password: Account Password. + :param device_token: Device Token. + :param api_key: Device API Key. + :returns: Account Exchange Tokens. + """ + # log in to the account via bamsdk using the device token + identity_token = self.bamsdk.bamIdentity.identityLogin( + email=email, + password=password, + access_token=device_token + ) + if "errors" in identity_token: + raise self.log.exit( + " - Failed to obtain the identity token: " + f"{identity_token['errors']}" + ) + # create an initial assertion grant used to identify the account + # this seems to tie the account to the device token + account_grant = self.bamsdk.account.createAccountGrant( + json={"id_token": identity_token["id_token"]}, + access_token=device_token + ) + # exchange the assertion grant for a usable account token. + account_tokens = self.bamsdk.token.exchange( + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "platform": device_family, + "subject_token": account_grant["assertion"], + "subject_token_type": self.bamsdk.token.subject_tokens["account"] + }, + api_key=api_key + ) + # change profile and re-exchange if provided + if self.config.get("profile"): + profile_grant = self.change_profile(self.config["profile"], account_tokens["access_token"]) + account_tokens = self.bamsdk.token.exchange( + data={ + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "platform": device_family, + "subject_token": profile_grant["assertion"], + "subject_token_type": self.bamsdk.token.subject_tokens["account"] + }, + api_key=api_key + ) + return account_tokens - def refresh_token(self, device_family, refresh_token, api_key): - """ - Refresh a Token using its adjacent refresh token. - :param device_family: Device Family. - :param refresh_token: Refresh Token. - :param api_key: Device API Key. - :returns: Account Exchange Token. - """ - return self.bamsdk.token.exchange( - data={ - "grant_type": "refresh_token", - "platform": device_family, - "refresh_token": refresh_token - }, - api_key=api_key - ) + def refresh_token(self, device_family, refresh_token, api_key): + """ + Refresh a Token using its adjacent refresh token. + :param device_family: Device Family. + :param refresh_token: Refresh Token. + :param api_key: Device API Key. + :returns: Account Exchange Token. + """ + return self.bamsdk.token.exchange( + data={ + "grant_type": "refresh_token", + "platform": device_family, + "refresh_token": refresh_token + }, + api_key=api_key + ) - def change_profile(self, profile, access_token): - """ - Change to a different account user profile. - :param profile: profile by name, number, or directly by profile ID. - :param access_token: account access token. - :returns: profile grant tokens. - """ - if not profile: - raise self.log.exit(" - Profile cannot be empty") - try: - profile_id = uuid.UUID(str(profile)) - self.log.info(f" + Switching profile to {profile_id}") - # is UUID - except ValueError: - profiles = self.bamsdk.account.getUserProfiles(access_token) - if isinstance(profile, int): - if len(profiles) < profile: - raise self.log.exit( - " - There isn't a {}{} profile for this account".format( - profile, "tsnrhtdd"[(profile // 10 % 10 != 1) * (profile % 10 < 4) * profile % 10::4] - ) - ) - profile_data = profiles[profile - 1] - else: - profile_data = [x for x in profiles if x["profileName"] == profile] - if not profile_data: - raise self.log.exit(f" - Profile {profile!r} does not exist in this account") - profile_data = profile_data[0] - profile_id = profile_data["profileId"] - self.log.info(f" + Switching profile to {profile_data['profileName']!r} ({profile_id})") - res = self.bamsdk.account.setActiveUserProfile(str(profile_id), access_token) - if "errors" in res: - raise self.log.exit(f" - Failed! {res['errors'][0]['description']}") - return res + def change_profile(self, profile, access_token): + """ + Change to a different account user profile. + :param profile: profile by name, number, or directly by profile ID. + :param access_token: account access token. + :returns: profile grant tokens. + """ + if not profile: + raise self.log.exit(" - Profile cannot be empty") + try: + profile_id = uuid.UUID(str(profile)) + self.log.info(f" + Switching profile to {profile_id}") + # is UUID + except ValueError: + profiles = self.bamsdk.account.getUserProfiles(access_token) + if isinstance(profile, int): + if len(profiles) < profile: + raise self.log.exit( + " - There isn't a {}{} profile for this account".format( + profile, "tsnrhtdd"[(profile // 10 % 10 != 1) * (profile % 10 < 4) * profile % 10::4] + ) + ) + profile_data = profiles[profile - 1] + else: + profile_data = [x for x in profiles if x["profileName"] == profile] + if not profile_data: + raise self.log.exit(f" - Profile {profile!r} does not exist in this account") + profile_data = profile_data[0] + profile_id = profile_data["profileId"] + self.log.info(f" + Switching profile to {profile_data['profileName']!r} ({profile_id})") + res = self.bamsdk.account.setActiveUserProfile(str(profile_id), access_token) + if "errors" in res: + raise self.log.exit(f" - Failed! {res['errors'][0]['description']}") + return res - def get_manifest_url(self, media_id, scenario): - self.log.info(f"Retrieving manifest for {scenario}") - self.session.headers['x-dss-feature-filtering'] = 'true' - self.session.headers['x-application-version'] = '1.1.2' - self.session.headers['x-bamsdk-client-id'] = 'disney-svod' - self.session.headers['x-bamsdk-platform'] = 'javascript/windows/chrome' - self.session.headers['x-bamsdk-version'] = '28.0' - resolution = "1280x720" if str(self.scenario).lower() == "browser" else "" + def get_manifest_url(self, media_id, scenario): + self.log.info(f"Retrieving manifest for {scenario}") + self.session.headers['x-dss-feature-filtering'] = 'true' + self.session.headers['x-application-version'] = '1.1.2' + self.session.headers['x-bamsdk-client-id'] = 'disney-svod' + self.session.headers['x-bamsdk-platform'] = 'javascript/windows/chrome' + self.session.headers['x-bamsdk-version'] = '28.0' + resolution = "1280x720" if str(self.scenario).lower() == "browser" else "" - json_data = { - 'playback': { - 'attributes': { - 'resolution': { - 'max': [ - f'{resolution}', - ], - }, - 'protocol': 'HTTPS', - 'assetInsertionStrategy': 'SGAI', - 'playbackInitiationContext': 'ONLINE', - 'frameRates': [ - 60, - ], - }, - }, - 'playbackId': media_id, - } + json_data = { + 'playback': { + 'attributes': { + 'resolution': { + 'max': [ + f'{resolution}', + ], + }, + 'protocol': 'HTTPS', + 'assetInsertionStrategy': 'SGAI', + 'playbackInitiationContext': 'ONLINE', + 'frameRates': [ + 60, + ], + }, + }, + 'playbackId': media_id, + } - manifest = self.session.post( - f'https://disney.playback.edge.bamgrid.com/v7/playback/{scenario}', - headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, - json=json_data - ).json() + manifest = self.session.post( + f'https://disney.playback.edge.bamgrid.com/v7/playback/{scenario}', + headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, + json=json_data + ).json() - self.chaps = {} - self.chaps["editorial"] = manifest["stream"].get("editorial", {}) + self.chaps = {} + self.chaps["editorial"] = manifest["stream"].get("editorial", {}) - return manifest["stream"]["sources"][0]['complete']['url'] + return manifest["stream"]["sources"][0]['complete']['url'] - def get_manifest_tracks(self, url): - self.session.get(url) - tracks = Tracks.from_m3u8(m3u8.load(url), source=self.ALIASES[0]) - if self.acodec: - tracks.audios = [ - x for x in tracks.audios if (x.codec or "").split("-")[0] in self.AUDIO_CODEC_MAP[self.acodec] - ] - for video in tracks.videos: - # This is needed to remove weird glitchy NOP data at the end of stream - video.needs_repack = True - for audio in tracks.audios: - bitrate = re.search(r"(?<=r/composite_)\d+|\d+(?=_complete.m3u8)", as_list(audio.url)[0]) - if not bitrate: - raise self.log.exit(" - Unable to get bitrate for an audio track") - audio.bitrate = int(bitrate.group()) * 1000 - if audio.bitrate == 1000_000: - # DSNP lies about the Atmos bitrate - audio.bitrate = 768_000 - for subtitle in tracks.subtitles: - subtitle.codec = "vtt" - subtitle.forced = subtitle.forced or subtitle.extra.name.endswith("--forced--") - # sdh might not actually occur, either way DSNP CC == SDH :) - subtitle.sdh = "[cc]" in subtitle.extra.name.lower() or "[sdh]" in subtitle.extra.name.lower() - return tracks + def get_manifest_tracks(self, url): + self.session.get(url) + tracks = Tracks.from_m3u8(m3u8.load(url), source=self.ALIASES[0]) + if self.acodec: + tracks.audios = [ + x for x in tracks.audios if (x.codec or "").split("-")[0] in self.AUDIO_CODEC_MAP[self.acodec] + ] + for video in tracks.videos: + # This is needed to remove weird glitchy NOP data at the end of stream + video.needs_repack = True + for audio in tracks.audios: + bitrate = re.search(r"(?<=r/composite_)\d+|\d+(?=_complete.m3u8)", as_list(audio.url)[0]) + if not bitrate: + raise self.log.exit(" - Unable to get bitrate for an audio track") + audio.bitrate = int(bitrate.group()) * 1000 + if audio.bitrate == 1000_000: + # DSNP lies about the Atmos bitrate + audio.bitrate = 768_000 + for subtitle in tracks.subtitles: + subtitle.codec = "vtt" + subtitle.forced = subtitle.forced or subtitle.extra.name.endswith("--forced--") + # sdh might not actually occur, either way DSNP CC == SDH :) + subtitle.sdh = "[cc]" in subtitle.extra.name.lower() or "[sdh]" in subtitle.extra.name.lower() + return tracks - def get_hulu_series(self, content_id: str) -> dict: - r = self.session.get( - url=self.config["bamsdk"]["page"].format(id=content_id), - params={ - "disableSmartFocus": True, - "enhancedContainersLimit": 12, - "limit": 999, - }, - headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, - ).json() + def get_hulu_series(self, content_id: str) -> dict: + r = self.session.get( + url=self.config["bamsdk"]["page"].format(id=content_id), + params={ + "disableSmartFocus": True, + "enhancedContainersLimit": 12, + "limit": 999, + }, + headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, + ).json() - return r + return r - def get_hulu_season(self, season_id: str) -> dict: - r = self.session.get( - url=self.config["bamsdk"]["season"].format(id=season_id), - params={"limit": 999}, - headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, - ).json() + def get_hulu_season(self, season_id: str) -> dict: + r = self.session.get( + url=self.config["bamsdk"]["season"].format(id=season_id), + params={"limit": 999}, + headers={"authorization": f"Bearer {self.account_tokens['access_token']}"}, + ).json() - return r + return r diff --git a/vinetrimmer/services/hulu.py b/vinetrimmer/services/hulu.py index 8e6515f..96b0c50 100644 --- a/vinetrimmer/services/hulu.py +++ b/vinetrimmer/services/hulu.py @@ -197,8 +197,8 @@ class Hulu(BaseService): for track in tracks.audios: if not track.psshPR: track.psshPR = next(x.psshPR for x in tracks.videos if x.psshPR) - if not track.psshWV: - track.psshWV = next(x.psshWV for x in tracks.videos if x.psshWV) + if not track.psshWV: + track.psshWV = next(x.psshWV for x in tracks.videos if x.psshWV) if self.acodec: tracks.audios = [x for x in tracks.audios if (x.codec or "")[:4] == self.AUDIO_CODEC_MAP[self.acodec]]