Skip to content

Commit

Permalink
Improve TargetPath compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Dec 4, 2024
1 parent 01373f4 commit 3b20450
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 670 deletions.
11 changes: 11 additions & 0 deletions dissect/target/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import errno
import os
import sys
import traceback
Expand Down Expand Up @@ -76,22 +77,32 @@ class PluginNotFoundError(PluginError):
class FileNotFoundError(FilesystemError, FileNotFoundError):
"""The requested path could not be found."""

errno = errno.ENOENT


class IsADirectoryError(FilesystemError, IsADirectoryError):
"""The entry is a directory."""

errno = errno.EISDIR


class NotADirectoryError(FilesystemError, NotADirectoryError):
"""The entry is not a directory."""

errno = errno.ENOTDIR


class NotASymlinkError(FilesystemError):
"""The entry is not a symlink."""

errno = errno.EINVAL


class SymlinkRecursionError(FilesystemError):
"""A symlink loop is detected for the entry."""

errno = errno.ELOOP


class RegistryError(Error):
"""A registry error occurred."""
Expand Down
119 changes: 18 additions & 101 deletions dissect/target/helpers/compat/path_310.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,15 @@
import fnmatch
import re
from pathlib import Path, PurePath, _Accessor, _PosixFlavour
from stat import S_ISBLK, S_ISCHR, S_ISFIFO, S_ISSOCK
from typing import IO, TYPE_CHECKING, Any, Callable, Iterator, Optional
from typing import IO, TYPE_CHECKING, Any, Callable, Iterator

from dissect.target import filesystem
from dissect.target.exceptions import FilesystemError, SymlinkRecursionError
from dissect.target.helpers.compat.path_common import (
_DissectPathParents,
io_open,
isjunction,
realpath,
scandir,
)
from dissect.target.helpers.polypath import normalize
from dissect.target.helpers import polypath
from dissect.target.helpers.compat import path_common

if TYPE_CHECKING:
from dissect.target.filesystem import Filesystem, FilesystemEntry
from dissect.target.helpers.compat.path_common import _DissectScandirIterator
from dissect.target.helpers.fsutil import stat_result


Expand Down Expand Up @@ -82,9 +74,9 @@ def open(
path: TargetPath,
mode: str = "rb",
buffering: int = 0,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
) -> IO:
"""Open file and return a stream.
Expand All @@ -93,15 +85,15 @@ def open(
Note: in contrast to regular Python, the mode is binary by default. Text mode
has to be explicitly specified. Buffering is also disabled by default.
"""
return io_open(path, mode, buffering, encoding, errors, newline)
return path_common.io_open(path, mode, buffering, encoding, errors, newline)

@staticmethod
def listdir(path: TargetPath) -> Iterator[str]:
return path.get().listdir()

@staticmethod
def scandir(path: TargetPath) -> _DissectScandirIterator:
return scandir(path)
def scandir(path: TargetPath) -> path_common._DissectScandirIterator:
return path_common.scandir(path)

@staticmethod
def chmod(path: TargetPath, mode: int, *, follow_symlinks: bool = True) -> None:
Expand Down Expand Up @@ -163,10 +155,10 @@ def getcwd() -> str:
def expanduser(path: str) -> str:
raise NotImplementedError("TargetPath.expanduser() is unsupported")

realpath = staticmethod(realpath)
realpath = staticmethod(path_common.realpath)

# NOTE: Forward compatibility with CPython >= 3.12
isjunction = staticmethod(isjunction)
isjunction = staticmethod(path_common.isjunction)


_dissect_accessor = _DissectAccessor()
Expand All @@ -193,7 +185,7 @@ def _from_parts(cls, args: list) -> TargetPath:
path_args = []
for arg in args[1:]:
if isinstance(arg, str):
arg = normalize(arg, alt_separator=alt_separator)
arg = polypath.normalize(arg, alt_separator=alt_separator)
path_args.append(arg)

self = super()._from_parts(path_args)
Expand Down Expand Up @@ -247,8 +239,8 @@ def parent(self) -> TargetPath:
return result

@property
def parents(self) -> _DissectPathParents:
return _DissectPathParents(self)
def parents(self) -> path_common._DissectPathParents:
return path_common._DissectPathParents(self)


class TargetPath(Path, PureDissectPath):
Expand Down Expand Up @@ -382,9 +374,9 @@ def open(
self,
mode: str = "rb",
buffering: int = 0,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
encoding: str | None = None,
errors: str | None = None,
newline: str | None = None,
) -> IO:
"""Open file and return a stream.
Expand All @@ -402,7 +394,7 @@ def write_bytes(self, data: bytes) -> int:
raise NotImplementedError("TargetPath.write_bytes() is unsupported")

def write_text(
self, data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None
self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None
) -> int:
"""
Open the file in text mode, write to it, and close the file.
Expand All @@ -417,88 +409,13 @@ def readlink(self) -> TargetPath:
obj = self._from_parts((self._fs, path))
return obj

def exists(self) -> bool:
"""
Whether this path exists.
"""
try:
# .exists() must resolve possible symlinks
self.get().stat()
return True
except (FilesystemError, ValueError):
return False

def is_dir(self) -> bool:
"""
Whether this path is a directory.
"""
try:
return self.get().is_dir()
except (FilesystemError, ValueError):
return False

def is_file(self) -> bool:
"""
Whether this path is a regular file (also True for symlinks pointing
to regular files).
"""
try:
return self.get().is_file()
except (FilesystemError, ValueError):
return False

def is_symlink(self) -> bool:
"""
Whether this path is a symbolic link.
"""
try:
return self.get().is_symlink()
except (FilesystemError, ValueError):
return False

# NOTE: Forward compatibility with CPython >= 3.12
def is_junction(self) -> bool:
"""
Whether this path is a junction.
"""
return self._accessor.isjunction(self)

def is_block_device(self) -> bool:
"""
Whether this path is a block device.
"""
try:
return S_ISBLK(self.stat().st_mode)
except (FilesystemError, ValueError):
return False

def is_char_device(self) -> bool:
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except (FilesystemError, ValueError):
return False

def is_fifo(self) -> bool:
"""
Whether this path is a FIFO.
"""
try:
return S_ISFIFO(self.stat().st_mode)
except (FilesystemError, ValueError):
return False

def is_socket(self) -> bool:
"""
Whether this path is a socket.
"""
try:
return S_ISSOCK(self.stat().st_mode)
except (FilesystemError, ValueError):
return False

def expanduser(self) -> TargetPath:
"""Return a new path with expanded ~ and ~user constructs
(as returned by os.path.expanduser)
Expand Down
Loading

0 comments on commit 3b20450

Please sign in to comment.