Skip to content
This repository has been archived by the owner on Dec 25, 2022. It is now read-only.

Recursive hashing script

Joachim Metz edited this page Feb 17, 2019 · 5 revisions

Recursive file hashing

In this example we create a recursive file hashing script that can be pointed at a directory or an image file.

Initial boiler plate

First let's start with the initial boiler plate:

  • the shebang
  • the encoding of the source file
  • a description of what the script is supposed to do
  • the main function skeleton
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script to recursively calculate a message digest hash for every file."""

from __future__ import print_function
from __future__ import unicode_literals

# TODO: add imports here.
import sys


# TODO: add classes here.


def Main():
  """The main program function.

  Returns:
    bool: True if successful or False if not.
  """
  # TODO: add main program logic here.

  return True


if __name__ == '__main__':
  if not Main():
    sys.exit(1)
  else:
    sys.exit(0)

Argument handling

For the program argument handling we'll use argparse.

Add the necessary import:

import argparse

And to the Main function add code that sets:

  • the program description
  • a program argument named source
  argument_parser = argparse.ArgumentParser(description=(
      'Calculates a message digest hash for every file in a directory or '
      'storage media image.'))

  argument_parser.add_argument(
      'source', nargs='?', action='store', metavar='image.raw',
      default=None, help=(
          'path of the directory or filename of a storage media image '
          'containing the file.'))

  options = argument_parser.parse_args()

  if not options.source:
    print('Source value is missing.')
    print('')
    argument_parser.print_help()
    print('')
    return False

Logging

For notifying errors, warnings or debug information we'll use logging.

Add the necessary import:

import logging

And to the Main function add code that sets:

  • the logging configuration
  logging.basicConfig(
      level=logging.INFO, format='[%(levelname)s] %(message)s')

The recursive hasher class

The Recursive Hasher is the main class that will contain the recursive digest hash calculation code.

Add the necessary import:

from dfvfs.helpers import volume_scanner

And the recursive hasher class:

class RecursiveHasher(volume_scanner.VolumeScanner):
  """Recursively calculates message digest hashes of data streams."""

The VolumeScanner helper class provides common function to scan for various volume formats in storage media images and devices. A VolumeScannerMediator can be used to obtain additional information necessary for the volume detection, e.g. prompting the user for an encryption passphrase.

The method to retrieve the base path specification

dfVFS uses paths specification and not regular operating system-based paths. dfVFS comes with a path specification factory class that hides most of the dfVFS internals necessary to create path specifications.

To create the path specification you will need to:

  • tell the factory which type of path specification you want to create by passing a type indicator;
  • pass the required Path specification addressing attributes, e.g. for path specification type OS (operating system) this is a location addressing attribute.

For the recursive hasher we need to determine the base path specification from the source that was passed as an argument to the script, for this we use the GetBasePathSpec method provided by VolumeScanner.

The volume scanner, which is uses the source scanner, will determine:

  • if the source is a device, directory or file;
  • when the source is a file if it contains a supported store media image type;
  • if the store media image contains a volume system, e.g. a MBR- or GPT-based partition table;
  • the location of volumes within the store media image;
  • if the volumes contain a supported file system.

The method to recursively calculate message digest hashes

To recursively calculate message digest hashes we add several class methods to the RecursiveHasher class, namely:

  • CalculateHashes; which will take the (base) path specifications and recursively calculate message digest hashes;
  • _CalculateHashesFileEntry; which will take a dfVFS file entry object and recursively calculate message digest hashes;
  • _CalculateHashDataStream; which will take a dfVFS file entry object and calculates a SHA256 (message digest) hash of the corresponding data stream.

In the CalculateHashes method the dfVFS resolver is used to resolve a path specification into a file system and file entry object.

  • The file system object is used in _CalculateHashesFileEntry method to join path segments to create a represent-able version of the operating system path or path within the image.
  • The file entry object is used to recurs through the file entry hierarchy.

Add the necessary import:

from dfvfs.resolver import resolver

Add a class method to recursively calculate the message digest hashes given (base) path specifications.

  def CalculateHashes(self, base_path_specs, output_writer):
    """Recursive calculates hashes starting with the base path specification.

    Args:
      base_path_specs (list[dfvfs.PathSpec]): source path specification.
      output_writer (StdoutWriter): output writer.
    """
    for base_path_spec in base_path_specs:
      file_system = resolver.Resolver.OpenFileSystem(base_path_spec)
      file_entry = resolver.Resolver.OpenFileEntry(base_path_spec)
      if file_entry is None:
        logging.warning('Unable to open base path specification:\n{0:s}'.format(
            base_path_spec.comparable))
        continue

      self._CalculateHashesFileEntry(file_system, file_entry, '', output_writer)

Add a class method to recursively calculate the message digest hashes given a file entry object.

  def _CalculateHashesFileEntry(
      self, file_system, file_entry, parent_full_path, output_writer):
    """Recursive calculates hashes starting with the file entry.

    Args:
      file_system (dfvfs.FileSystem): file system.
      file_entry (dfvfs.FileEntry): file entry.
      parent_full_path (str): full path of the parent file entry.
      output_writer (StdoutWriter): output writer.
    """
    # Since every file system implementation can have their own path
    # segment separator we are using JoinPath to be platform and file system
    # type independent.
    full_path = file_system.JoinPath([parent_full_path, file_entry.name])
    for data_stream in file_entry.data_streams:
      hash_value = self._CalculateHashDataStream(file_entry, data_stream.name)
      display_path = self._GetDisplayPath(
          file_entry.path_spec, full_path, data_stream.name)
      output_writer.WriteFileHash(display_path, hash_value or 'N/A')

    for sub_file_entry in file_entry.sub_file_entries:
      self._CalculateHashesFileEntry(
          file_system, sub_file_entry, full_path, output_writer)

Add the necessary import:

import hashlib

Add a class constant to define the read buffer size:

  # Class constant that defines the default read buffer size.
  _READ_BUFFER_SIZE = 32768

In the _CalculateHashFileEntry method the file-like object is obtained from the file entry object by calling it's GetFileObject method. Note that the file-like is opened on return and that an explicit close of the file-like object removes it from the resolver cache (if used).

Add a class method to calculate the SHA256 message digest hash given a file entry object and data stream name.

  def _CalculateHashDataStream(self, file_entry, data_stream_name):
    """Calculates a message digest hash of the data of the file entry.

    Args:
      file_entry (dfvfs.FileEntry): file entry.
      data_stream_name (str): name of the data stream.

    Returns:
      bytes: digest hash or None.
    """
    hash_context = hashlib.sha256()

    try:
      file_object = file_entry.GetFileObject(data_stream_name=data_stream_name)
    except IOError as exception:
      logging.warning((
          'Unable to open path specification:\n{0:s}'
          'with error: {1!s}').format(
              file_entry.path_spec.comparable, exception))
      return None

    if not file_object:
      return None

    try:
      data = file_object.read(self._READ_BUFFER_SIZE)
      while data:
        hash_context.update(data)
        data = file_object.read(self._READ_BUFFER_SIZE)
    except IOError as exception:
      logging.warning((
          'Unable to read from path specification:\n{0:s}'
          'with error: {1!s}').format(
              file_entry.path_spec.comparable, exception))
      return None

    finally:
      file_object.close()

    return hash_context.hexdigest()

Utility function to format the file paths more human readable:

  def _GetDisplayPath(self, path_spec, full_path, data_stream_name):
    """Retrieves a path to display.

    Args:
      path_spec (dfvfs.PathSpec): path specification of the file entry.
      full_path (str): full path of the file entry.
      data_stream_name (str): name of the data stream.

    Returns:
      str: path to display.
    """
    display_path = ''

    if path_spec.HasParent():
      parent_path_spec = path_spec.parent
      if parent_path_spec and parent_path_spec.type_indicator == (
          dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION):
        display_path = ''.join([display_path, parent_path_spec.location])

    display_path = ''.join([display_path, full_path])
    if data_stream_name:
      display_path = ':'.join([display_path, data_stream_name])

    return display_path

The stdout output writer class

To abstract the output writing we define an output writer class. In this example we only show the stdout output writer but the abstraction allows to easily implement support writing to e.g. a SQLite database.

class StdoutWriter(OutputWriter):
  """Output writer that writes to stdout."""

  def Close(self):
    """Closes the output writer object."""
    pass

  def Open(self):
    """Opens the output writer object."""
    pass

  def WriteFileHash(self, path, hash_value):
    """Writes the file path and hash to stdout.

    Args:
      path (str): path of the file.
      hash_value (str): message digest hash calculated over the file data.
    """
    string = '{0:s}\t{1:s}'.format(hash_value, path)

    encoded_string = self._EncodeString(string)
    print(encoded_string)

Connecting the dots

What remains now is to connect the dots and add the program logic to the Main function.

  output_writer = StdoutWriter()

  if not output_writer.Open():
    print('Unable to open output writer.')
    print('')
    return False

  return_value = True
  mediator = command_line.CLIVolumeScannerMediator()
  recursive_hasher = RecursiveHasher(mediator=mediator)

  try:
    base_path_specs = recursive_hasher.GetBasePathSpecs(options.source)
    if not base_path_specs:
      print('No supported file system found in source.')
      print('')
      return False

    recursive_hasher.CalculateHashes(base_path_specs, output_writer)

    print('')
    print('Completed.')

  except errors.ScannerError as exception:
    return_value = False

    print('')
    print('[ERROR] {0!s}'.format(exception))

  except errors.UserAbort as exception:
    return_value = False

    print('')
    print('Aborted.')

  output_writer.Close()

  return return_value