From 79a20647f3448d4aaab042be16d91f2b6f729cc5 Mon Sep 17 00:00:00 2001 From: MSAdministrator Date: Fri, 26 Aug 2022 15:47:16 -0500 Subject: [PATCH] dev: Updating tests for processor class --- .flake8 | 2 +- src/atomic_operator_runner/processor.py | 4 +- tests/test_processor.py | 90 +++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 3 deletions(-) diff --git a/.flake8 b/.flake8 index 73b36c8..0b70986 100644 --- a/.flake8 +++ b/.flake8 @@ -4,6 +4,6 @@ ignore = E203,E501,RST201,RST203,RST301,W503,S601,S404 max-line-length = 120 max-complexity = 10 docstring-convention = google -per-file-ignores = tests/*:S101 +per-file-ignores = tests/*:S101,B950 rst-roles = class,const,func,meth,mod,ref rst-directives = deprecated diff --git a/src/atomic_operator_runner/processor.py b/src/atomic_operator_runner/processor.py index 4f45859..eb3270c 100644 --- a/src/atomic_operator_runner/processor.py +++ b/src/atomic_operator_runner/processor.py @@ -65,7 +65,7 @@ def _capture_base_records(self, data: Any) -> None: else: self.response.records.extend(record) - def _parse_data_record(self, data: Any, type: str) -> BaseRecord: + def _parse_data_record(self, data: Any, record_type: str) -> BaseRecord: """Parses the InformationRecord data out of the response stream.""" extra_dict = {} for i in dir(data): @@ -76,7 +76,7 @@ def _parse_data_record(self, data: Any, type: str) -> BaseRecord: else: message = data.message data_dict = { - "type": type, + "type": record_type, "message-data": message, "source": data.source if hasattr(data, "source") else None, "time_generated": data.time_generated if hasattr(data, "time_generated") else None, diff --git a/tests/test_processor.py b/tests/test_processor.py index 1b6589e..7743ee5 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -21,6 +21,74 @@ } +class SamplePSDataStreams: + """Sample PSDataStreams class.""" + + def __init__(self) -> None: + """Example.""" + type_property = random.choice(["error", "debug", "information", "verbose", "warning"]) + + setattr(self, type_property, [SampleErrorRecordMessage()]) + + +class SampleErrorRecordMessage: + """A sample ErrorRecordMessage class.""" + + extra = { + "MESSAGE_TYPE": "266245", + "action": "None", + "activity": "Invoke-Expression", + "category": "17", + "command_definition": "None", + "command_name": "None", + "command_type": "None", + "command_visibility": "None", + "details_message": "None", + "exception": "System.Management.Automation.ParseException: At line:1 char:12\r\n+ Get-Service'\r\n+ ~\nThe string is missing the terminator: '.\r\n at System.Management.Automation.ScriptBlock.Create(Parser parser, String fileName, String fileContents)\r\n at System.Management.Automation.ScriptBlock.Create(ExecutionContext context, String script)\r\n at Microsoft.PowerShell.Commands.InvokeExpressionCommand.ProcessRecord()\r\n at System.Management.Automation.CommandProcessor.ProcessRecord()", + "extended_info_present": "False", + "fq_error": "TerminatorExpectedAtEndOfString,Microsoft.PowerShell.Commands.InvokeExpressionCommand", + "invocation": "False", + "invocation_bound_parameters": "None", + "invocation_command_origin": "None", + "invocation_expecting_input": "None", + "invocation_history_id": "None", + "invocation_info": "System.Management.Automation.InvocationInfo", + "invocation_line": "None", + "invocation_name": "None", + "invocation_offset_in_line": "None", + "invocation_pipeline_iteration_info": "None", + "invocation_pipeline_length": "None", + "invocation_pipeline_position": "None", + "invocation_position_message": "None", + "invocation_script_line_number": "None", + "invocation_script_name": "None", + "invocation_unbound_arguments": "None", + "message": "ParserError: (:) [Invoke-Expression], ParseException", + "pipeline_iteration_info": "None", + "reason": "ParseException", + "script_stacktrace": "None", + "target_info": "None", + "target_name": "", + "target_object": "None", + "target_type": "", + } + + def __init__(self, type_property: str = "error") -> None: + """Example.""" + self.type = type_property + self.message_data = "ParserError: (:) [Invoke-Expression], ParseException" + self.source = None + self.time_generated = None + self.user = None + self.computer = None + self.pid = None + self.native_thread_id = None + self.managed_thread_id = None + + for key, val in self.extra.items(): + setattr(self, key, val) + + def test_processor(): """Tests Processor class.""" from atomic_operator_runner.models import RunnerResponse @@ -58,3 +126,25 @@ def test_capture_base_records(): processor._capture_base_records(data=SAMPLE_BASE_RECORD) for key, _val in SAMPLE_BASE_RECORD.items(): assert hasattr(processor.response.records[0], key) + + +def test_parse_data_record(): + """Testing parse_data_record method.""" + from atomic_operator_runner.models import BaseRecord + from atomic_operator_runner.processor import Processor + + processor = Processor(**SAMPLE_DATA) + + response = processor._parse_data_record(data=SampleErrorRecordMessage(), record_type="error") + assert isinstance(response, BaseRecord) + + +def test_handle_windows_streams(): + """Testing handle_windows_streams method.""" + from atomic_operator_runner.processor import Processor + + processor = Processor(**SAMPLE_DATA) + + response = processor._handle_windows_streams(stream=SamplePSDataStreams()) + assert isinstance(response, list) + assert len(response) == 1