Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve support for safe symlink extraction #763

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 61 additions & 56 deletions unblob/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,68 +47,73 @@ def is_recursive_link(path: Path) -> bool:
return False


def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
"""Rewrites absolute symlinks to point within the extraction directory (outdir).

If it's not a relative symlink it is either removed it it attempts
to traverse outside of the extraction directory or rewritten to be
fully portable (no mention of the extraction directory in the link
value).
"""
if is_recursive_link(path):
logger.error("Symlink loop identified, removing", path=path)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=os.readlink(path)
)
task_result.add_report(error_report)
path.unlink()
return path

raw_target = os.readlink(path)
if not raw_target:
logger.error("Symlink with empty target, removing.")
path.unlink()
return path

target = Path(raw_target)

if target.is_absolute():
target = Path(target.as_posix().lstrip("/"))
def sanitize_symlink_target(base_dir, current_dir, target):
# Normalize all paths to their absolute forms
base_dir_abs = os.path.abspath(base_dir)
current_dir_abs = os.path.abspath(current_dir)
target_abs = os.path.abspath(os.path.join(current_dir, target)) \
if not os.path.isabs(target) else os.path.abspath(target)

# Check if the target is absolute and within the base_dir
if os.path.isabs(target):
if target_abs.startswith(base_dir_abs):
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is absolute but outside base_dir - we'll pretend base_dir is our root
# and adjust the target to be within base_dir
abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs]))
# We want to return the relative path from current_dir to the adjusted target
return os.path.relpath(abs, current_dir_abs)
else:
target = path.resolve()

safe = is_safe_path(outdir, target)

if not safe:
logger.error("Path traversal attempt through symlink, removing", target=target)
error_report = MaliciousSymlinkRemoved(
link=path.as_posix(), target=target.as_posix()
)
task_result.add_report(error_report)
path.unlink()
else:
relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
path.unlink()
path.symlink_to(relative_target)
return path

# Target is relative
if target_abs.startswith(base_dir_abs):
# Target is relative and does not escape base_dir
return os.path.relpath(target_abs, current_dir_abs)
else:
# Target is relative and escapes base_dir
# Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive
# from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the
# relative path from /host/test_archive/foo to /host/test_archive/etc/passwd
# without escaping /host/test_archive

for drop_count in range(0, len(target.split('/'))):
# We drop '..'s from the target by prepending placeholder directories until we get something valid
abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target
resolved = os.path.abspath(abs)
if resolved.startswith(base_dir_abs):
break
else:
raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}")

# We need to add the /placeholder to the relative path because we need
# to act like a file within base_dir is our root (as opposed to base_dir itself)
return os.path.relpath(resolved, base_dir_abs + '/placeholder')

def fix_extracted_directory(outdir: Path, task_result: TaskResult):
def _fix_extracted_directory(directory: Path):
if not directory.exists():
return
for path in (directory / p for p in os.listdir(directory)):
try:
fix_permission(path)
if path.is_symlink():
fix_symlink(path, outdir, task_result)
continue
if path.is_dir():
_fix_extracted_directory(path)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

base_dir = os.path.abspath(outdir)
for root, dirs, files in os.walk(base_dir, topdown=True):
fix_permission(Path(root))
for name in dirs+files:
try:
full_path = os.path.join(root, name)
if os.path.islink(full_path):
# Make symlinks relative and constrain them to the base_dir
target = os.readlink(full_path)
new_target = sanitize_symlink_target(base_dir, root, target)
if new_target != target:
os.remove(full_path)
os.symlink(new_target, full_path)
logger.info("Updated symlink", path=full_path, target=new_target)
else:
logger.debug("Symlink is already sanitized", path=full_path, target=new_target)
except OSError as e:
if e.errno == errno.ENAMETOOLONG:
continue
raise e from None

fix_permission(outdir)
_fix_extracted_directory(outdir)
Expand Down
38 changes: 28 additions & 10 deletions unblob/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def __init__(self, *, root: Path, path: Path) -> None:

class _FSLink:
def __init__(self, *, root: Path, src: Path, dst: Path) -> None:
self.dst = _FSPath(root=root, path=dst)
self.dst = _FSPath(root=root, path=root / src.parent / dst)
self.src = _FSPath(root=root, path=src)
self.is_safe = self.dst.is_safe and self.src.is_safe

Expand Down Expand Up @@ -568,18 +568,36 @@ def create_symlink(self, src: Path, dst: Path):
"""Create a symlink dst with the link/content/target src."""
logger.debug("creating symlink", file_path=dst, link_target=src, _verbosity=3)

