Skip to content

Commit

Permalink
Code review: 319020043: Fixes for tests on AppVeyor log2timeline#1108
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Jan 3, 2017
1 parent 2d08359 commit 6784927
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 104 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ plaso (1.5.2-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Tue, 20 Dec 2016 14:30:59 +0100
-- Log2Timeline <[email protected]> Tue, 03 Jan 2017 08:12:28 +0100
2 changes: 1 addition & 1 deletion plaso/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
__version__ = '1.5.2'

VERSION_DEV = True
VERSION_DATE = '20161220'
VERSION_DATE = '20170103'


def GetVersion():
Expand Down
72 changes: 40 additions & 32 deletions tests/cli/status_view_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"""Tests for the StatusView tool object."""

import unittest
import sys

import plaso

from plaso.cli.status_view_tool import StatusViewTool
from plaso.cli import tools
from plaso.engine import processing_status
Expand Down Expand Up @@ -49,20 +51,26 @@ def testPrintStatusUpdate(self):

string = output_writer.ReadOutput()

expected_string = (
b'Source path\t: /test/source/path\n'
b'Source type\t: TESTSOURCE\n'
b'\n'
b'plaso - version {0:s}\n'
b'\n'
b'Source path\t: /test/source/path\n'
b'Source type\t: TESTSOURCE\n'
b'\n'
b'\x1b[1mIdentifier\tPID\tStatus\t\tSources\t\tEvents\t\tFile\x1b[0m\n'
b'f_identifier\t123\tf_status\t29 (29)\t\t456 (456)\tf_test_file\n'
b'\n'
).format(plaso.GetVersion())
self.assertEqual(string.split(b'\n'), expected_string.split(b'\n'))
plaso_version = plaso.GetVersion()

table_header = b'Identifier\tPID\tStatus\t\tSources\t\tEvents\t\tFile'
if not sys.platform.startswith(u'win'):
table_header = b'\x1b[1m{0:s}\x1b[0m'.format(table_header)

expected_lines = [
b'Source path\t: /test/source/path',
b'Source type\t: TESTSOURCE',
b'',
b'plaso - version {0:s}'.format(plaso_version),
b'',
b'Source path\t: /test/source/path',
b'Source type\t: TESTSOURCE',
b'',
table_header,
b'f_identifier\t123\tf_status\t29 (29)\t\t456 (456)\tf_test_file',
b'',
b'']
self.assertEqual(string.split(b'\n'), expected_lines)

process_status.UpdateWorkerStatus(
u'w_identifier', u'w_status', 123,
Expand All @@ -71,18 +79,18 @@ def testPrintStatusUpdate(self):
status_view_tool._PrintStatusUpdate(process_status)
string = output_writer.ReadOutput()

expected_string = (
b'plaso - version {0:s}\n'
b'\n'
b'Source path\t: /test/source/path\n'
b'Source type\t: TESTSOURCE\n'
b'\n'
b'\x1b[1mIdentifier\tPID\tStatus\t\tSources\t\tEvents\t\tFile\x1b[0m\n'
b'f_identifier\t123\tf_status\t29 (29)\t\t456 (456)\tf_test_file\n'
b'w_identifier\t123\tw_status\t2 (2)\t\t4 (4)\t\tw_test_file\n'
b'\n'
).format(plaso.GetVersion())
self.assertEqual(string.split(b'\n'), expected_string.split(b'\n'))
expected_lines = [
b'plaso - version {0:s}'.format(plaso_version),
b'',
b'Source path\t: /test/source/path',
b'Source type\t: TESTSOURCE',
b'',
table_header,
b'f_identifier\t123\tf_status\t29 (29)\t\t456 (456)\tf_test_file',
b'w_identifier\t123\tw_status\t2 (2)\t\t4 (4)\t\tw_test_file',
b'',
b'']
self.assertEqual(string.split(b'\n'), expected_lines)

def testPrintStatusUpdateStream(self):
"""Tests the PrintStatusUpdateStream function."""
Expand All @@ -103,12 +111,12 @@ def testPrintStatusUpdateStream(self):

string = output_writer.ReadOutput()

expected_string = (
b'Source path\t: /test/source/path\n'
b'Source type\t: TESTSOURCE\n'
b'\n'
)
self.assertEqual(string.split(b'\n'), expected_string.split(b'\n'))
expected_lines = [
b'Source path\t: /test/source/path',
b'Source type\t: TESTSOURCE',
b'',
b'']
self.assertEqual(string.split(b'\n'), expected_lines)

process_status.UpdateWorkerStatus(
u'w_identifier', u'w_status', 123,
Expand Down
59 changes: 46 additions & 13 deletions tests/engine/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ class EventExtractionWorkerTest(shared_test_lib.BaseTestCase):

def _TestProcessPathSpec(
self, storage_writer, path_spec, extraction_worker=None,
process_archives=False):
knowledge_base_values=None, process_archives=False):
"""Tests processing a path specification.
Args:
storage_writer (StorageWriter): storage writer.
path_spec (dfvfs.PathSpec): path specification.
extraction_worker (Optional[EventExtractorWorker]): worker to process the
pathspec. If None, a new worker will be created.
knowledge_base_values (Optional[dict]): knowledge base values.
process_archives (Optional[bool]): whether archive files should be
processed.
"""
knowledge_base_object = knowledge_base.KnowledgeBase()
if knowledge_base_values:
for identifier, value in iter(knowledge_base_values.items()):
knowledge_base_object.SetValue(identifier, value)

mediator = parsers_mediator.ParserMediator(
storage_writer, knowledge_base_object)

Expand All @@ -61,11 +66,18 @@ def _TestProcessPathSpec(
@shared_test_lib.skipUnlessHasTestFile([u'ímynd.dd'])
def testAnalyzeFileObject(self):
"""Tests the _AnalyzeFileObject function."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

storage_writer = fake_storage.FakeStorageWriter(session)

knowledge_base_object = knowledge_base.KnowledgeBase()
if knowledge_base_values:
for identifier, value in iter(knowledge_base_values.items()):
knowledge_base_object.SetValue(identifier, value)

mediator = parsers_mediator.ParserMediator(
storage_writer, knowledge_base_object)
storage_writer, knowledge_base_object, preferred_year=2016)

resolver_context = context.Context()
extraction_worker = worker.EventExtractionWorker(resolver_context)
Expand Down Expand Up @@ -93,39 +105,46 @@ def testAnalyzeFileObject(self):
@shared_test_lib.skipUnlessHasTestFile([u'syslog'])
def testProcessPathSpecFile(self):
"""Tests the ProcessPathSpec function on a file."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

path_spec = self._GetTestFilePathSpec([u'syslog'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 19)

@shared_test_lib.skipUnlessHasTestFile([u'syslog.gz'])
def testProcessPathSpecCompressedFileGZIP(self):
"""Tests the ProcessPathSpec function on a gzip compressed file."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

path_spec = self._GetTestFilePathSpec([u'syslog.gz'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 16)

@shared_test_lib.skipUnlessHasTestFile([u'syslog.bz2'])
def testProcessPathSpecCompressedFileBZIP2(self):
"""Tests the ProcessPathSpec function on a bzip2 compressed file."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

path_spec = self._GetTestFilePathSpec([u'syslog.bz2'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 15)

@shared_test_lib.skipUnlessHasTestFile([u'syslog.tar'])
def testProcessPathSpec(self):
"""Tests the ProcessPathSpec function on an archive file."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

source_path = self._GetTestFilePath([u'syslog.tar'])
Expand All @@ -136,28 +155,32 @@ def testProcessPathSpec(self):
parent=path_spec)

storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 13)

# Process an archive file without "process archive files" mode.
path_spec = self._GetTestFilePathSpec([u'syslog.tar'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 3)

# Process an archive file with "process archive files" mode.
path_spec = self._GetTestFilePathSpec([u'syslog.tar'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(
storage_writer, path_spec, process_archives=True)
storage_writer, path_spec, knowledge_base_values=knowledge_base_values,
process_archives=True)

self.assertEqual(storage_writer.number_of_events, 16)

@shared_test_lib.skipUnlessHasTestFile([u'syslog.tgz'])
def testProcessPathSpecCompressedArchive(self):
"""Tests the ProcessPathSpec function on a compressed archive file."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

source_path = self._GetTestFilePath([u'syslog.tgz'])
Expand All @@ -170,21 +193,24 @@ def testProcessPathSpecCompressedArchive(self):
parent=path_spec)

storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 13)

# Process an archive file with "process archive files" mode.
path_spec = self._GetTestFilePathSpec([u'syslog.tgz'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(
storage_writer, path_spec, process_archives=True)
storage_writer, path_spec, knowledge_base_values=knowledge_base_values,
process_archives=True)

self.assertEqual(storage_writer.number_of_events, 17)

@shared_test_lib.skipUnlessHasTestFile([u'image.vmdk'])
def testProcessPathSpecVMDK(self):
"""Tests the ProcessPathSpec function on a VMDK with symbolic links."""
knowledge_base_values = {u'year': 2016}
session = sessions.Session()

source_path = self._GetTestFilePath([u'image.vmdk'])
Expand All @@ -196,7 +222,8 @@ def testProcessPathSpecVMDK(self):
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=path_spec)
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(storage_writer, path_spec)
self._TestProcessPathSpec(
storage_writer, path_spec, knowledge_base_values=knowledge_base_values)

self.assertEqual(storage_writer.number_of_events, 18)

Expand All @@ -209,11 +236,14 @@ def testExtractionWorkerHashing(self):
extraction_worker.SetHashers(u'md5')
self.assertIn(u'hashing', extraction_worker.GetAnalyzerNames())

knowledge_base_values = {u'year': 2016}
session = sessions.Session()

path_spec = self._GetTestFilePathSpec([u'empty_file'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(
storage_writer, path_spec, extraction_worker=extraction_worker)
storage_writer, path_spec, extraction_worker=extraction_worker,
knowledge_base_values=knowledge_base_values)

empty_file_md5 = u'd41d8cd98f00b204e9800998ecf8427e'
for event in storage_writer.events:
Expand All @@ -234,11 +264,14 @@ def testExtractionWorkerYara(self):
extraction_worker.SetYaraRules(rule_string)
self.assertIn(u'yara', extraction_worker.GetAnalyzerNames())

knowledge_base_values = {u'year': 2016}
session = sessions.Session()

path_spec = self._GetTestFilePathSpec([u'test_pe.exe'])
storage_writer = fake_storage.FakeStorageWriter(session)
self._TestProcessPathSpec(
storage_writer, path_spec, extraction_worker=extraction_worker)
storage_writer, path_spec, extraction_worker=extraction_worker,
knowledge_base_values=knowledge_base_values)

expected_yara_match = u'PEfileBasic,PEfile'
for event in storage_writer.events:
Expand Down
34 changes: 20 additions & 14 deletions tests/parsers/syslog_plugins/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,32 @@ class SyslogCronPluginTest(test_lib.SyslogPluginTestCase):
@shared_test_lib.skipUnlessHasTestFile([u'syslog_cron.log'])
def testParse(self):
"""Tests the parsing functionality on a sample file."""
knowledge_base_values = {u'year': 2015}

storage_writer = self._ParseFileWithPlugin(
[u'syslog_cron.log'], u'cron')
[u'syslog_cron.log'], u'cron',
knowledge_base_values=knowledge_base_values)

self.assertEqual(len(storage_writer.events), 9)

event = storage_writer.events[1]
self.assertEqual(cron.CronTaskRunEvent.DATA_TYPE, event.DATA_TYPE)
events = self._GetSortedEvents(storage_writer.events)

event = events[1]

self.assertEqual(event.data_type, cron.CronTaskRunEvent.DATA_TYPE)

expected_timestamp = timelib.Timestamp.CopyFromString(
u'2016-03-11 19:26:39')
self.assertEqual(expected_timestamp, event.timestamp)
u'2015-03-11 19:26:39')
self.assertEqual(event.timestamp, expected_timestamp)

expected_command = u'sleep $(( 1 * 60 )); touch /tmp/afile.txt'
self.assertEqual(expected_command, event.command)
expected_username = u'root'
self.assertEqual(expected_username, event.username)

event = storage_writer.events[8]
expected_command = u'/sbin/status.mycheck'
self.assertEqual(expected_command, event.command)
expected_pid = 31067
self.assertEqual(expected_pid, event.pid)
self.assertEqual(event.command, expected_command)

self.assertEqual(event.username, u'root')

event = events[7]
self.assertEqual(event.command, u'/sbin/status.mycheck')
self.assertEqual(event.pid, 31067)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 6784927

Please sign in to comment.