From 0c654ec70c7bdb66d1816e7876d3ab3fcfe0c760 Mon Sep 17 00:00:00 2001 From: Rudolf Kolbe Date: Wed, 27 Nov 2024 14:50:40 +0100 Subject: [PATCH] discard ImportHelper --- UnityPy/environment.py | 84 +++++++++++++---- UnityPy/helpers/ImportHelper.py | 161 -------------------------------- 2 files changed, 66 insertions(+), 179 deletions(-) delete mode 100644 UnityPy/helpers/ImportHelper.py diff --git a/UnityPy/environment.py b/UnityPy/environment.py index 6b210aca..34135f0f 100644 --- a/UnityPy/environment.py +++ b/UnityPy/environment.py @@ -1,17 +1,14 @@ import io -import os import ntpath +import os import re -from typing import List, Callable, Dict, Union +from typing import Callable, Dict, List, Union from zipfile import ZipFile from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem - -from .files import File, ObjectReader, SerializedFile -from .enums import FileType -from .helpers import ImportHelper +from .files import File, ObjectReader, SerializedFile, parse_file from .streams import EndianBinaryReader reSplit = re.compile(r"(.*?([^\/\\]+?))\.split\d+") @@ -119,15 +116,13 @@ def load_file( file = self._load_split_file(file) # Unity paths are case insensitive, so we need to find "Resources/Foo.asset" when the record says "resources/foo.asset" elif not os.path.exists(file): - file = ImportHelper.find_sensitive_path(self.path, file) + file = find_sensitive_path(self.path, file) # nonexistent files might be packaging errors or references to Unity's global Library/ if file is None: return - if type(file) == str: + if isinstance(file, str): file = self.fs.open(file, "rb") - typ, reader = ImportHelper.check_file_type(file) - stream_name = ( name if name @@ -138,19 +133,18 @@ def load_file( ) ) - if typ == FileType.ZIP: - f = self.load_zip_file(file) - else: - f = ImportHelper.parse_file( - reader, self, name=stream_name, typ=typ, is_dependency=is_dependency - ) - + reader = EndianBinaryReader(file) + path = stream_name if not parent else f"{parent.path}/{stream_name}" + f = parse_file(reader, stream_name, path, parent=self) + + if f is None: + f = reader + if isinstance(f, (SerializedFile, EndianBinaryReader)): self.register_cab(stream_name, f) self.files[stream_name] = f - def load_zip_file(self, value): buffer = None if isinstance(value, str) and self.fs.exists(value): @@ -330,3 +324,57 @@ def simplify_name(name: str) -> str: - converting to lowercase """ return ntpath.basename(name).lower() + + +def file_name_without_extension(file_name: str) -> str: + return os.path.join( + os.path.dirname(file_name), os.path.splitext(os.path.basename(file_name))[0] + ) + + +def list_all_files(directory: str) -> List[str]: + return [ + val + for sublist in [ + [os.path.join(dir_path, filename) for filename in filenames] + for (dir_path, dirn_ames, filenames) in os.walk(directory) + if ".git" not in dir_path + ] + for val in sublist + ] + + +def find_all_files(directory: str, search_str: str) -> List[str]: + return [ + val + for sublist in [ + [ + os.path.join(dir_path, filename) + for filename in filenames + if search_str in filename + ] + for (dir_path, dirn_ames, filenames) in os.walk(directory) + if ".git" not in dir_path + ] + for val in sublist + ] + + +def find_sensitive_path(dir: str, insensitive_path: str) -> Union[str, None]: + parts = os.path.split(insensitive_path.strip(os.path.sep)) + + sensitive_path = dir + for part in parts: + part_lower = part.lower() + part = next( + (name for name in os.listdir(sensitive_path) if name.lower() == part_lower), + None, + ) + if part is None: + return None + sensitive_path = os.path.join(sensitive_path, part) + + return sensitive_path + + +__all__ = ["Environment"] diff --git a/UnityPy/helpers/ImportHelper.py b/UnityPy/helpers/ImportHelper.py deleted file mode 100644 index 7e7e2a4b..00000000 --- a/UnityPy/helpers/ImportHelper.py +++ /dev/null @@ -1,161 +0,0 @@ -from __future__ import annotations -import os -from typing import Union, List -from .CompressionHelper import BROTLI_MAGIC, GZIP_MAGIC -from ..enums import FileType -from ..streams import EndianBinaryReader -from .. import files - - -def file_name_without_extension(file_name: str) -> str: - return os.path.join( - os.path.dirname(file_name), os.path.splitext(os.path.basename(file_name))[0] - ) - - -def list_all_files(directory: str) -> List[str]: - return [ - val - for sublist in [ - [os.path.join(dir_path, filename) for filename in filenames] - for (dir_path, dirn_ames, filenames) in os.walk(directory) - if ".git" not in dir_path - ] - for val in sublist - ] - - -def find_all_files(directory: str, search_str: str) -> List[str]: - return [ - val - for sublist in [ - [ - os.path.join(dir_path, filename) - for filename in filenames - if search_str in filename - ] - for (dir_path, dirn_ames, filenames) in os.walk(directory) - if ".git" not in dir_path - ] - for val in sublist - ] - - -def check_file_type(input_) -> Union[FileType, EndianBinaryReader]: - if isinstance(input_, str) and os.path.isfile(input_): - reader = EndianBinaryReader(open(input_, "rb")) - elif isinstance(input_, EndianBinaryReader): - reader = input_ - else: - try: - reader = EndianBinaryReader(input_) - except: - return None, None - - if reader.Length < 20: - return FileType.ResourceFile, reader - - signature = reader.read_string_to_null(20) - - reader.Position = 0 - if signature in [ - "UnityWeb", - "UnityRaw", - "\xFA\xFA\xFA\xFA\xFA\xFA\xFA\xFA", - "UnityFS", - ]: - return FileType.BundleFile, reader - elif signature == "UnityWebData1.0": - return FileType.WebFile, reader - elif signature == "PK\x03\x04": - return FileType.ZIP, reader - else: - if reader.Length < 128: - return FileType.ResourceFile, reader - - magic = bytes(reader.read_bytes(2)) - reader.Position = 0 - if GZIP_MAGIC == magic: - return FileType.WebFile, reader - reader.Position = 0x20 - magic = bytes(reader.read_bytes(6)) - reader.Position = 0 - if BROTLI_MAGIC == magic: - return FileType.WebFile, reader - - # check if AssetsFile - old_endian = reader.endian - # read as if assetsfile and check version - # ReadHeader - reader.Position = 0 - metadata_size = reader.read_u_int() - file_size = reader.read_u_int() - version = reader.read_u_int() - data_offset = reader.read_u_int() - - if version >= 22: - endian = ">" if reader.read_boolean() else "<" - reserved = reader.read_bytes(3) - metadata_size = reader.read_u_int() - file_size = reader.read_long() - data_offset = reader.read_long() - unknown = reader.read_long() # unknown - - # reset - reader.endian = old_endian - reader.Position = 0 - # check info - if any( - ( - version < 0, - version > 100, - *[ - x < 0 or x > reader.Length - for x in [file_size, metadata_size, version, data_offset] - ], - file_size < metadata_size, - file_size < data_offset, - ) - ): - return FileType.ResourceFile, reader - else: - return FileType.AssetsFile, reader - - -def parse_file( - reader: EndianBinaryReader, - parent, - name: str, - typ: FileType = None, - is_dependency=False, -) -> Union[files.File, EndianBinaryReader]: - if typ is None: - typ, _ = check_file_type(reader) - if typ == FileType.AssetsFile and not name.endswith( - (".resS", ".resource", ".config", ".xml", ".dat") - ): - f = files.SerializedFile(reader, parent, name=name, is_dependency=is_dependency) - elif typ == FileType.BundleFile: - f = files.BundleFile(reader, parent, name=name, is_dependency=is_dependency) - elif typ == FileType.WebFile: - f = files.WebFile(reader, parent, name=name, is_dependency=is_dependency) - else: - f = reader - return f - - -def find_sensitive_path(dir: str, insensitive_path: str) -> Union[str, None]: - parts = os.path.split(insensitive_path.strip(os.path.sep)) - - sensitive_path = dir - for part in parts: - part_lower = part.lower() - part = next( - (name for name in os.listdir(sensitive_path) if name.lower() == part_lower), - None, - ) - if part is None: - return None - sensitive_path = os.path.join(sensitive_path, part) - - return sensitive_path