327 lines
13 KiB
Python
327 lines
13 KiB
Python
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import random
|
|
import re
|
|
import time
|
|
import urllib.parse
|
|
import requests
|
|
from Cryptodome.Cipher import AES
|
|
from Cryptodome.Util.Padding import unpad
|
|
from mock_date_converter import MockDateConverter
|
|
|
|
class MPCrypto:
|
|
SALT_PREFIX = b"Salted__"
|
|
BLOCK_SIZE = AES.block_size
|
|
KEY_LENGTH = 32
|
|
IV_LENGTH = 16
|
|
SALT_LENGTH = 8
|
|
ENCODING = "utf-8"
|
|
CIPHER_MODE = AES.MODE_CBC
|
|
HASH_FUNCTION = hashlib.md5
|
|
|
|
DEVICE_WEBPLAYER = "webplayer"
|
|
DEVICE_ANDROID_TV = "android.tv"
|
|
DEVICE_PHONE_ANDROID = "phone.android"
|
|
DEVICE_TABLET_ANDROID = "tablet.android"
|
|
|
|
_PHONE_REQUIRED_KEYS = {
|
|
"value_phone_hours",
|
|
"value_phone_minutes",
|
|
"value_phone_seconds",
|
|
"key_phone_hours",
|
|
"key_phone_minutes",
|
|
"key_phone_seconds",
|
|
}
|
|
|
|
_TABLET_REQUIRED_KEYS = {
|
|
"value_tablet_hours",
|
|
"value_tablet_minutes",
|
|
"value_tablet_seconds",
|
|
"key_tablet_hours",
|
|
"key_tablet_minutes",
|
|
"key_tablet_seconds",
|
|
}
|
|
|
|
def __init__(self, device_class: str, *, config_url: str | None = None, config_pattern: str | None = None, mock_date_converter: MockDateConverter | None = None):
|
|
self.device_class = device_class
|
|
self.mock_date_converter = mock_date_converter
|
|
|
|
valid_devices = {
|
|
self.DEVICE_WEBPLAYER,
|
|
self.DEVICE_ANDROID_TV,
|
|
self.DEVICE_PHONE_ANDROID,
|
|
self.DEVICE_TABLET_ANDROID,
|
|
}
|
|
if device_class not in valid_devices:
|
|
raise ValueError(f"Unsupported device_class: {device_class}")
|
|
|
|
if device_class in {self.DEVICE_WEBPLAYER, self.DEVICE_ANDROID_TV}:
|
|
if not config_url or not config_pattern:
|
|
raise ValueError("config_url and config_pattern are required for webplayer/android.tv")
|
|
|
|
self.CONFIG_URL = config_url
|
|
self.CONFIG_PATTERN = config_pattern
|
|
|
|
if device_class == self.DEVICE_WEBPLAYER:
|
|
self.config = self.fetch_config_webplayer()
|
|
else:
|
|
self.config = self.fetch_config_androidtv()
|
|
|
|
self.password = self.generate_password()
|
|
self.consumer_secret = self.decrypt_ua("param")
|
|
self.consumer_key = self.decrypt_ua("query")
|
|
self.jswbw = self.decrypt_ua("jswbw")
|
|
|
|
return
|
|
|
|
if mock_date_converter is None:
|
|
raise ValueError("mock_date_converter is required for phone.android / tablet.android")
|
|
|
|
self._validate_mock_date_converter(mock_date_converter, device_class)
|
|
self.mock_date_converter = mock_date_converter
|
|
|
|
self.consumer_secret = self.reconstruct_app_consumer_secret()
|
|
self.consumer_key = self.reconstruct_app_consumer_key()
|
|
|
|
def _validate_mock_date_converter(self, mock_date_converter: MockDateConverter, device_class: str) -> None:
|
|
package_name = getattr(mock_date_converter, "package_name", None)
|
|
if not package_name:
|
|
raise ValueError("mock_date_converter.package_name must be set")
|
|
|
|
xor_templates = getattr(mock_date_converter, "xor_templates", None)
|
|
if not xor_templates:
|
|
raise ValueError("mock_date_converter.xor_templates must be populated before use")
|
|
|
|
required = self._PHONE_REQUIRED_KEYS if device_class == self.DEVICE_PHONE_ANDROID else self._TABLET_REQUIRED_KEYS
|
|
missing = required - xor_templates.keys()
|
|
if missing:
|
|
raise ValueError(f"Missing XOR templates for {device_class}: {sorted(missing)}")
|
|
|
|
|
|
def get_basic_token(self, device_id: str) -> str:
|
|
token_str = f"{device_id}:{self.device_class}"
|
|
return base64.b64encode(token_str.encode(self.ENCODING)).decode(self.ENCODING)
|
|
|
|
def fetch_config_webplayer(self) -> dict:
|
|
response = requests.get(self.CONFIG_URL, timeout=10).text
|
|
match = re.search(self.CONFIG_PATTERN, response, re.DOTALL)
|
|
if not match:
|
|
raise ValueError("Configuration pattern not found in the response.")
|
|
|
|
config_data = json.loads(match.group(1))['config']
|
|
return {
|
|
"version": config_data['version'],
|
|
"timestamp": config_data['timestamp'],
|
|
"ua": config_data['ua'],
|
|
"name": config_data['name'],
|
|
"language": config_data['language']
|
|
}
|
|
|
|
def generate_password(self) -> str:
|
|
version = self.config['version']
|
|
timestamp = self.config['timestamp']
|
|
name = self.config['name']
|
|
language = self.config['language']
|
|
password_str = f"{version}{name}{timestamp}{language}"
|
|
encoded_password = base64.b64encode(password_str.encode(self.ENCODING)).decode(self.ENCODING)
|
|
return encoded_password
|
|
|
|
def fetch_config_androidtv(self) -> dict:
|
|
response = requests.get(self.CONFIG_URL, timeout=10).text
|
|
match = re.search(self.CONFIG_PATTERN, response, re.DOTALL)
|
|
if not match:
|
|
raise ValueError("Configuration pattern not found in the response.")
|
|
|
|
ua_body = match.group(1)
|
|
|
|
pairs = re.finditer(r'(\w+)\s*:\s*(?:\'([^\']*)\'|"([^"]*)")', ua_body)
|
|
config_data = {
|
|
p.group(1): (p.group(2) if p.group(2) is not None else p.group(3))
|
|
for p in pairs
|
|
}
|
|
|
|
return {
|
|
"version": config_data['version'],
|
|
"timestamp": config_data['timestamp'],
|
|
"ua": {
|
|
"query": config_data['query'],
|
|
"param": config_data['param'],
|
|
"jswbw": config_data['jswbw']
|
|
},
|
|
"name": config_data['name'],
|
|
"language": config_data['language']
|
|
}
|
|
|
|
def reconstruct_app_consumer_secret(self) -> str:
|
|
mock_date_converter = self.mock_date_converter
|
|
if(self.device_class == self.DEVICE_PHONE_ANDROID):
|
|
hours = mock_date_converter.get_value_phone_hours()
|
|
minutes = mock_date_converter.get_value_phone_minutes()
|
|
seconds = mock_date_converter.get_value_phone_seconds()
|
|
elif(self.device_class == self.DEVICE_TABLET_ANDROID):
|
|
hours = mock_date_converter.get_value_tablet_hours()
|
|
minutes = mock_date_converter.get_value_tablet_minutes()
|
|
seconds = mock_date_converter.get_value_tablet_seconds()
|
|
else:
|
|
raise ValueError("Incorrect device_class for this function.")
|
|
return hours + minutes + seconds
|
|
|
|
def reconstruct_app_consumer_key(self) -> str:
|
|
mock_date_converter = self.mock_date_converter
|
|
if(self.device_class == self.DEVICE_PHONE_ANDROID):
|
|
hours = mock_date_converter.get_key_phone_hours()
|
|
minutes = mock_date_converter.get_key_phone_minutes()
|
|
seconds = mock_date_converter.get_key_phone_seconds()
|
|
elif(self.device_class == self.DEVICE_TABLET_ANDROID):
|
|
hours = mock_date_converter.get_key_tablet_hours()
|
|
minutes = mock_date_converter.get_key_tablet_minutes()
|
|
seconds = mock_date_converter.get_key_tablet_seconds()
|
|
else:
|
|
raise ValueError("Incorrect device_class for this function.")
|
|
return hours + minutes + seconds
|
|
|
|
def decrypt_ua(self, key: str) -> str:
|
|
try:
|
|
encrypted_data = json.loads(self.config['ua'][key])
|
|
except KeyError as e:
|
|
raise KeyError(f"UA key '{key}' not found in config") from e
|
|
return self.decrypt(encrypted_data, self.password)
|
|
|
|
@classmethod
|
|
def openssl_evp_bytes_to_key(cls, password: str, salt: bytes) -> tuple:
|
|
password_bytes = password.encode(cls.ENCODING)
|
|
key_iv = b''
|
|
prev = b''
|
|
while len(key_iv) < cls.KEY_LENGTH + cls.IV_LENGTH:
|
|
prev = cls.HASH_FUNCTION(prev + password_bytes + salt).digest()
|
|
key_iv += prev
|
|
key = key_iv[:cls.KEY_LENGTH]
|
|
iv = key_iv[cls.KEY_LENGTH:cls.KEY_LENGTH + cls.IV_LENGTH]
|
|
return key, iv
|
|
|
|
@classmethod
|
|
def _decode_segment(cls, segment: list[str]) -> bytes:
|
|
return bytes.fromhex(''.join(segment[::-1]))
|
|
|
|
@classmethod
|
|
def _extract_salt_and_ciphertext(cls, data: bytes) -> tuple[bytes, bytes]:
|
|
if data.startswith(cls.SALT_PREFIX):
|
|
if len(data) < len(cls.SALT_PREFIX) + cls.SALT_LENGTH:
|
|
raise ValueError("Invalid salted ciphertext: truncated salt")
|
|
|
|
salt = data[len(cls.SALT_PREFIX):len(cls.SALT_PREFIX) + cls.SALT_LENGTH]
|
|
ciphertext = data[len(cls.SALT_PREFIX) + cls.SALT_LENGTH:]
|
|
else:
|
|
salt = b''
|
|
ciphertext = data
|
|
return salt, ciphertext
|
|
|
|
@classmethod
|
|
def _decrypt_ciphertext(cls, ciphertext: bytes, password: str, salt: bytes) -> bytes:
|
|
key, iv = cls.openssl_evp_bytes_to_key(password, salt)
|
|
cipher = AES.new(key, cls.CIPHER_MODE, iv)
|
|
return unpad(cipher.decrypt(ciphertext), cls.BLOCK_SIZE)
|
|
|
|
@classmethod
|
|
def decrypt(cls, salt_ciphertext_segments: list[list[str]], password: str) -> str:
|
|
decrypted_parts = []
|
|
|
|
for segment in salt_ciphertext_segments:
|
|
raw_bytes = cls._decode_segment(segment)
|
|
decoded = base64.b64decode(raw_bytes.decode(cls.ENCODING))
|
|
|
|
salt, ciphertext = cls._extract_salt_and_ciphertext(decoded)
|
|
plaintext_bytes = cls._decrypt_ciphertext(ciphertext, password, salt)
|
|
|
|
decrypted_parts.append(plaintext_bytes.decode(cls.ENCODING))
|
|
|
|
return ''.join(decrypted_parts)
|
|
|
|
@staticmethod
|
|
def get_nonce_web() -> str:
|
|
random_part = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=11))
|
|
timestamp_part = str(int(time.time() * 1000))
|
|
full_nonce = random_part + timestamp_part
|
|
return base64.b64encode(full_nonce.encode('utf-8')).decode('utf-8')
|
|
|
|
@staticmethod
|
|
def get_nonce_apk() -> str:
|
|
random_part = ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=9))
|
|
return random_part
|
|
|
|
@staticmethod
|
|
def get_hmac_sha1(key: bytes, message: str) -> str:
|
|
signature = hmac.new(key, message.encode(), hashlib.sha1)
|
|
return base64.b64encode(signature.digest()).decode()
|
|
|
|
def get_signature(self, method: str, url: str, token: str, scheme: str, params: dict | None = None) -> str:
|
|
method = method.upper()
|
|
allowed_methods = {"GET", "POST", "PUT", "DELETE"}
|
|
allowed_schemes = {"Basic", "Bearer"}
|
|
|
|
if method not in allowed_methods:
|
|
raise ValueError(f"Invalid method: {method}. Allowed methods are: {allowed_methods}")
|
|
if scheme not in allowed_schemes:
|
|
raise ValueError(f"Invalid scheme: {scheme}. Allowed schemes are: {allowed_schemes}")
|
|
|
|
consumer_secret = self.consumer_secret.encode('utf-8')
|
|
consumer_key = self.consumer_key
|
|
|
|
if self.device_class in [self.DEVICE_WEBPLAYER, self.DEVICE_ANDROID_TV]:
|
|
nonce = self.get_nonce_web()
|
|
else:
|
|
nonce = self.get_nonce_apk()
|
|
|
|
signature_method = "HMAC-SHA1"
|
|
timestamp = str(int(time.time()))
|
|
version = "1.0"
|
|
|
|
parsed_url = urllib.parse.urlparse(url)
|
|
base_url = parsed_url.netloc + parsed_url.path
|
|
|
|
url_params = dict(urllib.parse.parse_qsl(parsed_url.query))
|
|
|
|
merged_params = {}
|
|
if params:
|
|
merged_params.update(params)
|
|
merged_params.update(url_params)
|
|
|
|
merged_params[scheme] = token
|
|
merged_params['consumer_key'] = consumer_key
|
|
merged_params['nonce'] = nonce
|
|
merged_params['signature_method'] = signature_method
|
|
merged_params['timestamp'] = timestamp
|
|
merged_params['version'] = version
|
|
|
|
for key in list(merged_params.keys()):
|
|
raw_value = str(merged_params[key])
|
|
decoded_value = urllib.parse.unquote(raw_value)
|
|
|
|
if '@' in decoded_value:
|
|
merged_params[key] = urllib.parse.quote_plus(decoded_value)
|
|
else:
|
|
merged_params[key] = raw_value
|
|
|
|
sorted_encoded_params = []
|
|
for key in sorted(merged_params.keys()):
|
|
encoded_key = urllib.parse.quote_plus(key)
|
|
encoded_value = urllib.parse.quote_plus(merged_params[key])
|
|
sorted_encoded_params.append(f"{encoded_key}%3D{encoded_value}")
|
|
|
|
encoded_sorted_params = "%26".join(sorted_encoded_params)
|
|
data_to_sign = f"{method}&{urllib.parse.quote_plus(base_url)}&{encoded_sorted_params}"
|
|
|
|
hmac_sha1 = self.get_hmac_sha1(consumer_secret, data_to_sign)
|
|
|
|
result_string = (
|
|
f'consumer_key="{consumer_key}",'
|
|
f'nonce="{nonce}",'
|
|
f'signature_method="{signature_method}",'
|
|
f'signature="{hmac_sha1}",'
|
|
f'timestamp="{timestamp}",'
|
|
f'version="{version}"'
|
|
)
|
|
|
|
return base64.b64encode(result_string.encode()).decode() |