diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..713ec1a --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2026 moszkowski + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..626e8e3 --- /dev/null +++ b/README.md @@ -0,0 +1,177 @@ +# MPCrypto + +MPCrypto is a Python helper for generating authentication tokens and request signatures. + +This project intentionally implements **legacy and non-standard behaviors** (custom hashes, double-encoding, legacy crypto) required for compatibility reasons. + +--- + +## Features + +- Device-aware behavior for `webplayer`, `android.tv`, `phone.android`, and `tablet.android` +- Basic token generation tied to device class +- Request signature generation (HMAC-SHA1) +- Support for **Basic** and **Bearer** authentication schemes +- OpenSSL-compatible salted AES-CBC decryption (`EVP_BytesToKey`) used to derive Consumer Key and Consumer Secret values +- Android app Consumer Key and Consumer Secret reconstruction via `MockDateConverter`, which implements a SHA-256–like hash with XOR masking + +--- + +## XOR template generation (Ghidra) + +The XOR templates required by `MockDateConverter` are extracted from the original Android native library using Ghidra. + +This repository includes a Ghidra Python script that automates the extraction process by: + +- Locating JNI-exported functions related to `MockDateConverter` +- Decompiling each function +- Recovering the XOR byte sequences used internally +- Normalizing function names into the Python-friendly `XOR_TEMPLATES` format + +The script supports both DAT-backed byte arrays accessed through pointer arithmetic and hardcoded local-variable XOR sequences produced by the compiler. + +The output is a ready-to-use dictionary compatible with the `MockDateConverter` constructor. + + +--- + +## Supported device classes + +``` +webplayer +android.tv +phone.android +tablet.android +``` + +--- + +## Initialization + +`MPCrypto` can be initialized in two distinct ways depending on the device class. + +### Web-based devices (`webplayer`, `android.tv`) + +For web-based clients, configuration data is fetched remotely and decrypted. You must provide: + +- `config_url`: URL pointing to the remote configuration source +- `config_pattern`: Regular expression used to extract the configuration payload from the response + +These values are device and version specific and must be supplied by the user. + +**Initialization example:** + +```python +from mp_crypto import MPCrypto + +mp_crypto = MPCrypto( + device_class="webplayer", + config_url="https://example.com/config.php", + config_pattern=r"some_regex_pattern_here", +) +``` + +### Android app devices (`phone.android`, `tablet.android`) + +For Android-based clients, no remote configuration is fetched. Instead, key material is reconstructed locally using `MockDateConverter`. + +In this case, you must provide: + +- A fully initialized `MockDateConverter` instance +- A valid `package_name` +- A populated `xor_templates` dictionary (typically generated via the provided Ghidra script) + +The library validates that all required XOR templates for the selected device class are present before proceeding. + +**Initialization example:** + +```python +from mp_crypto import MPCrypto +from mock_date_converter import MockDateConverter + +XOR_TEMPLATES = { + 'key': bytes.fromhex("0123456789abcdef"), + #... +} + +mock_date_converter = MockDateConverter( + package_name="com.example.app", + xor_templates=XOR_TEMPLATES, +) + +mp_crypto = MPCrypto( + device_class="phone.android", + mock_date_converter=mock_date_converter, +) +``` + +--- + +## Basic authentication example + +This example shows how to sign requests using a **Basic** token for web-based devices. + +```python +new_device_id = str(uuid.uuid4()) +basic_token = mp_crypto.get_basic_token(new_device_id) + +signature = mp_crypto.get_signature( + method="POST", + url=url, + token=basic_token, + scheme="Basic", + params=data, +) + +headers = { + **default_auth_headers, + "Authorization": f"OPPlus Basic={basic_token},Signature={signature}", +} + +resp = requests.post( + url, + data=data, + headers=headers, +) +``` + +--- + +## Bearer authentication example + +This example shows how to sign requests using a previously obtained **Bearer** token. + +```python +signature = mp_crypto.get_signature( + method="GET", + url=url, + token=login_token, + scheme="Bearer", +) + +headers = { + **default_account_info_headers, + "Authorization": f"OPPlus Bearer={login_token},Signature={signature}", +} + +resp = requests.get( + url, + headers=headers, +) +``` + +--- + +## Disclaimer + +This project is provided for **educational and interoperability purposes only**. + +1. This project does not condone piracy or violations of DRM systems. +2. All techniques are the result of reverse engineering, publicly available research, and empirical analysis. +3. Do not use this software to access, decrypt, or distribute content without proper authorization. +4. Unauthorized access or redistribution of protected content may violate applicable laws. +5. This software must not be used for illegal activities, including DRM circumvention. +6. The authors and contributors are not responsible for misuse of this project. +7. By using this software, you agree to comply with all applicable laws and regulations. + +The authors assume no responsibility for misuse. diff --git a/mock_date_converter.py b/mock_date_converter.py new file mode 100644 index 0000000..a6611fa --- /dev/null +++ b/mock_date_converter.py @@ -0,0 +1,128 @@ +import struct + +class MockDateConverter: + + IV = [ + 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, + 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19 + ] + + def __init__(self, package_name: str, xor_templates: dict[str, bytes]): + if not package_name: + raise ValueError("package_name is required") + + if not xor_templates: + raise ValueError("xor_templates must not be empty") + + self.package_name = package_name + self.xor_templates = xor_templates + + self.K = [ + 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, + 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, + 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, + 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, + 0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, + 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, + 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, + 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967, + 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, + 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, + 0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, + 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, + 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, + 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3, + 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, + 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2 + ] + + def right_rotate(self, value, shift): + return ((value >> shift) | (value << (32 - shift))) & 0xFFFFFFFF + + def hash(self, message): + h = list(self.IV) + original_length = len(message) + bit_length = original_length * 8 + message += b'\x80' + b'\x00' * ((56 - (original_length + 1) % 64) % 64) + message += struct.pack('>Q', bit_length) + for i in range(0, len(message), 64): + chunk = message[i:i+64] + w = [0] * 64 + for j in range(16): + w[j] = struct.unpack('>I', chunk[j*4:j*4+4])[0] + for j in range(16, 64): + s0 = self.right_rotate(w[j-15], 7) ^ self.right_rotate(w[j-15], 18) ^ (w[j-15] >> 3) + s1 = self.right_rotate(w[j-2], 17) ^ self.right_rotate(w[j-2], 19) ^ (w[j-2] >> 10) + w[j] = (w[j-16] + s0 + w[j-7] + s1) & 0xFFFFFFFF + a, b, c, d, e, f, g, h_temp = h + for j in range(64): + S1 = self.right_rotate(e, 6) ^ self.right_rotate(e, 11) ^ self.right_rotate(e, 25) + ch = (e & f) ^ ((~e) & g) + temp1 = (h_temp + S1 + ch + self.K[j] + w[j]) & 0xFFFFFFFF + S0 = self.right_rotate(a, 2) ^ self.right_rotate(a, 13) ^ self.right_rotate(a, 22) + maj = (a & b) ^ (a & c) ^ (b & c) + temp2 = (S0 + maj) & 0xFFFFFFFF + h_temp, g, f = g, f, e + e = (d + temp1) & 0xFFFFFFFF + d, c, b = c, b, a + a = (temp1 + temp2) & 0xFFFFFFFF + h = [(x + y) & 0xFFFFFFFF for x, y in zip(h, [a, b, c, d, e, f, g, h_temp])] + return struct.pack('>8I', *h) + + def _compute(self, func_type): + xor_template = self.xor_templates[func_type] + hash_bytes = self.hash(self.package_name.encode()) + hex_str = hash_bytes.hex() + data_bytes = hex_str.encode('latin-1') + + data = data_bytes[:len(xor_template)] + result_bytes = bytes([a ^ b for a, b in zip(data, xor_template)]) + return result_bytes.decode('latin-1') + + def get_value_phone_hours(self): + return self._compute('value_phone_hours') + + def get_value_phone_minutes(self): + return self._compute('value_phone_minutes') + + def get_value_phone_seconds(self): + return self._compute('value_phone_seconds') + + def get_value_tablet_hours(self): + return self._compute('value_tablet_hours') + + def get_value_tablet_minutes(self): + return self._compute('value_tablet_minutes') + + def get_value_tablet_seconds(self): + return self._compute('value_tablet_seconds') + + def get_key_phone_hours(self): + return self._compute('key_phone_hours') + + def get_key_phone_minutes(self): + return self._compute('key_phone_minutes') + + def get_key_phone_seconds(self): + return self._compute('key_phone_seconds') + + def get_key_tablet_hours(self): + return self._compute('key_tablet_hours') + + def get_key_tablet_minutes(self): + return self._compute('key_tablet_minutes') + + def get_key_tablet_seconds(self): + return self._compute('key_tablet_seconds') + + def get_key_demo_indigital(self): + return self._compute('key_demo_indigital') + + def get_key_prod_indigital(self): + return self._compute('key_prod_indigital') + + def get_key_sender_demo_indigital(self): + return self._compute('key_sender_demo_indigital') + + def get_key_sender_prod_indigital(self): + return self._compute('key_sender_prod_indigital') diff --git a/mp_crypto.py b/mp_crypto.py new file mode 100644 index 0000000..fcc76f6 --- /dev/null +++ b/mp_crypto.py @@ -0,0 +1,327 @@ +import base64 +import hashlib +import hmac +import json +import random +import re +import time +import urllib.parse +import requests +from Cryptodome.Cipher import AES +from Cryptodome.Util.Padding import unpad +from mock_date_converter import MockDateConverter + +class MPCrypto: + SALT_PREFIX = b"Salted__" + BLOCK_SIZE = AES.block_size + KEY_LENGTH = 32 + IV_LENGTH = 16 + SALT_LENGTH = 8 + ENCODING = "utf-8" + CIPHER_MODE = AES.MODE_CBC + HASH_FUNCTION = hashlib.md5 + + DEVICE_WEBPLAYER = "webplayer" + DEVICE_ANDROID_TV = "android.tv" + DEVICE_PHONE_ANDROID = "phone.android" + DEVICE_TABLET_ANDROID = "tablet.android" + + _PHONE_REQUIRED_KEYS = { + "value_phone_hours", + "value_phone_minutes", + "value_phone_seconds", + "key_phone_hours", + "key_phone_minutes", + "key_phone_seconds", + } + + _TABLET_REQUIRED_KEYS = { + "value_tablet_hours", + "value_tablet_minutes", + "value_tablet_seconds", + "key_tablet_hours", + "key_tablet_minutes", + "key_tablet_seconds", + } + + def __init__(self, device_class: str, *, config_url: str | None = None, config_pattern: str | None = None, mock_date_converter: MockDateConverter | None = None): + self.device_class = device_class + self.mock_date_converter = mock_date_converter + + valid_devices = { + self.DEVICE_WEBPLAYER, + self.DEVICE_ANDROID_TV, + self.DEVICE_PHONE_ANDROID, + self.DEVICE_TABLET_ANDROID, + } + if device_class not in valid_devices: + raise ValueError(f"Unsupported device_class: {device_class}") + + if device_class in {self.DEVICE_WEBPLAYER, self.DEVICE_ANDROID_TV}: + if not config_url or not config_pattern: + raise ValueError("config_url and config_pattern are required for webplayer/android.tv") + + self.CONFIG_URL = config_url + self.CONFIG_PATTERN = config_pattern + + if device_class == self.DEVICE_WEBPLAYER: + self.config = self.fetch_config_webplayer() + else: + self.config = self.fetch_config_androidtv() + + self.password = self.generate_password() + self.consumer_secret = self.decrypt_ua("param") + self.consumer_key = self.decrypt_ua("query") + self.jswbw = self.decrypt_ua("jswbw") + + return + + if mock_date_converter is None: + raise ValueError("mock_date_converter is required for phone.android / tablet.android") + + self._validate_mock_date_converter(mock_date_converter, device_class) + self.mock_date_converter = mock_date_converter + + self.consumer_secret = self.reconstruct_app_consumer_secret() + self.consumer_key = self.reconstruct_app_consumer_key() + + def _validate_mock_date_converter(self, mock_date_converter: MockDateConverter, device_class: str) -> None: + package_name = getattr(mock_date_converter, "package_name", None) + if not package_name: + raise ValueError("mock_date_converter.package_name must be set") + + xor_templates = getattr(mock_date_converter, "xor_templates", None) + if not xor_templates: + raise ValueError("mock_date_converter.xor_templates must be populated before use") + + required = self._PHONE_REQUIRED_KEYS if device_class == self.DEVICE_PHONE_ANDROID else self._TABLET_REQUIRED_KEYS + missing = required - xor_templates.keys() + if missing: + raise ValueError(f"Missing XOR templates for {device_class}: {sorted(missing)}") + + + def get_basic_token(self, device_id: str) -> str: + token_str = f"{device_id}:{self.device_class}" + return base64.b64encode(token_str.encode(self.ENCODING)).decode(self.ENCODING) + + def fetch_config_webplayer(self) -> dict: + response = requests.get(self.CONFIG_URL, timeout=10).text + match = re.search(self.CONFIG_PATTERN, response, re.DOTALL) + if not match: + raise ValueError("Configuration pattern not found in the response.") + + config_data = json.loads(match.group(1))['config'] + return { + "version": config_data['version'], + "timestamp": config_data['timestamp'], + "ua": config_data['ua'], + "name": config_data['name'], + "language": config_data['language'] + } + + def generate_password(self) -> str: + version = self.config['version'] + timestamp = self.config['timestamp'] + name = self.config['name'] + language = self.config['language'] + password_str = f"{version}{name}{timestamp}{language}" + encoded_password = base64.b64encode(password_str.encode(self.ENCODING)).decode(self.ENCODING) + return encoded_password + + def fetch_config_androidtv(self) -> dict: + response = requests.get(self.CONFIG_URL, timeout=10).text + match = re.search(self.CONFIG_PATTERN, response, re.DOTALL) + if not match: + raise ValueError("Configuration pattern not found in the response.") + + ua_body = match.group(1) + + pairs = re.finditer(r'(\w+)\s*:\s*(?:\'([^\']*)\'|"([^"]*)")', ua_body) + config_data = { + p.group(1): (p.group(2) if p.group(2) is not None else p.group(3)) + for p in pairs + } + + return { + "version": config_data['version'], + "timestamp": config_data['timestamp'], + "ua": { + "query": config_data['query'], + "param": config_data['param'], + "jswbw": config_data['jswbw'] + }, + "name": config_data['name'], + "language": config_data['language'] + } + + def reconstruct_app_consumer_secret(self) -> str: + mock_date_converter = self.mock_date_converter + if(self.device_class == self.DEVICE_PHONE_ANDROID): + hours = mock_date_converter.get_value_phone_hours() + minutes = mock_date_converter.get_value_phone_minutes() + seconds = mock_date_converter.get_value_phone_seconds() + elif(self.device_class == self.DEVICE_TABLET_ANDROID): + hours = mock_date_converter.get_value_tablet_hours() + minutes = mock_date_converter.get_value_tablet_minutes() + seconds = mock_date_converter.get_value_tablet_seconds() + else: + raise ValueError("Incorrect device_class for this function.") + return hours + minutes + seconds + + def reconstruct_app_consumer_key(self) -> str: + mock_date_converter = self.mock_date_converter + if(self.device_class == self.DEVICE_PHONE_ANDROID): + hours = mock_date_converter.get_key_phone_hours() + minutes = mock_date_converter.get_key_phone_minutes() + seconds = mock_date_converter.get_key_phone_seconds() + elif(self.device_class == self.DEVICE_TABLET_ANDROID): + hours = mock_date_converter.get_key_tablet_hours() + minutes = mock_date_converter.get_key_tablet_minutes() + seconds = mock_date_converter.get_key_tablet_seconds() + else: + raise ValueError("Incorrect device_class for this function.") + return hours + minutes + seconds + + def decrypt_ua(self, key: str) -> str: + try: + encrypted_data = json.loads(self.config['ua'][key]) + except KeyError as e: + raise KeyError(f"UA key '{key}' not found in config") from e + return self.decrypt(encrypted_data, self.password) + + @classmethod + def openssl_evp_bytes_to_key(cls, password: str, salt: bytes) -> tuple: + password_bytes = password.encode(cls.ENCODING) + key_iv = b'' + prev = b'' + while len(key_iv) < cls.KEY_LENGTH + cls.IV_LENGTH: + prev = cls.HASH_FUNCTION(prev + password_bytes + salt).digest() + key_iv += prev + key = key_iv[:cls.KEY_LENGTH] + iv = key_iv[cls.KEY_LENGTH:cls.KEY_LENGTH + cls.IV_LENGTH] + return key, iv + + @classmethod + def _decode_segment(cls, segment: list[str]) -> bytes: + return bytes.fromhex(''.join(segment[::-1])) + + @classmethod + def _extract_salt_and_ciphertext(cls, data: bytes) -> tuple[bytes, bytes]: + if data.startswith(cls.SALT_PREFIX): + if len(data) < len(cls.SALT_PREFIX) + cls.SALT_LENGTH: + raise ValueError("Invalid salted ciphertext: truncated salt") + + salt = data[len(cls.SALT_PREFIX):len(cls.SALT_PREFIX) + cls.SALT_LENGTH] + ciphertext = data[len(cls.SALT_PREFIX) + cls.SALT_LENGTH:] + else: + salt = b'' + ciphertext = data + return salt, ciphertext + + @classmethod + def _decrypt_ciphertext(cls, ciphertext: bytes, password: str, salt: bytes) -> bytes: + key, iv = cls.openssl_evp_bytes_to_key(password, salt) + cipher = AES.new(key, cls.CIPHER_MODE, iv) + return unpad(cipher.decrypt(ciphertext), cls.BLOCK_SIZE) + + @classmethod + def decrypt(cls, salt_ciphertext_segments: list[list[str]], password: str) -> str: + decrypted_parts = [] + + for segment in salt_ciphertext_segments: + raw_bytes = cls._decode_segment(segment) + decoded = base64.b64decode(raw_bytes.decode(cls.ENCODING)) + + salt, ciphertext = cls._extract_salt_and_ciphertext(decoded) + plaintext_bytes = cls._decrypt_ciphertext(ciphertext, password, salt) + + decrypted_parts.append(plaintext_bytes.decode(cls.ENCODING)) + + return ''.join(decrypted_parts) + + @staticmethod + def get_nonce_web() -> str: + random_part = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=11)) + timestamp_part = str(int(time.time() * 1000)) + full_nonce = random_part + timestamp_part + return base64.b64encode(full_nonce.encode('utf-8')).decode('utf-8') + + @staticmethod + def get_nonce_apk() -> str: + random_part = ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789', k=9)) + return random_part + + @staticmethod + def get_hmac_sha1(key: bytes, message: str) -> str: + signature = hmac.new(key, message.encode(), hashlib.sha1) + return base64.b64encode(signature.digest()).decode() + + def get_signature(self, method: str, url: str, token: str, scheme: str, params: dict | None = None) -> str: + method = method.upper() + allowed_methods = {"GET", "POST", "PUT", "DELETE"} + allowed_schemes = {"Basic", "Bearer"} + + if method not in allowed_methods: + raise ValueError(f"Invalid method: {method}. Allowed methods are: {allowed_methods}") + if scheme not in allowed_schemes: + raise ValueError(f"Invalid scheme: {scheme}. Allowed schemes are: {allowed_schemes}") + + consumer_secret = self.consumer_secret.encode('utf-8') + consumer_key = self.consumer_key + + if self.device_class in [self.DEVICE_WEBPLAYER, self.DEVICE_ANDROID_TV]: + nonce = self.get_nonce_web() + else: + nonce = self.get_nonce_apk() + + signature_method = "HMAC-SHA1" + timestamp = str(int(time.time())) + version = "1.0" + + parsed_url = urllib.parse.urlparse(url) + base_url = parsed_url.netloc + parsed_url.path + + url_params = dict(urllib.parse.parse_qsl(parsed_url.query)) + + merged_params = {} + if params: + merged_params.update(params) + merged_params.update(url_params) + + merged_params[scheme] = token + merged_params['consumer_key'] = consumer_key + merged_params['nonce'] = nonce + merged_params['signature_method'] = signature_method + merged_params['timestamp'] = timestamp + merged_params['version'] = version + + for key in list(merged_params.keys()): + raw_value = str(merged_params[key]) + decoded_value = urllib.parse.unquote(raw_value) + + if '@' in decoded_value: + merged_params[key] = urllib.parse.quote_plus(decoded_value) + else: + merged_params[key] = raw_value + + sorted_encoded_params = [] + for key in sorted(merged_params.keys()): + encoded_key = urllib.parse.quote_plus(key) + encoded_value = urllib.parse.quote_plus(merged_params[key]) + sorted_encoded_params.append(f"{encoded_key}%3D{encoded_value}") + + encoded_sorted_params = "%26".join(sorted_encoded_params) + data_to_sign = f"{method}&{urllib.parse.quote_plus(base_url)}&{encoded_sorted_params}" + + hmac_sha1 = self.get_hmac_sha1(consumer_secret, data_to_sign) + + result_string = ( + f'consumer_key="{consumer_key}",' + f'nonce="{nonce}",' + f'signature_method="{signature_method}",' + f'signature="{hmac_sha1}",' + f'timestamp="{timestamp}",' + f'version="{version}"' + ) + + return base64.b64encode(result_string.encode()).decode() \ No newline at end of file