if src.is_absolute():
# convert absolute paths to dst relative paths
# these would point to the same path if self.root would be the real root "/"
# but they are relocatable
src = self._path_to_root(dst.parent) / chop_root(src)
if dst.is_absolute():
# If the symlink destination is absolute, we need to make it relative to the root
# so it can be safely created in the extraction directory.
# If the resulting path points to outside of the extraction directory, we skip it.
dst = self.root / chop_root(dst)
if not is_safe_path(self.root, dst):
self.record_problem(
LinkExtractionProblem(
problem="Potential path traversal through symlink",
resolution="Skipped.",
path=str(dst),
link_path=str(src),
)
)
return

safe_link = self._get_checked_link(src=dst.parent / src, dst=dst)
safe_link = self._get_checked_link(src=src, dst=dst)

if safe_link:
dst = safe_link.dst.absolute_path
self._ensure_parent_dir(dst)
dst.symlink_to(src)
src = safe_link.src.absolute_path
self._ensure_parent_dir(src)
logger.debug(f"Creating symlink {src} -> {dst}")

# Create symlink at src pointing to dst
src.symlink_to(dst)

if not src.is_symlink():
self.record_problem(
safe_link.format_report("Symlink creation failed.")
)

def create_hardlink(self, src: Path, dst: Path):
"""Create a new hardlink dst to the existing file src."""
Expand Down
74 changes: 57 additions & 17 deletions unblob/handlers/archive/_safe_tarfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,63 @@ def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path): # noqa: C901

# prevent traversal attempts through links
if tarinfo.islnk() or tarinfo.issym():
if Path(tarinfo.linkname).is_absolute():
self.record_problem(
tarinfo,
"Absolute path as link target.",
"Converted to extraction relative path.",
)
tarinfo.linkname = f"./{tarinfo.linkname}"
if not is_safe_path(
basedir=extract_root,
path=extract_root / tarinfo.linkname,
):
self.record_problem(
tarinfo,
"Traversal attempt through link path.",
"Skipped.",
)
return
link_target = Path(tarinfo.linkname)

# Check if the link is absolute and make it relative to extract_root
if link_target.is_absolute():
# Strip leading '/' to make the path relative
rel_target = link_target.relative_to('/')

if Path(tarinfo.linkname).is_absolute():
self.record_problem(
tarinfo,
"Absolute path as link target.",
"Converted to extraction relative path.",
)
else:
# Directly use the relative link target. If it points to an unsafe path, we'll
# check and fix below
rel_target = link_target

# The symlink will point to our relative target (may be updated below if unsafe)
tarinfo.linkname = rel_target

# Resolve the link target to an absolute path
resolved_path = (extract_root / tarinfo.name).parent / rel_target

# If the resolved path points outside of extract_root, we need to fix it!
if not is_safe_path(extract_root, resolved_path):
logger.warning("Traversal attempt through link path.", src=tarinfo.name, dest=tarinfo.linkname, basedir=extract_root, resovled_path=resolved_path)

for drop_count in range(0, len(str(rel_target).split('/'))):
new_path = (extract_root / tarinfo.name).parent / Path("/".join(["placeholder"] * drop_count)) / rel_target
resolved_path = os.path.abspath(new_path)
if str(resolved_path).startswith(str(extract_root)):
break
else:
# We didn't hit the break, we couldn't resolve the path safely
self.record_problem(
tarinfo,
"Traversal attempt through link path.",
"Skipped.",
)
return

# Double check that it's safe now
if not is_safe_path(extract_root, resolved_path):
self.record_problem(
tarinfo,
"Traversal attempt through link path.",
"Skipped.",
)
return

# Prepend placeholder directories before rel_target to get a valid path
# within extract_root. This is the relative version of resolved_path.
rel_target = Path("/".join(["placeholder"] * drop_count)) / rel_target
tarinfo.linkname = rel_target

logger.debug("Creating symlink", points_to=resolved_path, name=tarinfo.name)

target_path = extract_root / tarinfo.name
# directories are special: we can not set their metadata now + they might also be already existing
Expand Down
2 changes: 1 addition & 1 deletion unblob/handlers/archive/cpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def dump_entries(self, fs: FileSystem):
self.file[entry.start_offset : entry.start_offset + entry.size]
).decode("utf-8")
)
fs.create_symlink(src=link_path, dst=entry.path)
fs.create_symlink(src=entry.path, dst=link_path)
elif (
stat.S_ISCHR(entry.mode)
or stat.S_ISBLK(entry.mode)
Expand Down
Loading