Skip to content

Commit

Permalink
Attempt to load previous snapshot VMDK when missing snapshot disks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper authored Feb 5, 2025
1 parent 027629f commit ffd18db
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 18 deletions.
2 changes: 1 addition & 1 deletion dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Loader:
def __init__(self, path: Path, **kwargs):
self.path = path

def __repr__(self):
def __repr__(self) -> str:
return f"{self.__class__.__name__}('{self.path}')"

@staticmethod
Expand Down
60 changes: 51 additions & 9 deletions dissect/target/loaders/vmx.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dissect.hypervisor import vmx

from dissect.target.containers.vmdk import VmdkContainer
from dissect.target.loader import Loader

if TYPE_CHECKING:
from pathlib import Path

from dissect.target.target import Target


class VmxLoader(Loader):
"""Load VMware virtual machine configuration (VMX) files.
Expand All @@ -11,20 +20,53 @@ class VmxLoader(Loader):
- https://docs.vmware.com/en/VMware-Workstation-Pro/17/com.vmware.ws.using.doc/GUID-A968EF50-BA25-450A-9D1F-F8A9DEE640E7.html # noqa
"""

def __init__(self, path, **kwargs):
path = path.resolve()

super().__init__(path)
self.vmx = vmx.VMX.parse(path.read_text())
self.base_dir = path.parent
def __init__(self, path: Path, **kwargs):
super().__init__(path.resolve())
self.vmx = vmx.VMX.parse(self.path.read_text())

@staticmethod
def detect(path):
def detect(path: Path) -> bool:
return path.suffix.lower() in (".vmx", ".vmtx")

def map(self, target):
def map(self, target: Target) -> None:
for disk in self.vmx.disks():
path = self.base_dir.joinpath(disk)
path = self.path.parent.joinpath(disk)

if not path.is_file():
base, sep, snapshot_id = path.stem.rpartition("-")
if sep and len(snapshot_id) == 6 and snapshot_id.isdigit():
# Probably a snapshot, try to load the parent disk
target.log.info(
"Disk not found but seems to be a snapshot, trying previous snapshots: %s", path.name
)

snapshot_num = int(snapshot_id)
missing = [path.name]
for i in range(snapshot_num - 1, -1, -1):
snapshot_disk = (
path.with_name(f"{base}{path.suffix}")
if i == 0
else path.with_name(f"{base}-{i:06d}{path.suffix}")
)
target.log.debug("Trying to load snapshot: %s", snapshot_disk.name)

if snapshot_disk.is_file():
target.log.warning(
"Missing disk(s) but continuing with found snapshot: %s (missing %s)",
snapshot_disk.name,
", ".join(missing),
)
path = snapshot_disk
break

missing.append(snapshot_disk.name)
else:
target.log.error("Failed to find previous snapshot for disk: %s", path.name)
continue
else:
target.log.error("Disk not found: %s", path.name)
continue

try:
target.disks.add(VmdkContainer(path))
except Exception:
Expand Down
17 changes: 9 additions & 8 deletions tests/loaders/test_vmwarevm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@
from dissect.target.containers.vmdk import VmdkContainer
from dissect.target.loaders.vmwarevm import VmwarevmLoader
from dissect.target.target import Target
from tests._utils import mkdirs


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmwarevm_loader(VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path):
root = tmp_path
mkdirs(root, ["Test.vmwarevm"])
(root / "Test.vmwarevm" / "Test.vmx").touch()
def test_vmwarevm_loader(VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path) -> None:
root = tmp_path.resolve()
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
(vm_path / "Test.vmx").touch()
(vm_path / "mock.vmdk").touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock.vmdk"]
VmdkContainer.return_value = VmdkContainer

vmwarevm_loader = VmwarevmLoader(root / "Test.vmwarevm")
vmwarevm_loader.map(target_bare)
assert VmwarevmLoader.detect(vm_path)

VmwarevmLoader(vm_path).map(target_bare)
assert len(target_bare.disks) == 1
assert VmdkContainer.mock_calls == [call(root.resolve() / "Test.vmwarevm" / "mock.vmdk")]
assert VmdkContainer.mock_calls == [call(vm_path / "mock.vmdk")]
138 changes: 138 additions & 0 deletions tests/loaders/test_vmx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import logging
from pathlib import Path
from unittest.mock import call, patch

