diff --git a/README.md b/README.md index 18aeb02..eb3c66f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # PlayReady -PlayReady stuffs \ No newline at end of file +PlayReady stuffs, forks and collections from PlayReady Discord server diff --git a/decbin-pr.zip b/decbin-pr.zip new file mode 100644 index 0000000..1c1f289 Binary files /dev/null and b/decbin-pr.zip differ diff --git a/decrypt_bgc_zgp_pak.py b/decrypt_bgc_zgp_pak.py new file mode 100644 index 0000000..3007105 --- /dev/null +++ b/decrypt_bgc_zgp_pak.py @@ -0,0 +1,464 @@ +import sys +import subprocess + +from typing import Union, Optional +from pathlib import Path +from zlib import crc32 + +from Crypto.Cipher import AES +from Crypto.Hash import CMAC + +from cryptography.hazmat.primitives.keywrap import aes_key_unwrap + + +class Pak: + def __init__(self, bgroupcert_sizes: dict, zgpriv_sizes: dict): + self.mstar_key_bank_magic_id = b"Mstar.Key.Bank" + self.secure_store_file_magic_id = b"MSTAR_SECURE_STORE_FILE_MAGIC_ID" + self.bgroupcert_sizes = bgroupcert_sizes + self.zgpriv_sizes = zgpriv_sizes + + def process_files(self, boot_file: Path, sedata_file: Path, playready_keys: dict) -> None: + self.validate_files(boot_file, sedata_file) + boot_key = self.get_boot_key(input_file=boot_file).hex() + + boot_keys = { + "brand_unknown": { + "MODEL_UNKNOWN": boot_key + } + } + + decryption_bgc_zgp = DecryptionBgcZgp( + boot_keys=boot_keys, + playready_keys=playready_keys + ) + + chunks = self.get_chunks_from_sedata(input_file=sedata_file) + + for bgp_security_level, bgp_size in self.bgroupcert_sizes.items(): + print(f"\nTrying get BGCert - Security Level: {bgp_security_level}") + for chunk in chunks: + data = self.prepare_data(chunk) + + bgroupcert_data = self.decrypt_bgroupcert(decryption_bgc_zgp, data, bgp_size) + if not bgroupcert_data: + continue + + print("Firmware:") + print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}") + print(f" - Brand: {decryption_bgc_zgp.brand}") + + for chunk in chunks: + data 
= self.prepare_data(chunk) + + zgpriv_data = self.decrypt_zgpriv(decryption_bgc_zgp, data, self.zgpriv_sizes[bgp_security_level]) + if not zgpriv_data: + continue + + code = self.process_decrypted_data(decryption_bgc_zgp, bgroupcert_data, zgpriv_data) + if code == 0: + break + + def get_boot_key(self, input_file: Path, offset: int = 16) -> bytes: + data = DecryptionBgcZgp.get_file_data(input_file) + + index = data.find(self.mstar_key_bank_magic_id) + if index == -1 or index < offset: + raise ValueError("Error: Boot key not found or invalid position") + + return data[index - offset:index] + + def get_chunks_from_sedata(self, input_file: Path, min_size: int = 0, max_size: int = 2000) -> list[dict]: + data = DecryptionBgcZgp.get_file_data(input_file) + + chunks = [] + start_pos = 0 + + while True: + pos = data.find(self.secure_store_file_magic_id, start_pos) + if pos == -1: + break + + header_start = max(0, pos - 32) + + header = data[header_start:pos] + + data_start = pos + len(self.secure_store_file_magic_id) + + next_pos = data.find(self.secure_store_file_magic_id, data_start) + if next_pos == -1: + data_end = len(data) + else: + data_end = next_pos + + extracted_data = data[data_start:data_end] + total_size = len(header) + len(self.secure_store_file_magic_id) + len(extracted_data) + + if min_size <= total_size <= max_size: + chunks.append({ + "position": header_start if len(header) == 32 else pos, + "header": header if len(header) == 32 else b"", + "has_full_header": len(header) == 32, + "data": extracted_data, + "total_size": total_size + }) + + start_pos = data_end + + return chunks + + def prepare_data(self, chunk: dict) -> bytes: + magic_id = self.secure_store_file_magic_id + header = chunk["header"] if chunk["has_full_header"] else b"" + return header + magic_id + chunk["data"] + + def process_decrypted_data( + self, decryption_bgc_zgp: "DecryptionBgcZgp", bgroupcert_data: bytes, zgpriv_data: bytes + ) -> int: + bgroupcert_tmp_path = 
Path("bgroupcert.decrypt_bgc_zgp_pak.tmp") + zgpriv_tmp_path = Path("zgpriv_temp.decrypt_bgc_zgp_pak.tmp") + + code = 1 + + try: + bgroupcert_tmp_path.write_bytes(bgroupcert_data) + zgpriv_tmp_path.write_bytes(zgpriv_data) + + command = [ + "pyplayready", + "create-device", + "-c", str(bgroupcert_tmp_path), + "-k", str(zgpriv_tmp_path) + ] + + result = subprocess.run( + command, + capture_output=True, + text=True + ) + + if result.returncode == 0: + prd_path = Path(result.stderr.split("Saved to: ")[-1].strip()) + folder_path = Path("playready_decryption_bgc_zgp_pak") / prd_path.stem + + if folder_path.exists() and any(item.is_file() for item in folder_path.iterdir()): + raise ValueError(f"Error: This certificate already exists in {folder_path}") + + folder_path.mkdir(parents=True, exist_ok=True) + + bgroupcert_out_path = folder_path / "bgroupcert.dat" + bgroupcert_out_path.write_bytes(bgroupcert_data) + + zgpriv_out_path = folder_path / "zgpriv.dat" + zgpriv_out_path.write_bytes(zgpriv_data) + + prd_path.rename(folder_path / prd_path.name) + + security_level = decryption_bgc_zgp.get_security_level(bgroupcert_data) + + print("Decryption:") + print(f" - BGroupCert Security level: {security_level}") + print(f" - BGroupCert Path : {bgroupcert_out_path}") + print(f" - ZGPriv Path : {zgpriv_out_path}") + print(f" - PRD Path : {folder_path / prd_path.name}") + print(f" - Files exported to : {folder_path.parent}") + code = result.returncode + finally: + bgroupcert_tmp_path.unlink(missing_ok=True) + zgpriv_tmp_path.unlink(missing_ok=True) + return code + + @staticmethod + def validate_files(boot_file: Path, sedata_file: Path) -> None: + if not boot_file.exists(): + raise ValueError(f"Error: Boot File not found -> {boot_file}") + if not sedata_file.exists(): + raise ValueError(f"Error: SeData File not found -> {sedata_file}") + + @staticmethod + def decrypt_bgroupcert(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int): + try: + decrypted_bgroupcert_data = 
decryption_bgc_zgp.decrypt_aes_ecb( + inp=data, + size=size, + is_bgroupcert=True + ) + return decryption_bgc_zgp.process_bgroupcert(data=decrypted_bgroupcert_data) + except ValueError: + return None + + @staticmethod + def decrypt_zgpriv(decryption_bgc_zgp: "DecryptionBgcZgp", data: bytes, size: int): + try: + decrypted_zgpriv_data = decryption_bgc_zgp.decrypt_aes_ecb( + inp=data, + size=size + ) + return decryption_bgc_zgp.process_zgpriv(data=decrypted_zgpriv_data, is_sl3000=False) + except ValueError: + return None + + +class DecryptionBgcZgp: + def __init__(self, boot_keys: dict, playready_keys: dict): + self.boot_keys = boot_keys + self.playready_keys = playready_keys + self.boot_key: Optional[bytes] = None + self.brand: Optional[str] = None + self.bgroupcert_start_magic = b"CHAI" + self.bgroupcert_end_magic = b"\x93\xfa\xc5\xab" + + def decrypt_aes_ecb(self,inp: Union[Path, bytes],size: Optional[int] = None, is_bgroupcert: bool = False) -> bytes: + if isinstance(inp, bytes): + encrypted_data = inp + else: + encrypted_data = self.get_file_data(inp) + + for brand, keys in self.boot_keys.items(): + for model, boot_key in keys.items(): + if self.boot_key: + key = self.boot_key + else: + key = self.hex_to_bytes(boot_key) + + cipher = AES.new(key, AES.MODE_ECB) + + if size: + data = cipher.decrypt(encrypted_data[:size]) + else: + data = cipher.decrypt(encrypted_data) + + if is_bgroupcert and not self.bgroupcert_start_magic in data and not self.bgroupcert_end_magic in data: + continue + + if is_bgroupcert: + self.boot_key = key + self.brand = brand + + return data + + raise ValueError( + "Error: Failed to decrypt the bgroupcert. No valid AES key matched or the decrypted data is invalid." 
+ ) + + def decrypt_zgpriv(self, encrypted_data: bytes) -> bytes: + transient_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["transient"]) + cmac = CMAC.new(transient_key, ciphermod=AES) + + intermediate_key = self.hex_to_bytes(self.playready_keys["porting_kit"]["intermediate"]) + intermediate_data = ( + b"\x01" + + intermediate_key + # Intermediate key + b"\x00" + + b"\x00" * 16 + # Context + b"\x00\x80" + ) + cmac.update(intermediate_data) + + wrapping_key = cmac.digest() + + wrapped_key = encrypted_data[:48] + unwrapped_key = aes_key_unwrap(wrapping_key, wrapped_key) + + return unwrapped_key + + def process_zgpriv(self, data: bytes, is_sl3000: bool) -> bytes: + content = self.remove_header(data) + + if is_sl3000: + content = self.decrypt_zgpriv(encrypted_data=content) + + if self.brand == "lg" and len(data) == 128: + zgpriv_data = content[64:-32] + else: + zgpriv_data = content[:32] + + return zgpriv_data + + def process_bgroupcert(self, data: bytes) -> bytes: + content = self.remove_header(data) + + start_index = content.find(self.bgroupcert_start_magic) + end_index = content.find(self.bgroupcert_end_magic) + + if start_index == -1: + raise ValueError("Error: BGCert start magic sequence not found.") + + if end_index == -1: + raise ValueError("Error: BGCert end magic sequence not found.") + + if end_index < start_index: + raise ValueError("Error: BGCert end magic sequence found before the start magic sequence.") + + bgroupcert_data = content[start_index:end_index + len(self.bgroupcert_end_magic)] + + return bgroupcert_data + + @staticmethod + def remove_header(data: bytes, header: Union[str, bytes] = "INNER_MSTAR_FILE") -> bytes: + if isinstance(header, str): + header = header.encode() + + header_index = data.find(header) + + if header_index == -1: + return data + + content = data[header_index + len(header):] + + return content + + @staticmethod + def hex_to_bytes(data: Union[str, bytes]) -> bytes: + if isinstance(data, str): + data = 
bytes.fromhex(data) + return data + + @staticmethod + def get_file_data(input_file: Path) -> bytes: + with open(input_file, "rb") as f: + data = f.read() + return data + + @staticmethod + def get_security_level(data: bytes) -> str: + if b"SL3000" in data: + security_level = "SL3000" + elif b"SL2000" in data: + security_level = "SL2000" + elif b"SL150" in data: + security_level = "SL150" + else: + security_level = "UNKNOWN" + return security_level + + def process_files(self, bgroupcert_file: Path, zgpriv_file: Path): + if not bgroupcert_file.exists(): + raise ValueError(f"Error: BGCert File not found -> {bgroupcert_file}") + + if not zgpriv_file.exists(): + raise ValueError(f"Error: ZGPriv File not found -> {zgpriv_file}") + + decrypted_bgroupcert_data = self.decrypt_aes_ecb( + inp=bgroupcert_file, + is_bgroupcert=True + ) + bgroupcert_data = self.process_bgroupcert( + data=decrypted_bgroupcert_data + ) + + print("Firmware:") + print(f" - Key : {decryption_bgc_zgp.boot_key.hex()}") + print(f" - Brand: {decryption_bgc_zgp.brand}") + + security_level = self.get_security_level(bgroupcert_data) + + decrypted_zgpriv_data = self.decrypt_aes_ecb( + inp=zgpriv_file + ) + zgpriv_data = self.process_zgpriv( + data=decrypted_zgpriv_data, + is_sl3000=security_level == "SL3000" + ) + + bgc_zgp_id = hex(crc32(bgroupcert_data + zgpriv_data))[2:] + folder_name = f"playready_decryption_bgc_zgp_pak/{self.brand}_{security_level}_{bgc_zgp_id}" + folder_path = Path(folder_name) + + if folder_path.exists(): + raise ValueError(f"Error: This certificate already exists in {folder_path}") + + folder_path.mkdir(parents=True, exist_ok=True) + + bgroupcert_out_path = Path(folder_path / "bgroupcert.dat") + bgroupcert_out_path.write_bytes(bgroupcert_data) + + zgpriv_out_path = Path(folder_path / "zgpriv.dat") + zgpriv_out_path.write_bytes(zgpriv_data) + + command = [ + "pyplayready", "create-device", + "-c", str(bgroupcert_out_path), + "-k", str(zgpriv_out_path), + "-o", folder_path + ] + + 
result = subprocess.run( + command, + capture_output=True, + text=True + ) + + prd_path = None + if result.returncode == 0: + prd_path = folder_path / Path(result.stderr.split("Saved to: ")[-1].strip()).name + + print("Decryption:") + print(f" - BGroupCert Security level: {security_level}") + print(f" - BGroupCert Path : {bgroupcert_out_path}") + print(f" - ZGPriv Path : {zgpriv_out_path}") + if prd_path: + print(f" - PRD Path : {prd_path}") + print(f" - Files exported to : {folder_path}") + + +if __name__ == "__main__": + if len(sys.argv) < 3: + print("Usage:") + print(" BGroupCert and ZGPriv") + print(f" decrypt_bgc_zgp_pak.py ") + print(" Pak") + print(f" decrypt_bgc_zgp_pak.py ") + sys.exit(1) + + boot_keys = { + "vestel": { + "MB130": "8981D083B3D53B3DF1AC529A70F244C0", + "MB_VARIANT_1": "24490B4CC95F739CE34138478E47139E" + }, + "lg": { + "HE_LCD_NC5U_AAADAIAA": "E33AB4C45C2570B8AD15A921F752DEB6", + "HE_DTV_W21A_AFADATAA": "0007FF4154534D92FC55AA0FFF0110E0" + } + } + + playready_keys = { + "porting_kit": { + "transient": "8B222FFD1E76195659CF2703898C427F", + "intermediate": "9CE93432C7D74016BA684763F801E136", + } + } + + # Pak / SeData + bgroupcert_sizes = { + "SL2000": 1760, + "SL3000": 1440 + } + zgpriv_sizes = { + "SL2000": 176, + "SL3000": 176 + } + + print("Process started...") + + if "boot" in sys.argv[1] and "sedata" in sys.argv[2]: + pak = Pak(bgroupcert_sizes, zgpriv_sizes) + + boot_file = Path(sys.argv[1]) + sedata_file = Path(sys.argv[2]) + + pak.process_files(boot_file, sedata_file, playready_keys) + else: + decryption_bgc_zgp = DecryptionBgcZgp( + boot_keys=boot_keys, + playready_keys=playready_keys + ) + + bgroupcert_file = Path(sys.argv[1]) + zgpriv_file = Path(sys.argv[2]) + + decryption_bgc_zgp.process_files(bgroupcert_file, zgpriv_file) + + print("\nProcess completed successfully.") + + sys.exit(0) \ No newline at end of file diff --git a/extract_emmc_bin_0.2.zip b/extract_emmc_bin_0.2.zip new file mode 100644 index 0000000..3023d7e Binary 
files /dev/null and b/extract_emmc_bin_0.2.zip differ diff --git a/ubidump/LICENSE b/ubidump/LICENSE new file mode 100644 index 0000000..c866ff1 --- /dev/null +++ b/ubidump/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 Willem Hengeveld + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/ubidump/README.md b/ubidump/README.md new file mode 100644 index 0000000..817d549 --- /dev/null +++ b/ubidump/README.md @@ -0,0 +1,129 @@ +UBIFS Dumper +============ + +This tool can be used to view or extract the contents of UBIFS images. + +About UBIFS +=========== + +UBIFS is a filesystem specifically designed for used on NAND flash chips. +NAND flash is organized in _eraseblocks_. _Eraseblocks_ can be erased, +appended to, and read. Erasing is a relatively expensive operation, and can +be done only a limited number of times. 
+ +An UBIFS image contains four abstraction layers: + * eraseblocks + * volumes + * b-tree nodes + * inodes + +Each eraseblock contains info on how often it has been erased, and which volume it belongs to. +A volume contains a b-tree database with keys for: + * inodes, indexed by inode number + * direntries, indexed by inode number + name hash + * datablocks, indexed by inode number + block number + +The inodes are basically a standard unix filesystem, with direntries, regular files, symlinks, devices, etc. + +mounting images on linux +------------------------ + + modprobe nandsim first_id_byte=0x2c second_id_byte=0xac third_id_byte=0x90 fourth_id_byte=0x26 + nandwrite /dev/mtd0 firmware-image.ubi + modprobe ubi mtd=/dev/mtd0,4096 + mount -t ubifs -o ro /dev/ubi0_0 mnt + +This will mount a ubi image for a device with eraseblock size 0x40000. +If your image has a blocksize of 0x20000, use `fourth_id_byte=0x15`, and specify a pagesize of `2048` +with the second modprobe line. + +Usage +===== + +View the contents of the `/etc/passwd` file in the filesystem image `image.ubi`: + + python ubidump.py -c /etc/passwd image.ubi + +List the files in all the volumes in `image.ubi`: + + python ubidump.py -l image.ubi + +View the contents of b-tree database from the volumes in `image.ubi`: + + python ubidump.py -d image.ubi + +Extract an unsupported volume type, so you can analyze it with other tools: + + python ubidump.py -v 0 --saveraw unknownvol.bin image.ubi + +Note that often ubi images contain squashfs volumes, which can be extracted using tools like +[unsquashfs](https://github.com/plougher/squashfs-tools) or [rdsquashfs](https://github.com/AgentD/squashfs-tools-ng) + +Install +======= + +Install the required python modules using: + + pip install -r requirements.txt + +or as a pip package: + + pip install ubidump + +You may need to manually install your operarating system libraries for lzo first: + +on linux: + + apt install liblzo2-dev + +on MacOS: + + brew install lzo + 
+maybe you need to build the python library like this: + + LDFLAGS=-L/usr/local/lib CFLAGS=-I/usr/local/include/lzo pip3 install python-lzo + + +When you need zstd compression, you will need to install the `zstandard` module. + + +Dependencies +============ + + * python2 or python3 + * python-lzo ( >= 1.09, which introduces the 'header=False' argument ) + * crcmod + * optional: zstandard + +TODO +==== + + * add option to select a volume + * add option to select a older `master` node + * parse the journal + * analyze b-tree structure for unused nodes + * analyze fs structure for unused inodes, dirents + * verify that data block size equals the size mentioned in the inode. + * add support for ubifs ( without the ubi layer ) + * add option to extract a raw volume. + +References +========== + + * the ubifs/mtd tools http://linux-mtd.infradead.org/ + * git repos can be found [here](http://git.infradead.org/) + +Similar tools +============= + + * another python tool [on github](https://github.com/jrspruitt/ubi_reader/) + * does not support listing files. 
+ * a closed source windows tool [here](http://ubidump.oozoon.de/) + * ubi-utils/ubidump.c [on the mtd mailinglist](http://lists.infradead.org/pipermail/linux-mtd/2014-July/054547.html) + +Author +====== + +Willem Hengeveld + diff --git a/ubidump/requirements.txt b/ubidump/requirements.txt new file mode 100644 index 0000000..09335b5 --- /dev/null +++ b/ubidump/requirements.txt @@ -0,0 +1,2 @@ +python-lzo>=1.11 +crcmod>=1.7 diff --git a/ubidump/setup.py b/ubidump/setup.py new file mode 100644 index 0000000..fd09ddf --- /dev/null +++ b/ubidump/setup.py @@ -0,0 +1,48 @@ +from setuptools import setup +setup( + name = "ubidump", + version = "1.0.0", + entry_points = { + 'console_scripts': ['ubidump=ubidump:main'], + }, + install_requires=[ + "python-lzo>=1.11", + "crcmod>=1.7", + ], + py_modules=['ubidump'], + author = "Willem Hengeveld", + author_email = "itsme@xs4all.nl", + description = "Commandline tool for viewing or extracting UBIFS images.", + long_description=""" +This tool can be used to view or extract the contents of UBIFS images. 
+ +View the contents of the `/etc/passwd` file in the filesystem image `image.ubi`: + + ubidump -c /etc/passwd image.ubi + +List the files in all the volumes in `image.ubi`: + + ubidump -l image.ubi + +View the contents of b-tree database from the volumes in `image.ubi`: + + ubidump -d image.ubi +""", + + license = "MIT", + keywords = "ubifs commandline", + url = "https://github.com/nlitsme/ubidump/", + classifiers = [ + 'Environment :: Console', + 'Intended Audience :: End Users/Desktop', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 3', + 'Topic :: Utilities', + 'Topic :: Software Development :: Version Control :: Git', + 'Topic :: System :: Filesystems', + ], +) + diff --git a/ubidump/ubidump.py b/ubidump/ubidump.py new file mode 100755 index 0000000..dc91ab0 --- /dev/null +++ b/ubidump/ubidump.py @@ -0,0 +1,1812 @@ +#!/usr/bin/python3 +""" +Tool for listing and extracting data from an UBI (Unsorted Block Image) image. 
def check_dependencies(dependencies):
    """
    Verify that each 'package>=X.Y' requirement is met by the installed
    distribution, comparing versions numerically.

    Bug fix: the original compared version strings lexicographically,
    so e.g. installed '1.10' was treated as older than required '1.7'
    ('1.10' < '1.7' as strings) and wrongly rejected.

    :param dependencies: iterable of simple '>=' requirement strings
    :raises ModuleNotFoundError: when an installed version is too old
        (importlib.metadata.PackageNotFoundError still propagates for a
        package that is not installed, as in the original)
    """
    def _as_tuple(version):
        # '1.10' -> (1, 10); strip any non-digit suffix per component
        parts = []
        for component in version.split('.'):
            digits = ''.join(ch for ch in component if ch.isdigit())
            parts.append(int(digits) if digits else 0)
        return tuple(parts)

    for dep in dependencies:
        package, version_spec = dep.split('>=')  # Only handle simple '>=X.Y' dependencies
        installed_version = importlib.metadata.version(package)
        if _as_tuple(installed_version) < _as_tuple(version_spec):
            raise ModuleNotFoundError(
                "%s %s installed, but >=%s is required" % (package, installed_version, version_spec)
            )
+ """ + def __init__(self): + self.pos = 0 + + def seek(self, newpos, whence=os.SEEK_SET): + if whence==os.SEEK_SET: + if newpos < self.pos: + print("WARNING: can't seek stdout backwards") + return -1 + if newpos > self.pos: + self.seekforward(newpos - self.pos) + self.pos = newpos + elif whence==os.SEEK_CUR: + if newpos < 0: + print("WARNING: can't seek stdout backwards") + return -1 + if newpos > 0: + self.seekforward(newpos) + self.pos += newpos + else: + print("WARNING: can't seek stdout from EOF") + return -1 + + def seekforward(self, size): + """ + Seek forward by writing NUL bytes. + """ + sys.stdout.flush() + chunk = b"\x00" * 0x10000 + while size > 0: + if len(chunk) > size: + chunk = chunk[:size] + stdout.write(chunk) + size -= len(chunk) + + def write(self, data): + sys.stdout.flush() + stdout.write(data) + self.pos += len(data) + + def truncate(self, size): + """ + Ignore this. + """ + pass + + +########### block level objects ############ + +class UbiEcHeader: + """ + The Erase count header + """ + hdrsize = 16*4 + def __init__(self): + self.magic = b'UBI#' + def parse(self, data): + ( + self.magic, # 4s + self.version, # B + # 3x + self.erasecount, # Q + self.vid_hdr_ofs, # L + self.data_ofs, # L + self.image_seq, # L + # 32x + hdr_crc, # L + ) = struct.unpack(">4sB3xQLLL32xL", data) + if self.magic != b'UBI#': + raise Exception("UBI# ec hdr magic num mismatch") + if hdr_crc != crc32(data[:-4]): + raise Exception("crc mismatch") + def encode(self): + data = struct.pack(">4sB3xQLLL32x", self.magic, self.version, self.erasecount, self.vid_hdr_ofs, self.data_ofs, \ + self.image_seq) + return data + struct.pack(">L", crc32(data)) + def __repr__(self): + return "EC: magic=%s, v%d, ec=%d, vidhdr=%x, data=%x, imgseq=%x" % ( + self.magic, self.version, self.erasecount, self.vid_hdr_ofs, + self.data_ofs, self.image_seq) + + +VTBL_VOLID=0x7fffefff +class UbiVidHead: + """ + The volume id header + """ + hdrsize = 16*4 + def __init__(self): + self.vol_id = 
VTBL_VOLID + self.magic = b'UBI!' + + def parse(self, data): + ( + self.magic, # 4s + self.version, # B + self.vol_type, # B + self.copy_flag, # B + self.compat, # B + self.vol_id, # L + self.lnum, # L + # 4x + self.data_size, # L + self.used_ebs, # L + self.data_pad, # L + self.data_crc, # L + # 4x + self.sqnum, # Q + # 12x + hdr_crc, # L + )= struct.unpack(">4s4BLL4x4L4xQ12xL", data) + if self.magic != b'UBI!': + raise Exception("UBI! volid magic num mismatch") + if hdr_crc != crc32(data[:-4]): + raise Exception("crc mismatch") + + def encode(self): + data = struct.pack(">4s4BLL4x4L4xQ12x", self.magic, self.version, self.vol_type, self.copy_flag, self.compat, self.vol_id, \ + self.lnum, self.data_size, self.used_ebs, self.data_pad, self.data_crc, \ + self.sqnum) + return data + struct.pack(">L", crc32(data)) + + def __repr__(self): + if hasattr(self, 'magic'): + return "VID: magic=%s, v%d, vt=%d, cp=%d, compat=%d, volid=%x, lnum=[%d], " \ + "dsize=%d, usedebs=%d, datapad=%d, datacrc=%x, sqnum=%d" % ( + self.magic, self.version, self.vol_type, self.copy_flag, self.compat, + self.vol_id, self.lnum, self.data_size, self.used_ebs, self.data_pad, + self.data_crc, self.sqnum) + else: + return "VID" + + +class UbiVtblRecord: + """ + A volume table record. 
+ """ + hdrsize = 4*4+128+24+4 + def __init__(self): + self.reserved_pebs = 0 + def parse(self, data): + ( + self.reserved_pebs, # L + self.alignment, # L + self.data_pad, # L + self.vol_type, # B + self.upd_marker, # B + name_len, # H + self.name, # 128s + self.flags, # B + # 23x + crc, # L + ) = struct.unpack(">3LBBH128sB23xL", data) + if crc != crc32(data[:-4]): + raise Exception("crc mismatch") + self.name = self.name[:name_len] + def encode(self): + data = struct.pack(">3LBBH128sB23x", self.reserved_pebs, self.alignment, self.data_pad, self.vol_type, self.upd_marker, \ + name_len, self.name, self.flags) + return data + struct.pack(">L", crc32(data)) + def empty(self): + if hasattr(self, 'name'): + return self.reserved_pebs==0 and self.alignment==0 and self.data_pad==0 \ + and self.vol_type==0 and self.upd_marker==0 and self.name==b'' and self.flags==0 + else: + return True + + def __repr__(self): + return "VREC: rsvpebs=%d, align=%d, datapad=%d, voltype=%d, updmark=%d, flags=%x, name=%s" % ( + self.reserved_pebs, self.alignment, self.data_pad, self.vol_type, + self.upd_marker, self.flags, self.name) + + +class UbiVolume: + """ + provides read access to a specific volume in an UBI image. + """ + def __init__(self, blks, volid, dataofs): + """ + takes an UbiBlocks object, a volumeid, and a baseoffset. 
+ """ + self.blks = blks + self.volid = volid + self.dataofs = dataofs + + def read(self, lnum, offs, size): + return self.blks.readvolume(self.volid, lnum, self.dataofs+offs, size) + + def write(self, lnum, offs, data): + return self.blks.writevolume(self.volid, lnum, self.dataofs+offs, data) + + def hexdump(self, lnum, offs, size): + print("[%03d:0x%05x] %s" % (lnum, offs, b2a_hex(self.read(lnum, offs, size)))) + + def saveraw(self, filename): + with open(filename, "wb") as fh: + for lnum in range(self.blks.maxlebs): + data = self.read(lnum, 0, self.blks.leb_size-self.dataofs) + fh.write(data) + +class RawVolume: + """ + provides read access to a raw data volume + """ + def __init__(self, fh): + self.fh = fh + self.leb_size = self.find_block_size() + + def read(self, lnum, offs, size): + self.fh.seek(lnum*self.leb_size+offs) + return self.fh.read(size) + + def write(self, lnum, offs, data): + self.fh.seek(lnum*self.leb_size+offs) + return self.fh.write(data) + + def find_block_size(self): + self.fh.seek(0) + data = self.fh.read(0x200) + values = struct.unpack("<12L", data[:4*12]) + if values[0] == 0x06101831 and values[5] == 6: + # node magic, and nodetype == 6:superblock + return values[9] # sb.leb_size + + def hexdump(self, lnum, offs, size): + print("R:[%03d:0x%05x] %s" % (lnum, offs, b2a_hex(self.read(lnum, offs, size)))) + + def saveraw(self, filename): + print("TODO") + + + +class UbiBlocks: + """ + Block level access to an UBI image. 
+ """ + def __init__(self, fh): + self.fh = fh + self.leb_size = self.find_blocksize() + + fh.seek(0, os.SEEK_END) + self.filesize = fh.tell() + self.maxlebs = self.filesize // self.leb_size + + self.scanblocks() + + if not VTBL_VOLID in self.vmap: + print("no volume directory, %d physical volumes" % len(self.vmap)) + return + self.scanvtbls(self.vmap[VTBL_VOLID][0]) + + print("%d named volumes found, %d physical volumes, blocksize=0x%x" % (self.nr_named, len(self.vmap), self.leb_size)) + + def find_blocksize(self): + self.fh.seek(0) + magic = self.fh.read(4) + if magic != b'UBI#': + raise Exception("not an UBI image") + for log_blocksize in range(10,20): + self.fh.seek(1< physical lnum + """ + self.vmap = defaultdict(lambda : defaultdict(int)) + for lnum in range(self.maxlebs): + + try: + ec = UbiEcHeader() + hdr = self.readblock(lnum, 0, ec.hdrsize) + ec.parse(hdr) + + vid = UbiVidHead() + viddata = self.readblock(lnum, ec.vid_hdr_ofs, vid.hdrsize) + vid.parse(viddata) + + self.vmap[vid.vol_id][vid.lnum] = lnum + except: + pass + + def readblock(self, lnum, offs, size): + self.fh.seek(lnum * self.leb_size + offs) + return self.fh.read(size) + + def writeblock(self, lnum, offs, data): + self.fh.seek(lnum * self.leb_size + offs) + return self.fh.write(data) + + def hexdump(self, lnum, offs, size): + print("[%03d:0x%05x] %s" % (lnum, offs, b2a_hex(self.readblock(lnum, offs, size)))) + + def scanvtbls(self, lnum): + """ + reads the volume table + """ + ec = UbiEcHeader() + hdr = self.readblock(lnum, 0, ec.hdrsize) + ec.parse(hdr) + + self.ec = ec + + try: + vid = UbiVidHead() + viddata = self.readblock(lnum, ec.vid_hdr_ofs, vid.hdrsize) + vid.parse(viddata) + + self.vid = vid + + self.vtbl = [] + self.nr_named = 0 + + if vid.vol_id == VTBL_VOLID: + for i in range(128): + vrec = UbiVtblRecord() + vrecdata = self.readblock(lnum, self.ec.data_ofs + i * vrec.hdrsize, vrec.hdrsize) + vrec.parse(vrecdata) + + self.vtbl.append(vrec) + + if not vrec.empty(): + self.nr_named 
+= 1 + except: + print(ec) + print("viddata:%s" % b2a_hex(viddata)) + import traceback + traceback.print_exc() + + self.vid = UbiVidHead() + self.vtbl = [ UbiVtblRecord() ] + + def dumpvtbl(self): + print("%s %s" % (self.ec, self.vid)) + for v in self.vtbl: + if not v.empty(): + print(" %s" % v) + + for volid, lmap in self.vmap.items(): + print("volume %x : %d lebs" % (volid, len(lmap))) + + def nr_named(self): + return self.nr_named + + def getvrec(self, volid): + return self.vtbl[volid] + + def getvolume(self, volid): + return UbiVolume(self, volid, self.ec.data_ofs) + + def readvolume(self, volid, lnum, offs, size): + physlnum = self.vmap[volid].get(lnum, None) + if physlnum is None: + raise Exception("volume does not contain lnum") + return self.readblock(physlnum, offs, size) + + def writevolume(self, volid, lnum, offs, data): + physlnum = self.vmap[volid].get(lnum, None) + if physlnum is None: + raise Exception("volume does not contain lnum") + return self.writeblock(physlnum, offs, data) + + + +################ filesytem level objects ################## + +UBIFS_INO_KEY = 0 +UBIFS_DATA_KEY = 1 +UBIFS_DENT_KEY = 2 +UBIFS_XENT_KEY = 3 + +""" +key format: (inum, (type<<29) | value) + +key types: UBIFS_*_KEY: INO, DATA, DENT, XENT + +inode: + 0 +dirent: + hash +xent: + hash +data: + 0 + +""" +def unpackkey(key): + if len(key)==16 and key[8:]!=b'\x00'*8: + print("key has more than 8 bytes: %s" % b2a_hex(key)) + inum, value = struct.unpack(">29, value&0x1FFFFFFF) + + +def packkey(key): + inum, ityp, value = key + return struct.pack(">4 + a &= 0xFFFFFFFF + a *= 11 + a &= 0xFFFFFFFF + a &= 0x1FFFFFFF + if a <= 2: a += 3 + return a + + +COMPR_NONE = 0 +COMPR_LZO = 1 +COMPR_ZLIB = 2 +COMPR_ZSTD = 3 +def decompress(data, buflen, compr_type): + if compr_type==COMPR_NONE: + return data + elif compr_type==COMPR_LZO: + return lzo.decompress(data, False, buflen) + elif compr_type==COMPR_ZLIB: + return zlib.decompress(data, -zlib.MAX_WBITS) + elif compr_type==COMPR_ZSTD and 
zstd: + return zstd.decompress(data) + else: + raise Exception("unknown compression type") + + +def compress(data, compr_type): + if compr_type==COMPR_NONE: + return data + elif compr_type==COMPR_LZO: + return lzo.compress(data, False) + elif compr_type==COMPR_ZLIB: + return zlib.compress(data, -zlib.MAX_WBITS) + elif compr_type==COMPR_ZSTD and zstd: + return zstd.compress(data) + else: + raise Exception("unknown compression type") + + +# the blocksize is a fixed value, independent of the underlying device. +UBIFS_BLOCKSIZE = 4096 + +########### objects for the various node types ########### +class UbiFsInode: + """ + Leafnode in the B-tree, contains information for a specific file or directory. + + It's b-tree key is formatted like this: + * 32 bit inode number + * the 3 bit node type: 0 for inode + * a 29 bit zero value. + """ + nodetype = 0 + hdrsize = 16 + 5*8 + 11*4 + 2*4 + 28 + + # note: these values are like the posix stat values, + # the UbiFsDirEntry uses a different set of values for the same types. 
+ ITYPE_FIFO = 1 # S_IFIFO + ITYPE_CHARDEV = 2 # S_IFCHR + ITYPE_DIRECTORY = 4 # S_IFDIR + ITYPE_BLOCKDEV = 6 # S_IFBLK + ITYPE_REGULAR = 8 # S_IFREG + ITYPE_SYMLINK = 10 # S_IFLNK + ITYPE_SOCKET = 12 # S_IFSOCK + + def __init__(self): + pass + def parse(self, data): + ( + self.key, # 16s + self.creat_sqnum, # Q + self.size, # Q + self.atime_sec, # Q + self.ctime_sec, # Q + self.mtime_sec, # Q + self.atime_nsec, # L + self.ctime_nsec, # L + self.mtime_nsec, # L + self.nlink, # L + self.uid, # L + self.gid, # L + self.mode, # L + self.flags, # L + self.data_len, # L + self.xattr_cnt, # L + self.xattr_size, # L + # 4x + self.xattr_names, # L + self.compr_type # H + # 26x + ) = struct.unpack("<16s5Q11L4xLH26x", data[:self.hdrsize]) + + # data contains the symlink string for symbolic links + self.data = data[self.hdrsize:] + if len(self.data) != self.data_len: + raise Exception("inode data size mismatch") + + def encode(self): + return struct.pack("<16s5Q11L4xLH26x", \ + self.key, self.creat_sqnum, self.size, self.atime_sec, self.ctime_sec, self.mtime_sec, \ + self.atime_nsec, self.ctime_nsec, self.mtime_nsec, self.nlink, self.uid, self.gid, \ + self.mode, self.flags, self.data_len, self.xattr_cnt, self.xattr_size, \ + self.xattr_names, self.compr_type) + + def inodedata_repr(self): + types = ["0", "FIFO", "CHAR", "3", "DIRENT", "5", "BLOCK", "7", "FILE", "9", "LINK", "11", "SOCK", "13", "14", "15"] + typ = self.nodetype() + if typ in (self.ITYPE_CHARDEV, self.ITYPE_BLOCKDEV): # CHAR or BLOCK + return types[typ] + ":" + b2a_hex(self.data).decode('ascii') + return types[typ] + ":%s" % self.data + + def __repr__(self): + return "INODE: key=%s, sq=%04x, size=%5d, n=%3d, uid:gid=%d:%d, mode=%06o, fl=%x, dl=%3d, " \ + "xattr=%d:%d, xanames=%d, comp=%d -- %s" % (formatkey(self.key), self.creat_sqnum, + self.size, self.nlink, self.uid, self.gid, self.mode, self.flags, self.data_len, + self.xattr_cnt, self.xattr_size, self.xattr_names, self.compr_type, self.inodedata_repr()) + 
# todo: self.atime_sec, self.ctime_sec, self.mtime_sec, self.atime_nsec, self.ctime_nsec, self.mtime_nsec, + def atime(self): + return self.atime_sec + self.atime_nsec / 1000000000.0 + def mtime(self): + return self.mtime_sec + self.mtime_nsec / 1000000000.0 + def ctime(self): + return self.ctime_sec + self.ctime_nsec / 1000000000.0 + def devnum(self): + ma, mi = struct.unpack("BB", self.data[:2]) + return (ma, mi) + def nodetype(self): + return (self.mode >> 12) & 0xF + + +class UbiFsData: + """ + Leafnode in the B-tree, contains a datablock + + It's b-tree key is formatted like this: + * 32 bit inode number + * the 3 bit node type: 1 for data + * a 29 bit file blocknumber + """ + nodetype = 1 + hdrsize = 16 + 4 + 4 + def __init__(self): + pass + def parse(self, data): + ( + self.key, # 16s + self.size, # L + self.compr_type, # H + # 2x + )= struct.unpack("<16sLH2x", data[:self.hdrsize]) + self.data = decompress(data[self.hdrsize:], self.size, self.compr_type) + if len(self.data) != self.size: + raise Exception("data size mismatch") + + def encode(self): + return struct.pack("<16sLH2x", self.key, len(self.data), self.compr_type) + compress(self.data, self.compr_type) + + def __repr__(self): + return "DATA: key=%s, size=%d, comp=%d" % (formatkey(self.key), self.size, self.compr_type) + + +class UbiFsDirEntry: + """ + Leafnode in the B-tree, contains a directory entry. 
+ + Properties: + * key + * inum + * type + * name + + It's b-tree key is formatted like this: + * 32 bit inode number ( of the directory containing this dirent ) + * the 3 bit node type: 2 for dirent + * a 29 bit name hash + """ + TYPE_REGULAR = 0 + TYPE_DIRECTORY = 1 + TYPE_SYMLINK = 2 + TYPE_BLOCKDEV = 3 + TYPE_CHARDEV = 4 + TYPE_FIFO = 5 + TYPE_SOCKET = 6 + + ALL_TYPES = 127 + + nodetype = 2 + hdrsize = 16 + 8+4+4 + + def __init__(self): + pass + def parse(self, data): + ( + self.key, # 16s + self.inum, # Q + # x + self.type, # B + nlen, # H + # 4x + ) = struct.unpack("<16sQxBH4x", data[:self.hdrsize]) + self.name = data[self.hdrsize:-1] + if len(self.name) != nlen: + raise Exception("name length mismatch") + def encode(self): + return struct.pack("<16sQxBH4x", self.key, self.inum, self.type, nlen) + def __repr__(self): + typenames = [ 'reg', 'dir', 'lnk', 'blk', 'chr', 'fifo', 'sock' ] + # type: UBIFS_ITYPE_REG, UBIFS_ITYPE_DIR, etc + return "DIRENT: key=%s, inum=%05d, type=%d:%s -- %s" % (formatkey(self.key), self.inum, self.type, typenames[self.type], self.name) + + +class UbiFsExtendedAttribute: + """ + Leafnode in the B-tree, contains extended attributes. + + It's b-tree key is formatted like this: + * 32 bit inode number ( of the directory containing this dirent ) + * the 3 bit node type: 3 for xent + * a 29 bit hash of the attribute name. 
+ """ + nodetype = 3 + hdrsize = 0 + def __init__(self): + pass + def parse(self, data): + # TODO + pass + def __repr__(self): + return "EA" + + +class UbiFsTruncation: + """ + Used only in the journal + """ + nodetype = 4 + hdrsize = 4+12+2*8 + def __init__(self): + pass + def parse(self, data): + ( + self.inum, # L + # 12x + self.old_size, # Q + self.new_size, # Q + ) = struct.unpack("%d" % (self.inum, self.old_size, self.new_size) + + +class UbiFsPadding: + """ + """ + nodetype = 5 + hdrsize = 4 + def __init__(self): + pass + def parse(self, data): + self.pad_len, = struct.unpack_from("= len(data): + raise Exception("parse error") + branch = self.Branch() + branch.parse(data[o:o+branch.hdrsize]) ; o += branch.hdrsize + branch.key = data[o:o+8] ; o += 8 + self.branches.append(branch) + def encode(self): + data = struct.pack("= key, returns relation to the key + + these are all possibilities with 1 branches + + key < b0 -> 'lt', 0 + key == b0 -> 'eq', 0 + b0 < key -> 'gt', 0 + + these are all possibilities with 2 branches + key < b0 < b1 -> 'lt', 0 + key == b0 < b1 -> 'eq', 0 + b0 < key < b1 -> 'gt', 0 + b0 < key == b1 -> 'eq', 1 + b0 < b1 < key -> 'gt', 1 + + add two more options for every next branch. + + """ + for i, b in enumerate(self.branches): + c = comparekeys(key, b.key) + if c<0: + if i==0: + # before first item + return ('lt', i) + else: + # between prev and this item + return ('gt', i-1) + elif c==0: + # found item + return ('eq', i) + # else c>0 -> continue searching + + # after last item + return ('gt', i) + + + +class UbiFsCommitStart: + nodetype = 10 + hdrsize = 8 + def __init__(self): + pass + def parse(self, data): + self.cmt_no, = struct.unpack(" want = %08x" % ( b2a_hex(hdrdata), b2a_hex(nodedata), crc32(hdrdata[8:] + nodedata), ch.crc)) + raise Exception("invalid node crc") + node.parse(nodedata) + + return node + + def writenode(self, node): + """ + Write a node from a lnum + offset. 
+ + TODO + """ + + nodedata = node.encode() + + node.hdr.len = len(nodedata) + node.hdr.hdrsize + hdrdata = node.hdr.encode() + + node.hdr.crc = crc32(hdrdata[8:] + nodedata) + hdrdata = node.hdr.encode() + + self.vol.write(node.hdr.lnum, node.hdr.offs, hdrdata+nodedata) + + def dumpnode(self, lnum, offs): + node = self.readnode(lnum, offs) + print("[%03d:0x%05x-0x%05x] %s" % (lnum, offs, offs+node.hdr.len, node)) + + def printrecursive(self, idx): + """ + Recursively dump all b-tree nodes. + """ + print("[%03d:0x%05x-0x%05x] %s" % (idx.hdr.lnum, idx.hdr.offs, idx.hdr.offs+idx.hdr.len, idx)) + if not hasattr(idx, 'branches'): + #print(idx) + return + for i, b in enumerate(idx.branches): + print("%s %d %s -> " % (" " * (6-idx.level), i, b), end=" ") + try: + n = self.readnode(b.lnum, b.offs) + self.printrecursive(n) + except Exception as e: + print("ERROR %s" % e) + + def printmbitems(self): + print("--log [%03d] .. [%03d]" % (self.mst.log_lnum, self.mst.log_lnum+self.sb.log_lebs-1)) + try: + self.dumpnode(self.mst.log_lnum, 0) + self.vol.hexdump(self.mst.log_lnum, 0, 0x100) + except Exception as e: + print(e) + print("--root") + try: + self.dumpnode(self.mst.root_lnum, self.mst.root_offs) + self.vol.hexdump(self.mst.root_lnum, self.mst.root_offs, self.mst.root_len) + except Exception as e: + print(e) + print("--gc [%03d]" % (self.mst.gc_lnum)) + try: + self.vol.hexdump(self.mst.gc_lnum, 0, 0x100) + except Exception as e: + print(e) + print("--ihead") + try: + self.vol.hexdump(self.mst.ihead_lnum, self.mst.ihead_offs, self.mst.index_size) + except Exception as e: + print(e) + print("--lpt [%03d] .. 
[%03d]" % (self.mst.lpt_lnum, self.mst.lpt_lnum+self.sb.lpt_lebs-1)) + try: + self.vol.hexdump(self.mst.lpt_lnum, self.mst.lpt_offs, 0x100) + except Exception as e: + print(e) + print("--nhead") + try: + self.vol.hexdump(self.mst.nhead_lnum, self.mst.nhead_offs, 0x100) + except Exception as e: + print(e) + print("--ltab") + try: + self.vol.hexdump(self.mst.ltab_lnum, self.mst.ltab_offs, 0x100) + except Exception as e: + print(e) + print("--lsave") + try: + self.vol.hexdump(self.mst.lsave_lnum, self.mst.lsave_offs, 0x100) + self.dumpnode(self.mst.lsave_lnum, self.mst.lsave_offs) + except Exception as e: + print(e) + print("--lscan") + try: + self.vol.hexdump(self.mst.lscan_lnum, 0, 0x100) + self.dumpnode(self.mst.lscan_lnum, 0) + except Exception as e: + print(e) + + class Cursor: + """ + The Cursor represents a position in the b-tree. + """ + def __init__(self, fs, stack): + self.fs = fs + self.stack = stack + + def next(self): + """ move cursor to next entry """ + if not self.stack: + # starting at 'eof' + page = self.fs.root + ix = 0 + else: + page, ix = self.stack.pop() + while self.stack and ix==len(page.branches)-1: + page, ix = self.stack.pop() + if ix==len(page.branches)-1: + return + ix += 1 + self.stack.append( (page, ix) ) + while page.level: + page = self.fs.readnode(page.branches[ix].lnum, page.branches[ix].offs) + ix = 0 + self.stack.append( (page, ix) ) + + def prev(self): + """ move cursor to next entry """ + if not self.stack: + # starting at 'eof' + page = self.fs.root + ix = len(page.branches)-1 + else: + page, ix = self.stack.pop() + while self.stack and ix==0: + page, ix = self.stack.pop() + if ix==0: + return + ix -= 1 + self.stack.append( (page, ix) ) + while page.level: + page = self.fs.readnode(page.branches[ix].lnum, page.branches[ix].offs) + ix = len(page.branches)-1 + self.stack.append( (page, ix) ) + def eof(self): + return len(self.stack)==0 + def __repr__(self): + return "[%s]" % (",".join(str(_[1]) for _ in self.stack)) + + def 
getkey(self): + """ + Returns the key tuple for the current item + """ + if self.stack: + page, ix = self.stack[-1] + return unpackkey(page.branches[ix].key) + + def getnode(self): + """ + Returns the node object for the current item + """ + if self.stack: + page, ix = self.stack[-1] + return self.fs.readnode(page.branches[ix].lnum, page.branches[ix].offs) + + + def find(self, rel, key, root=None): + """ + returns a cursor for the relation + key. + + ('lt', searchkey) searches for the highest ordered node with a key less than `searchkey` + ('ge', searchkey) searches for the lowest ordered node with a key greater or equal to `searchkey` + etc... + + """ + stack = [] + page = self.root if root is None else root + + while len(stack)<32: + act, ix = page.find(packkey(key)) + stack.append( (page, ix) ) + if page.level==0: + break + page = self.readnode(page.branches[ix].lnum, page.branches[ix].offs) + + if len(stack)==32: + raise Exception("tree too deep") + + cursor = self.Cursor(self, stack) + + """ + act rel: | lt le eq ge gt + (lt, 0) key < 0 | None None None pass pass + (eq, ix) key == ix | -- pass pass pass ++ + (gt, ix) ix < key < ix+1 | pass pass None ++ ++ + """ + + if (act+rel) in ('gtlt', 'gtle', 'eqle', 'eqeq', 'eqge', 'ltge', 'ltgt'): + return cursor + if (act+rel) in ('ltlt', 'ltle', 'lteq', 'gteq'): + return None + if (act+rel) == 'eqlt': + cursor.prev() + return cursor + if (act+rel) in ('eqgt', 'gtge', 'gtgt'): + cursor.next() + return cursor + + raise Exception("unexpected case") + + def setkey(self, key, node): + pass + #todo - adding a + + + def recursefiles(self, inum, path, filter = 1< inode.size: + print("WARNING: found more (%d bytes) for inode %05d, than specified in the inode(%d bytes) -- %s" % (savedlen, inum, inode.size, ubiname)) + elif savedlen < inode.size: + # padding file with zeros + fh.seek(inode.size) + fh.truncate(inode.size) + + def findfile(self, path, inum = 1): + """ + find the inode of the given `path`, starting in the directory 
specified by `inum` + + `path` must be a list of path elements. ( so not a '/' separated path string ) + """ + itype = UbiFsDirEntry.TYPE_DIRECTORY + for part in path: + if itype!=UbiFsDirEntry.TYPE_DIRECTORY: + # not a directory + return None + c = self.find('eq', (inum, UBIFS_DENT_KEY, namehash(part))) + if not c or c.eof(): + # not found + return None + dirent = c.getnode() + inum, itype = dirent.inum, dirent.type + return inum + + +def modestring(mode): + """ + return a "-rw-r--r--" style mode string + """ + # 4 bits type + # 3 bits suid/sgid/sticky + # 3 bits owner perm + # 3 bits group perm + # 3 bits other perm + typechar = "?pc?d?b?-?l?s???" + + def rwx(bits, extra, xchar): + rflag = "-r"[(bits>>2)&1] + wflag = "-w"[(bits>>1)&1] + xflag = ("-x" + xchar.upper() + xchar.lower())[(bits&1)+2*extra] + + return rflag + wflag + xflag + + return typechar[(mode>>12)&15] + rwx((mode>>6)&7, (mode>>11)&1, 's') + rwx((mode>>3)&7, (mode>>10)&1, 's') + rwx(mode&7, (mode>>9)&1, 't') + + +def timestring(t): + return datetime.datetime.fromtimestamp(t, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + +def processvolume(vol, volumename, args): + """ + Perform actions specified by `args` on `vol`. + + `vol` can be either a RawVolume ( an image file containing only the filesystem, + no flash block management layer. + + Or a UbiVolume, with the block management layer. 
+ """ + nr_symlink_warnings = 0 + + fs = UbiFs(vol, args.masteroffset) + if args.verbose: + fs.dumpfs() + + root = fs.root + if args.root: + lnum, offset = args.root.split(':', 1) + lnum = int(lnum, 16) + offset = int(offset, 16) + root = fs.readnode(lnum, offset) + + if args.hexdump and isinstance(vol, RawVolume): + vol.hexdump(*args.hexdump) + if args.saveraw and isinstance(vol, RawVolume): + vol.saveraw(args.saveraw) + if args.nodedump: + fs.dumpnode(*args.nodedump) + + if args.dumptree: + fs.printrecursive(root) + if args.verbose: + fs.printmbitems() + if args.savedir: + savedir = args.savedir.encode(args.encoding) + + os.makedirs(savedir.decode(args.encoding) + '/' + volumename.decode(args.encoding), exist_ok=True) + count = 0 + for inum, path in fs.recursefiles(1, [], UbiFsDirEntry.ALL_TYPES, root=root): + c = fs.find('eq', (inum, UBIFS_INO_KEY, 0)) + inode = c.getnode() + typ = inode.nodetype() + + fullpath = os.path.join(*[savedir, volumename] + path) + try: + if typ == inode.ITYPE_FIFO: + os.mkfifo(fullpath) + elif typ == inode.ITYPE_SOCKET: + import socket as s + sock = s.socket(s.AF_UNIX) + sock.bind(fullpath) + elif typ == inode.ITYPE_SYMLINK: + try: + os.symlink(inode.data, fullpath) + except (AttributeError, OSError): + # python2 on windows does not support 'symlink', and with python3 + # you still need special permissions to create a symlink. So often + # on windows os.symlink will fail. 
+ nr_symlink_warnings += 1 + elif typ == inode.ITYPE_DIRECTORY: + os.makedirs(fullpath) + elif typ == inode.ITYPE_REGULAR: + with open(fullpath, "wb") as fh: + fs.exportfile(inum, fh, os.path.join(*path)) + elif typ in (inode.ITYPE_BLOCKDEV, inode.ITYPE_CHARDEV): + try: + devnum = os.makedev(*inode.devnum()) + if devnum < 0: + devnum += 0x100000000 + os.mknod(fullpath, inode.mode, devnum) + except PermissionError as e: + # silently ignoring permission error + pass + else: + if args.verbose: + print("UNKNOWN inode type: %d" % typ) + continue + except OSError as e: + if e.errno != errno.EEXIST: + print(f"ERROR writing {fullpath}, {e}") + except Exception as e: + print(f"ERROR writing {fullpath}, {e}") + + if args.preserve and typ != inode.ITYPE_SYMLINK and os.path.exists(fullpath): + # note: we have to do this after closing the file, since the close after exportfile + # will update the last-modified time. + # the check for existence is because earlier mknod may fail when not root. + os.utime(fullpath, (inode.atime(), inode.mtime())) + os.chmod(fullpath, inode.mode) + try: + os.chown(fullpath, inode.uid, inode.gid) + except PermissionError as e: + # silently ignoring permission error + pass + + count += 1 + print("saved %d files" % count) + if nr_symlink_warnings: + print("Failed to create %d symlinks." % nr_symlink_warnings) + + if args.listfiles: + for inum, path in fs.recursefiles(1, [], UbiFsDirEntry.ALL_TYPES, root=root): + c = fs.find('eq', (inum, UBIFS_INO_KEY, 0)) + inode = c.getnode() + + if inode.nodetype() in (inode.ITYPE_CHARDEV, inode.ITYPE_BLOCKDEV): # char or block dev. 
+ sizestr = "%d,%4d" % inode.devnum() + else: + sizestr = str(inode.size) + + if inode.nodetype() == inode.ITYPE_SYMLINK: + linkdata = inode.data + if args.encoding: + linkdata = linkdata.decode(args.encoding, 'ignore') + linkstr = " -> %s" % linkdata + else: + linkstr = "" + + filename = b"/".join(path) + if args.encoding: + filename = filename.decode(args.encoding, 'ignore') + print("%s %2d %-5d %-5d %10s %s %s%s" % (modestring(inode.mode), inode.nlink, inode.uid, inode.gid, sizestr, timestring(inode.mtime_sec), filename, linkstr)) + + for srcfile in args.cat: + if len(args.cat)>1: + print("==>", srcfile, "<==") + inum = fs.findfile(srcfile.lstrip('/').split('/')) + if inum: + fs.exportfile(inum, SeekableStdout(), srcfile) + if len(args.cat)>1: + print() + else: + print("Not found") + + +def processblocks(fh, args): + """ + Perform operations on a UbiBlocks type image: starting with bytes 'UBI#' + """ + blks = UbiBlocks(fh) + if args.verbose: + print("===== block =====") + blks.dumpvtbl() + if args.hexdump: + if args.volume is None: + blks.hexdump(*args.hexdump) + else: + vol = blks.getvolume(args.volume) + vol.hexdump(*args.hexdump) + + if args.saveraw: + if args.volume is None: + blks.saveraw(args.saveraw) + else: + vol = blks.getvolume(args.volume) + vol.saveraw(args.saveraw) + + for volid in range(128): + vrec = blks.getvrec(volid) + if vrec.empty(): + continue + vol = blks.getvolume(volid) + + try: + print("== volume %s ==" % vrec.name) + + processvolume(vol, vrec.name, args) + except Exception as e: + print("E: %s" % e) + if args.debug: + raise + +################################################## +# raw hexdumper +def findpattern(data, pattn, blocksize): + o = 0 + while o4sLQLLL32sL", data) + print("%08x: %s %08x %010x %08x %08x %08x %s %08x" % (o, m, v, ec, vidofs, datofs, iseq, zero, crc)) + + +def raw_vid_dump(o, data): + i = 0 + while i < len(data): + print("%08x: %s" % (o+i, data[i:i+0xAC])) + i += 0xAC + + +def raw_vhdr_dump(o, data): + # UBI! 
blocks + data = data.rstrip(b'\xff') + data2 = b'' + o2 = 0 + if len(data)!=64: + data2 = data[64:].lstrip(b'\xff') + o2 = o + data.find(data2) + data = data[:64] + + if len(data) != 64: + print("short vhdr: %s" % data.hex()) + return + + ( + m, # 4s + v, # B + vt, # B + cf, # B + compat, # B + volid, # L + lnum, # L + zero1, # 4s + dsize, # L + usedebs, # L + pad, # L + dcrc, # L + zero2, # 4s + sqnum, # Q + zero3, # 12s + hcrc, # L + ) = struct.unpack(">4s4BLL4s4L4sQ12sL", data) + + print("%08x: %s %d %d %d %d %08x %08x %s %08x %08x %08x %08x %s %010x %s %08x" % (o, m, v, vt, cf, compat, volid, lnum, zero1, dsize, usedebs, pad, dcrc, zero2, sqnum, zero3, hcrc)) + if len(data2)==0xAC*0x80: + raw_vid_dump(o2, data2) + + +def raw_node_dump(o, data): + ch = UbiFsCommonHeader() + ch.parse(data[:24]) + + node = ch.getnode() + try: + node.parse(data[24:]) + except Exception as e: + pass + + try: + print("%08x: %s - %s" % (o, repr(ch), repr(node))) + if isinstance(node, UbiFsData) and node.data: + print(" -> ", b2a_hex(node.data)) + except Exception as e: + print("%08x: %s" % (o, b2a_hex(data))) + + +def rawhexdump(fh, args): + data = fh.read() + + ofs = [] + for pattn, bs in ((b'UBI#', 64), (b'UBI!', 64), (b'\x31\x18\x10\x06', 8)): + for o in findpattern(data, pattn, bs): + ofs.append( (o, pattn) ) + + ofs = sorted(ofs, key=lambda o: o[0]) + print("found %d magic numbers" % len(ofs)) + + ofs.append( (len(data), None) ) + + for (o0, p), (o1, _) in zip(ofs, ofs[1:]): + if p==b'UBI#': + raw_ec_dump(o0, data[o0:o1]) + elif p==b'UBI!': + raw_vhdr_dump(o0, data[o0:o1]) + elif p==b'\x31\x18\x10\x06': + raw_node_dump(o0, data[o0:o1]) + else: + print("%08x: %s" % (o0, b2a_hex(data[o0:o1]))) + + +################################################## +def processfile(fn, args): + filesize = os.path.getsize(fn) + with open(fn, "rb") as fh: + if args.offset: + fh = OffsetReader(fh, args.offset, args.length or filesize) + if args.rawdump: + rawhexdump(fh, args) + else: + magic = 
fh.read(4) + if magic == b'UBI#': + processblocks(fh, args) + elif magic == b'\x31\x18\x10\x06': + processvolume(RawVolume(fh), b"raw", args) + else: + print("Unknown file type") + + +def main(): + parser = argparse.ArgumentParser(description='UBIFS dumper.') + parser.add_argument('--savedir', '-s', type=str, help="save files in all volumes to the specified directory", metavar='DIRECTORY') + parser.add_argument('--preserve', '-p', action='store_true', help="preserve permissions and timestamps") + parser.add_argument('--cat', '-c', type=str, action="append", help="extract a single file to stdout", metavar='FILE', default=[]) + parser.add_argument('--listfiles', '-l', action='store_true', help="list directory contents") + parser.add_argument('--dumptree', '-d', action='store_true', help="dump the filesystem b-tree contents") + parser.add_argument('--verbose', '-v', action='count', help="print extra info, like volume map") + parser.add_argument('--debug', action='store_true', help="abort on exceptions") + parser.add_argument('--encoding', '-e', type=str, help="filename encoding, default=utf-8", default='utf-8') + parser.add_argument('--masteroffset', '-m', type=str, help="Which master node to use.") + parser.add_argument('--root', '-R', type=str, help="Which Root node to use (hexlnum:hexoffset).") + parser.add_argument('--rawdump', action='store_true', help="Raw hexdump of entire volume, finds all nodes.") + parser.add_argument('--volume', type=str, help="which volume to hexdump", metavar="VOLNR") + parser.add_argument('--hexdump', type=str, help="hexdump part of a volume/leb[/ofs[/size]]", metavar="LEB:OFF:N") + parser.add_argument('--saveraw', type=str, help="save the entire volume to the specified file", metavar="FILENAME") + parser.add_argument('--nodedump', type=str, help="dump specific node at volume/leb[/ofs]", metavar="LEB:OFF") + parser.add_argument('--offset', type=str, help="decode ubi image at the specifie offset") + parser.add_argument('--length', 
type=str, help="size of ubi image to decode") + parser.add_argument('FILES', type=str, nargs='+', help="list of ubi images to use") + args = parser.parse_args() + + if args.masteroffset: + args.masteroffset = [int(_,0) for _ in args.masteroffset.split(':')] + if args.length: + args.length = int(args.length, 0) + if args.offset: + args.offset = int(args.offset, 0) + if args.volume: + args.volume = int(args.volume, 0) + if args.hexdump: + args.hexdump = [int(_, 0) for _ in args.hexdump.split(":")] + if len(args.hexdump) == 1: + args.hexdump.append(0) + if len(args.hexdump) == 2: + args.hexdump.append(0x100) + + if args.nodedump: + args.nodedump = [int(_, 0) for _ in args.nodedump.split(":")] + if len(args.nodedump) == 1: + args.nodedump.append(0) + + for fn in args.FILES: + print("==>", fn, "<==") + try: + processfile(fn, args) + except Exception as e: + print("ERROR", e) + if args.debug: + raise + + +if __name__ == '__main__': + main()