import pytest
from dissect.hypervisor.descriptor.vmx import VMX

from dissect.target.containers.vmdk import VmdkContainer
from dissect.target.loaders.vmx import VmxLoader
from dissect.target.target import Target


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmx_loader(VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path) -> None:
root = tmp_path.resolve()
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
vmx_path = vm_path / "Test.vmx"
vmx_path.touch()
(vm_path / "mock.vmdk").touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock.vmdk"]
VmdkContainer.return_value = VmdkContainer

assert VmxLoader.detect(vmx_path)

VmxLoader(vmx_path).map(target_bare)
assert len(target_bare.disks) == 1
assert VmdkContainer.mock_calls == [call(vm_path / "mock.vmdk")]


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmx_loader_missing_disk(
VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
root = tmp_path.resolve()
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
vmx_path = vm_path / "Test.vmx"
vmx_path.touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock.vmdk"]
VmdkContainer.return_value = VmdkContainer

assert VmxLoader.detect(vmx_path)

VmxLoader(vmx_path).map(target_bare)
assert len(target_bare.disks) == 0

assert "Disk not found: mock.vmdk" in caplog.text


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmx_loader_missing_snapshots(
VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
root = tmp_path
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
vmx_path = vm_path / "Test.vmx"
vmx_path.touch()
(vm_path / "mock.vmdk").touch()
(vm_path / "mock-000001.vmdk").touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock-000002.vmdk"]
VmdkContainer.return_value = VmdkContainer

caplog.set_level(logging.DEBUG)

VmxLoader(vmx_path).map(target_bare)
assert len(target_bare.disks) == 1
assert VmdkContainer.mock_calls == [call(vm_path / "mock-000001.vmdk")]

assert "Disk not found but seems to be a snapshot, trying previous snapshots: mock-000002.vmdk" in caplog.text
assert "Trying to load snapshot: mock-000001.vmdk" in caplog.text
assert (
"Missing disk(s) but continuing with found snapshot: mock-000001.vmdk (missing mock-000002.vmdk)" in caplog.text
)


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmx_loader_missing_snapshots_base(
VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
root = tmp_path
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
vmx_path = vm_path / "Test.vmx"
vmx_path.touch()
(vm_path / "mock.vmdk").touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock-000002.vmdk"]
VmdkContainer.return_value = VmdkContainer

caplog.set_level(logging.DEBUG)

VmxLoader(vmx_path).map(target_bare)
assert len(target_bare.disks) == 1
assert VmdkContainer.mock_calls == [call(vm_path / "mock.vmdk")]

assert "Disk not found but seems to be a snapshot, trying previous snapshots: mock-000002.vmdk" in caplog.text
assert "Trying to load snapshot: mock-000001.vmdk" in caplog.text
assert "Trying to load snapshot: mock.vmdk" in caplog.text
assert (
"Missing disk(s) but continuing with found snapshot: mock.vmdk (missing mock-000002.vmdk, mock-000001.vmdk)"
in caplog.text
)


@patch("dissect.target.loaders.vmx.VmdkContainer")
@patch("dissect.target.loaders.vmx.vmx.VMX")
def test_vmx_loader_missing_all_snapshots(
VMX: VMX, VmdkContainer: VmdkContainer, target_bare: Target, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
root = tmp_path
vm_path = root / "Test.vmwarevm"
vm_path.mkdir()
vmx_path = vm_path / "Test.vmx"
vmx_path.touch()

VMX.parse.return_value = VMX
VMX.disks.return_value = ["mock-000001.vmdk"]
VmdkContainer.return_value = VmdkContainer

caplog.set_level(logging.DEBUG)

VmxLoader(vmx_path).map(target_bare)
assert len(target_bare.disks) == 0

assert "Failed to find previous snapshot for disk: mock-000001.vmdk" in caplog.text

0 comments on commit ffd18db

Please sign in to comment.