diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fb896a7..af91f9321 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ #### Experts - `intelmq.bots.experts.sieve.expert`: - For `:contains`, `=~` and `!~`, convert the value to string before matching avoiding an exception. If the value is a dict, convert the value to JSON (PR#2500 by Sebastian Wagner). + - Add support for variables in Sieve scripts (PR#2514 by Mikk Margus Möll, fixes #2486). - `intelmq.bots.experts.filter.expert`: - Treat value `false` for parameter `filter_regex` as false (PR#2499 by Sebastian Wagner). diff --git a/intelmq/bots/experts/sieve/expert.py b/intelmq/bots/experts/sieve/expert.py index 9bd723e7a..2c2ff9f5b 100644 --- a/intelmq/bots/experts/sieve/expert.py +++ b/intelmq/bots/experts/sieve/expert.py @@ -14,9 +14,9 @@ import re import traceback import operator +import json from datetime import datetime, timedelta, timezone -from json import dumps from typing import Callable, Dict, Optional, Union from enum import Enum, auto @@ -25,7 +25,7 @@ from intelmq.lib import utils from intelmq.lib.bot import ExpertBot from intelmq.lib.exceptions import MissingDependencyError -from intelmq.lib.message import Event +from intelmq.lib.message import Message from intelmq.lib.utils import parse_relative from intelmq.lib.harmonization import DateTime @@ -41,219 +41,303 @@ except ImportError: metamodel_from_file = None -CondMap = Dict[str, Callable[['SieveExpertBot', object, Event], bool]] - class Procedure(Enum): CONTINUE = auto() # continue processing subsequent statements (default) - KEEP = auto() # stop processing and keep event - DROP = auto() # stop processing and drop event + KEEP = auto() # stop processing and keep event + DROP = auto() # stop processing and drop event class SieveExpertBot(ExpertBot): """Filter and modify events based on a sieve-based language""" - _message_processed_verb = 'Forwarded' + + _message_processed_verb = "Forwarded" _harmonization = None - file: str = "/opt/intelmq/var/lib/bots/sieve/filter.sieve" # TODO: should be pathlib.Path + file: str = ( + "/opt/intelmq/var/lib/bots/sieve/filter.sieve" # TODO: should be pathlib.Path + ) + + def init(self) -> None: + if parse is None: + raise MissingDependencyError("pendulum") + + if not SieveExpertBot._harmonization: + harmonization_config = utils.load_configuration(HARMONIZATION_CONF_FILE) + SieveExpertBot._harmonization = harmonization_config["event"] + + self.metamodel = self.init_metamodel() + self.model = self.read_sieve_file(self.file, self.metamodel) + self.variables = {} + + @staticmethod + def check(parameters): + try: + harmonization_config = utils.load_configuration(HARMONIZATION_CONF_FILE) + SieveExpertBot._harmonization = harmonization_config["event"] + + grammarfile = os.path.join(os.path.dirname(__file__), "sieve.tx") + if not os.path.exists(grammarfile): + raise FileExistsError(f"Sieve grammar file not found: {grammarfile!r}.") + + metamodel = None + + try: + metamodel = metamodel_from_file(grammarfile) + except TextXError as e: + raise ValueError( + f"Could not process sieve grammar file. Error in ({e.line}, {e.col})." + ) + + if not os.path.exists(parameters["file"]): + raise ValueError(f'File does not exist: {parameters["file"]!r}') + + try: + metamodel.model_from_file(parameters["file"]) + except TextXError as e: + raise ValueError( + f'Could not process sieve file {parameters["file"]!r}. Error in ({e.line}, {e.col}).' + ) + except Exception: + return [ + [ + "error", + f"Validation of Sieve file failed with the following traceback: {traceback.format_exc()!r}", + ] + ] + + def process(self) -> None: + event = self.receive_message() + procedure = self.model_process(event) + + # forwarding decision + if procedure == Procedure.DROP: + self.acknowledge_message() + return + + paths = getattr(event, "path", ("_default",)) + if hasattr(paths, "values"): # PathValueList + paths = tuple(path.value for path in paths.values) + elif hasattr(paths, "value"): # SinglePathValue + paths = (paths.value,) + + for path in paths: + self.send_message(event, path=path) + + self.acknowledge_message() _string_op_map = { - '==': operator.eq, - '!=': operator.ne, - ':contains': lambda lhs, rhs: lhs.find(rhs) >= 0, - '=~': lambda lhs, rhs: re.search(rhs, lhs) is not None, - '!~': lambda lhs, rhs: re.search(rhs, lhs) is None, + "==": operator.eq, + "!=": operator.ne, + ":contains": lambda lhs, rhs: lhs.find(rhs) >= 0, + "=~": lambda lhs, rhs: re.search(rhs, lhs) is not None, + "!~": lambda lhs, rhs: re.search(rhs, lhs) is None, } _string_multi_op_map = { - ':in': lambda lhs, rhs: lhs in rhs, - ':containsany': lambda lhs, rhs: any(lhs.find(s) >= 0 for s in rhs), - ':regexin': lambda lhs, rhs: any(re.search(s, lhs) is not None for s in rhs), + ":in": lambda lhs, rhs: lhs in rhs, + ":containsany": lambda lhs, rhs: any(lhs.find(s) >= 0 for s in rhs), + ":regexin": lambda lhs, rhs: any(re.search(s, lhs) is not None for s in rhs), } _list_op_map = { - ':setequals': operator.eq, - ':overlaps': lambda l, r: not l.isdisjoint(r), - ':subsetof': set.issubset, - ':supersetof': set.issuperset, + ":setequals": operator.eq, + ":overlaps": lambda lhs, rhs: not lhs.isdisjoint(rhs), + ":subsetof": set.issubset, + ":supersetof": set.issuperset, } _numeric_op_map = { - '==': operator.eq, - '!=': operator.ne, - '<=': operator.le, - '>=': operator.ge, - '<': operator.lt, - '>': operator.gt, + "==": operator.eq, + "!=": operator.ne, + "<=": operator.le, + ">=": operator.ge, + "<": operator.lt, + ">": operator.gt, } _numeric_multi_op_map = { - ':in': lambda lhs, rhs: lhs in rhs, + ":in": lambda lhs, rhs: lhs in rhs, } _basic_math_op_map = { - '+=': operator.add, - '-=': operator.sub, + "+=": operator.add, + "-=": operator.sub, } _bool_op_map = { - '==': operator.eq, - '!=': operator.ne, - } - - _date_op_map = { - ':before': operator.lt, - ':after': operator.gt + "==": operator.eq, + "!=": operator.ne, } - _cond_map: CondMap = { - 'ExistMatch': lambda self, match, event: self.process_exist_match(match.key, match.op, event), - 'SingleStringMatch': lambda self, match, event: self.process_single_string_match(match.key, match.op, match.value, event), - 'MultiStringMatch': lambda self, match, event: self.process_multi_string_match(match.key, match.op, match.value, event), - 'SingleNumericMatch': lambda self, match, event: self.process_single_numeric_match(match.key, match.op, match.value, event), - 'MultiNumericMatch': lambda self, match, event: self.process_multi_numeric_match(match.key, match.op, match.value, event), - 'IpRangeMatch': lambda self, match, event: self.process_ip_range_match(match.key, match.range, event), - 'DateMatch': lambda self, match, event: self.process_date_match(match.key, match.op, match.date, event), - 'ListMatch': lambda self, match, event: self.process_list_match(match.key, match.op, match.value, event), - 'BoolMatch': lambda self, match, event: self.process_bool_match(match.key, match.op, match.value, event), - 'Expression': lambda self, match, event: self.match_expression(match, event), + _date_op_map = {":before": operator.lt, ":after": operator.gt} + + _cond_map: Dict[ + str, + Callable[ + [ + "SieveExpertBot", + object, + Message, + ], + bool, + ], + ] = { + "ExistMatch": lambda self, match, event: self.process_exist_match( + self.resolve_value(match.key, str), match.op, event + ), + "SingleStringMatch": lambda self, match, event: self.process_single_string_match( + self.resolve_value(match.key, str), + match.op, + self.resolve_value(match.value.value, str), + event, + ), + "MultiStringMatch": lambda self, match, event: self.process_multi_string_match( + self.resolve_value(match.key, str), + match.op, + [self.resolve_value(value.value, str) for value in match.value.values], + event, + ), + "SingleNumericMatch": lambda self, match, event: self.process_single_numeric_match( + self.resolve_value(match.key, str), + match.op, + self.resolve_value(match.value.value, (int, float)), + event, + ), + "MultiNumericMatch": lambda self, match, event: self.process_multi_numeric_match( + self.resolve_value(match.key, str), + match.op, + [ + self.resolve_value(value.value, (int, float)) + for value in match.value.values + ], + event, + ), + "IpRangeMatch": lambda self, match, event: self.process_ip_range_match( + self.resolve_value(match.key, str), match.range, event + ), + "DateMatch": lambda self, match, event: self.process_date_match( + self.resolve_value(match.key, str), match.op, match.date, event + ), + "ListMatch": lambda self, match, event: self.process_list_match( + self.resolve_value(match.key, str), match.op, match.value, event + ), + "BoolMatch": lambda self, match, event: self.process_bool_match( + self.resolve_value(match.key, str), match.op, match.value, event + ), + "Expression": lambda self, match, event: self.match_expression(match, event), } - def init(self) -> None: - if parse is None: - raise MissingDependencyError("pendulum") - - if not SieveExpertBot._harmonization: - harmonization_config = utils.load_configuration(HARMONIZATION_CONF_FILE) - SieveExpertBot._harmonization = harmonization_config['event'] - - self.metamodel = SieveExpertBot.init_metamodel() - self.sieve = SieveExpertBot.read_sieve_file(self.file, self.metamodel) - @staticmethod def init_metamodel(): if metamodel_from_file is None: raise MissingDependencyError("textx") try: - grammarfile = os.path.join(os.path.dirname(__file__), 'sieve.tx') + grammarfile = os.path.join(os.path.dirname(__file__), "sieve.tx") metamodel = metamodel_from_file(grammarfile) # apply custom validation rules - metamodel.register_obj_processors({ - 'SingleStringMatch': SieveExpertBot.validate_string_match, - 'MultiStringMatch': SieveExpertBot.validate_string_match, - 'SingleNumericMatch': SieveExpertBot.validate_numeric_match, - 'MultiNumericMatch': SieveExpertBot.validate_numeric_match, - 'SingleIpRange': SieveExpertBot.validate_ip_range, - }) + metamodel.register_obj_processors( + { + "SingleStringMatch": SieveExpertBot.validate_string_match, + "MultiStringMatch": SieveExpertBot.validate_string_match, + "SingleNumericMatch": SieveExpertBot.validate_numeric_match, + "MultiNumericMatch": SieveExpertBot.validate_numeric_match, + "SingleIpRange": SieveExpertBot.validate_ip_range, + } + ) return metamodel except TextXError as e: - raise ValueError(f'Could not process sieve grammar file. Error in ({e.line}, {e.col}): {e}') + raise ValueError( + f"Could not process sieve grammar file. Error in ({e.line}, {e.col}): {e}" + ) @staticmethod def read_sieve_file(filename, metamodel): if not os.path.exists(filename): - raise exceptions.InvalidArgument('file', got=filename, expected='existing file') + raise exceptions.InvalidArgument( + "file", got=filename, expected="existing file" + ) try: sieve = metamodel.model_from_file(filename) return sieve except TextXError as e: - raise ValueError(f'Could not parse sieve file {filename!r}, error in ({e.line}, {e.col}): {e}') - - @staticmethod - def check(parameters): - try: - harmonization_config = utils.load_configuration(HARMONIZATION_CONF_FILE) - SieveExpertBot._harmonization = harmonization_config['event'] - - grammarfile = os.path.join(os.path.dirname(__file__), 'sieve.tx') - if not os.path.exists(grammarfile): - raise FileExistsError(f'Sieve grammar file not found: {grammarfile!r}.') - - metamodel = None - - try: - metamodel = metamodel_from_file(grammarfile) - except TextXError as e: - raise ValueError(f'Could not process sieve grammar file. Error in ({e.line}, {e.col}).') - - if not os.path.exists(parameters['file']): - raise ValueError(f'File does not exist: {parameters["file"]!r}') - - try: - metamodel.model_from_file(parameters['file']) - except TextXError as e: - raise ValueError(f'Could not process sieve file {parameters["file"]!r}. Error in ({e.line}, {e.col}).') - except Exception: - return [['error', f'Validation of Sieve file failed with the following traceback: {traceback.format_exc()!r}']] + raise ValueError( + f"Could not parse sieve file {filename!r}, error in ({e.line}, {e.col}): {e}" + ) - def process(self) -> None: - event = self.receive_message() + def model_process(self, event): procedure = Procedure.CONTINUE - if self.sieve: # empty rules file results in empty string - for statement in self.sieve.statements: - procedure = self.process_statement(statement, event) - if procedure == Procedure.KEEP: - self.logger.debug(f'Stop processing based on statement at {self.get_linecol(statement)}: event.') - break - elif procedure == Procedure.DROP: - self.logger.debug(f'Dropped event based on statement at {self.get_linecol(statement)}: {event}.') - break - - # forwarding decision - if procedure != Procedure.DROP: - paths = getattr(event, "path", ("_default", )) - if hasattr(paths, 'values'): # PathValueList - paths = tuple(path.value for path in paths.values) - elif hasattr(paths, 'value'): # SinglePathValue - paths = (paths.value, ) - # else: default value -> pass - for path in paths: - self.send_message(event, path=path) - - self.acknowledge_message() + self.variables.clear() + if not self.model: # empty rules file results in empty string + return procedure + + for statement in self.model.statements: + procedure = self.process_statement(statement, event) + if procedure == Procedure.KEEP: + self.logger.debug( + f"Stop processing based on statement at {self.get_linecol(statement)}: {event}." + ) + break + elif procedure == Procedure.DROP: + self.logger.debug( + f"Dropped event based on statement at {self.get_linecol(statement)}: {event}." + ) + break + + return procedure def process_statement(self, statement, event): name = statement.__class__.__name__ - if name == 'Branching': + if name == "Branching": return self.process_branching(statement, event) - elif name == 'Action': + elif name == "Action": return self.process_action(statement.action, event) + raise TextXSemanticError( + f"unexpected statement class {name} in process_statement." + ) def process_branching(self, rule, event) -> Procedure: - # process optional 'if' clause - if rule.if_: - result = self.process_clause(rule.if_, event) - if isinstance(result, Procedure): - return result + # process 'if' clause + result = self.process_clause(rule.if_, event) + if result: + return result # process optional 'elif' clauses for clause in rule.elif_: result = self.process_clause(clause, event) - if isinstance(result, Procedure): + if result: return result # process optional 'else' clause if rule.else_: result = self.process_clause(rule.else_, event, True) - if isinstance(result, Procedure): + if result: return result return Procedure.CONTINUE def process_clause(self, clause, event, else_clause=False) -> Optional[Procedure]: - if else_clause or self.match_expression(clause.expr, event): - self.logger.debug(f'Matched event based on rule at {self.get_linecol(clause)}: {event}.') + if not (else_clause or self.match_expression(clause.expr, event)): + return None + + self.logger.debug( + f"Matched event based on rule at {self.get_linecol(clause)}: {event}." + ) + + for procedure in ( + self.process_statement(statement, event) for statement in clause.statements + ): + if procedure != Procedure.CONTINUE: + return procedure - for procedure in (self.process_statement(statement, event) for statement in clause.statements): - if procedure != Procedure.CONTINUE: - return procedure - if not else_clause: - return Procedure.CONTINUE - return None + if else_clause: + return None + + return Procedure.CONTINUE def match_expression(self, expr, event) -> bool: return any(self.process_conjunction(conj, event) for conj in expr.conj) @@ -263,48 +347,45 @@ def process_conjunction(self, conj, event) -> bool: def process_condition(self, cond, event) -> bool: name = cond.match.__class__.__name__ - return cond.neg ^ self._cond_map[name](self, cond.match, event) + ret = self._cond_map[name](self, cond.match, event) + return not ret if cond.neg else ret - @staticmethod - def process_exist_match(key, op, event) -> bool: - return (key in event) ^ (op == ':notexists') + def process_exist_match(self, key, op, event) -> bool: + ret = key in event + if op == ":notexists": + ret = not ret + return ret def process_single_string_match(self, key, op, value, event) -> bool: if key not in event: - return op in {'!=', '!~'} + return op in {"!=", "!~"} lhs = event[key] - if not isinstance(lhs, str) and op not in ('==', '!='): + if not isinstance(lhs, str) and op not in ("==", "!="): if isinstance(lhs, dict): - lhs = dumps(lhs) + lhs = json.dumps(lhs) else: lhs = str(lhs) - return self._string_op_map[op](lhs, value.value) + return self._string_op_map[op](lhs, value) - def process_multi_string_match(self, key, op, value, event) -> bool: + def process_multi_string_match(self, key, op, values, event) -> bool: if key not in event: return False - return self._string_multi_op_map[op](event[key], (v.value for v in value.values)) + return self._string_multi_op_map[op](event[key], values) def process_single_numeric_match(self, key, op, value, event) -> bool: if key not in event: return False - return self._numeric_op_map[op](event[key], value.value) + return self._numeric_op_map[op](event[key], value) - def process_multi_numeric_match(self, key, op, value, event) -> bool: + def process_multi_numeric_match(self, key, op, values, event) -> bool: if key not in event: return False - return self._numeric_multi_op_map[op](event[key], (v.value for v in value.values)) - - if name == 'SingleNumericValue': - return self.process_numeric_operator(event[key], op, value.value) - elif name == 'NumericValueList': - return any(self.process_numeric_operator(event[key], op, val.value) for val in value.values) - raise TextXSemanticError(f'Unhandled type: {name}') + return self._numeric_multi_op_map[op](event[key], values) def process_ip_range_match(self, key, ip_range, event) -> bool: if key not in event: @@ -313,19 +394,24 @@ def process_ip_range_match(self, key, ip_range, event) -> bool: try: addr = ipaddress.ip_address(event[key]) except ValueError: - self.logger.warning(f'Could not parse IP address {key}={event[key]} in {event}.') + self.logger.warning( + f"Could not parse IP address {key}={event[key]} in {event}." + ) return False name = ip_range.__class__.__name__ - if name == 'SingleIpRange': + if name == "SingleIpRange": return addr in ipaddress.ip_network(ip_range.value, strict=False) - elif name == 'IpRangeList': - return any(addr in ipaddress.ip_network(val.value, strict=False) for val in ip_range.values) - raise TextXSemanticError(f'Unhandled type: {name}') + elif name == "IpRangeList": + return any( + addr in ipaddress.ip_network(val.value, strict=False) + for val in ip_range.values + ) + raise TextXSemanticError(f"Unhandled type: {name}") def parse_timeattr(self, time_attr) -> Union[datetime, timedelta]: - """ Parses relative or absolute time specification. """ + """Parses relative or absolute time specification.""" try: return parse(time_attr) except ValueError: @@ -354,7 +440,9 @@ def process_list_match(self, key, op, value, event) -> bool: lhs = event[key] rhs = value.values - return lhs == rhs if op == ':equals' else self._list_op_map[op](set(lhs), set(rhs)) + if op == ":equals": + return lhs == rhs + return self._list_op_map[op](set(lhs), set(rhs)) def process_bool_match(self, key, op, value, event): if not (key in event and isinstance(event[key], bool)): @@ -370,47 +458,51 @@ def compute_basic_math(self, action, event) -> str: def process_action(self, action, event) -> Procedure: name = action.__class__.__name__ - if action == 'drop': + if action == "drop": return Procedure.DROP - elif action == 'keep': + elif action == "keep": return Procedure.KEEP - elif name == 'PathAction': + elif name == "PathAction": event.path = action.path - elif name == 'AddAction': + elif name == "AddAction": if action.key not in event: value = action.value - if action.operator != '=': + if action.operator != "=": value = self.compute_basic_math(action, event) event.add(action.key, value) - elif name == 'AddForceAction': - value = action.value - if action.operator != '=': + elif name == "AddForceAction": + value = self.resolve_value(action.value) + if action.operator != "=": value = self.compute_basic_math(action, event) event.add(action.key, value, overwrite=True) - elif name == 'UpdateAction': + elif name == "UpdateAction": if action.key in event: value = action.value - if action.operator != '=': + if action.operator != "=": value = self.compute_basic_math(action, event) event.change(action.key, value) - elif name == 'RemoveAction': + elif name == "RemoveAction": if action.key in event: del event[action.key] - elif name == 'AppendAction': - if action.key in event: - if isinstance(event[action.key], list): # silently ignore existing non-list values - event[action.key].append(action.value) - else: - event[action.key] = [action.value] - elif name == 'AppendForceAction': - if action.key in event: - old_value = event[action.key] - list_ = old_value if isinstance(old_value, list) else [old_value] + elif name == "AppendAction": + if action.key not in event: + event.add(action.key, [action.value]) + # silently ignore existing non-list values + elif isinstance(event[action.key], list): + event[action.key].append(action.value) + elif name == "AppendForceAction": + if action.key not in event: + event.add(action.key, [action.value]) + elif isinstance(event[action.key], list): + event[action.key].append(action.value) else: - list_ = [] - - list_.append(action.value) - event.add(action.key, list_, overwrite=True) + event.add(action.key, [event[action.key], action.value], overwrite=True) + elif name == "VarSetAction": + if action.key not in event: + raise KeyError(f"{action.key} not present in event.") + self.variables[action.var.value] = event[action.key] + else: + raise TextXSemanticError(f"unknown name {name}.") return Procedure.CONTINUE @@ -420,11 +512,11 @@ def validate_ip_range(ip_range) -> None: ipaddress.ip_network(ip_range.value, strict=False) except ValueError: position = SieveExpertBot.get_linecol(ip_range, as_dict=True) - raise TextXSemanticError(f'Invalid ip range: {ip_range.value}.', **position) + raise TextXSemanticError(f"Invalid ip range: {ip_range.value}.", **position) @staticmethod def validate_numeric_match(num_match) -> None: - """ Validates a numeric match expression. + """Validates a numeric match expression. Checks if the event key (given on the left hand side of the expression) is of a valid type for a numeric match, according the the IntelMQ harmonization. @@ -432,20 +524,20 @@ def validate_numeric_match(num_match) -> None: Raises: TextXSemanticError: when the key is of an incompatible type for numeric match expressions. """ - valid_types = {'Integer', 'Float', 'Accuracy', 'ASN'} + valid_types = {"Integer", "Float", "Accuracy", "ASN"} position = SieveExpertBot.get_linecol(num_match.value, as_dict=True) # validate harmonization type (event key) try: - type = SieveExpertBot._harmonization[num_match.key]['type'] + type = SieveExpertBot._harmonization[num_match.key]["type"] if type not in valid_types: - raise TextXSemanticError(f'Incompatible type: {type}.', **position) + raise TextXSemanticError(f"Incompatible type: {type}.", **position) except KeyError: - raise TextXSemanticError(f'Invalid key: {num_match.key}.', **position) + raise TextXSemanticError(f"Invalid key: {num_match.key}.", **position) @staticmethod def validate_string_match(str_match) -> None: - """ Validates a string match expression. + """Validates a string match expression. Checks if the type of the value given on the right hand side of the expression matches the event key in the left hand side, according to the IntelMQ harmonization. @@ -455,12 +547,16 @@ def validate_string_match(str_match) -> None: """ # validate IPAddress - ipaddr_types = (k for k, v in SieveExpertBot._harmonization.items() if v['type'] == 'IPAddress') + ipaddr_types = ( + k + for k, v in SieveExpertBot._harmonization.items() + if v["type"] == "IPAddress" + ) if str_match.key in ipaddr_types: name = str_match.value.__class__.__name__ - if name == 'SingleStringValue': + if name == "SingleStringValue": SieveExpertBot.validate_ip_address(str_match.value) - elif name == 'StringValueList': + elif name == "StringValueList": for val in str_match.value.values: SieveExpertBot.validate_ip_address(val) @@ -470,11 +566,11 @@ def validate_ip_address(ipaddr) -> None: ipaddress.ip_address(ipaddr.value) except ValueError: position = SieveExpertBot.get_linecol(ipaddr, as_dict=True) - raise TextXSemanticError(f'Invalid IP address: {ipaddr.value}.', **position) + raise TextXSemanticError(f"Invalid IP address: {ipaddr.value}.", **position) @staticmethod def get_linecol(model_obj, as_dict=False): - """ Gets the position of a model object in the sieve file. + """Gets the position of a model object in the sieve file. Args: model_obj: the model object @@ -486,14 +582,24 @@ def get_linecol(model_obj, as_dict=False): """ # The __version__ attribute is first available with version 1.7.0 - if hasattr(textx, '__version__'): + if hasattr(textx, "__version__"): parser = textx.model.get_model(model_obj)._tx_parser else: parser = textx.model.metamodel(model_obj).parser tup = parser.pos_to_linecol(model_obj._tx_position) if as_dict: - return dict(zip(['line', 'col'], tup)) + return dict(zip(["line", "col"], tup)) return tup + def resolve_value(self, val, expected_types=None): + if not val.__class__.__name__ == "Variable": + return val + if val.value not in self.variables: + raise NameError(f"Sieve variable {val.value} used before definition.") + ret = self.variables[val.value] + if not (expected_types is None or isinstance(ret, expected_types)): + raise ValueError(f"Expected {expected_types} variable, got {type(ret)}.") + return ret + BOT = SieveExpertBot diff --git a/intelmq/bots/experts/sieve/sieve.tx b/intelmq/bots/experts/sieve/sieve.tx index d9b81f2d6..d4314bd1e 100644 --- a/intelmq/bots/experts/sieve/sieve.tx +++ b/intelmq/bots/experts/sieve/sieve.tx @@ -26,21 +26,22 @@ Condition: ) ; +Variable: value=/\$[a-z0-9_]+/; StringMatch: SingleStringMatch | MultiStringMatch; SingleStringMatch: key=Key op=SingleStringOperator value=SingleStringValue; MultiStringMatch: key=Key op=MultiStringOperator value=StringValueList; -SingleStringOperator: - '==' // compares two whole strings with each other +SingleStringOperator + : '==' // compares two whole strings with each other | '!=' // test for string inequality | ':contains' // sub-string match | '=~' // match strings according to regular expression | '!~' // inverse match with regular expression ; -MultiStringOperator: - ':in' // tests if a string is in a list of strings +MultiStringOperator + : ':in' // tests if a string is in a list of strings | ':containsany' // sub-string match against multiple substrings | ':regexin' // match a string against at least one of a list of regex patterns ; @@ -50,8 +51,8 @@ NumericMatch: SingleNumericMatch | MultiNumericMatch; SingleNumericMatch: key=Key op=SingleNumericOperator value=SingleNumericValue; MultiNumericMatch: key=Key op=MultiNumericOperator value=NumericValueList; -SingleNumericOperator: - '==' // equal +SingleNumericOperator + : '==' // equal | '!=' // not equal | '<=' // less than or equal | '>=' // greater than or equal @@ -59,8 +60,8 @@ SingleNumericOperator: | '>' // greater than ; -MultiNumericOperator: - ':in' // tests if number is in a list of numbers +MultiNumericOperator + : ':in' // tests if number is in a list of numbers ; @@ -76,33 +77,35 @@ ExistMatch: op=ExistOperator key=Key; ExistOperator: ':exists' | ':notexists'; ListMatch: key=Key op=ListOperator value=ListValue; -ListOperator: - ':equals' // lists are equal, including order +ListOperator + : ':equals' // lists are equal, including order | ':setequals' // lists contain the same elements, ignoring order and repeating values | ':overlaps' // lists contain one or more common values | ':subsetof' // key is a proper subset of value | ':supersetof' // key is a proper superset of value ; -BoolMatch: key=Key op=BoolOperator value=BOOL; +Bool: BOOL | Variable; +BoolMatch: key=Key op=BoolOperator value=Bool; BoolOperator: '==' | '!='; Negation: '!'; -TypedValue: STRICTFLOAT | INT | BOOL | STRING; +TypedValue: STRICTFLOAT | INT | BOOL | STRING | Variable; ListValue: '[' values*=TypedValue[','] ']' ; -Key: /[a-z0-9_\.]+/; +KeyLiteral: /[a-z0-9_\.]+/; +Key: KeyLiteral | Variable; -SingleStringValue: value=STRING ; +SingleStringValue: value=STRING | value=Variable ; StringValueList: '[' values+=SingleStringValue[','] ']' ; -SingleNumericValue: value=NUMBER ; +SingleNumericValue: value=NUMBER | value=Variable ; NumericValueList: '[' values+=SingleNumericValue[','] ']' ; -AssignOperator: - '=' +AssignOperator + : '=' | '+=' | '-=' ; @@ -120,7 +123,9 @@ Action: action=DropAction | action=RemoveAction | action=AppendAction | action=AppendForceAction + | action=VarSetAction ); + DropAction: 'drop'; KeepAction: 'keep'; PathAction: 'path' path=PathValue; @@ -131,4 +136,5 @@ RemoveAction: 'remove' key=Key; AppendAction: 'append' key=Key value=TypedValue; // append an element to a list if it exists AppendForceAction: 'append!' key=Key value=TypedValue; // append an element to a list; converts a single item to a list with one element // or creates a new list if the key is not included yet +VarSetAction: var=Variable '=' key=KeyLiteral ; // set variable to value given in key Comment: /(#|\/\/).*$/ ; diff --git a/intelmq/tests/bots/experts/sieve/test_expert.py b/intelmq/tests/bots/experts/sieve/test_expert.py index caf23b03c..20023b413 100644 --- a/intelmq/tests/bots/experts/sieve/test_expert.py +++ b/intelmq/tests/bots/experts/sieve/test_expert.py @@ -11,15 +11,17 @@ import intelmq.lib.test as test from intelmq.bots.experts.sieve.expert import SieveExpertBot -EXAMPLE_INPUT = {"__type": "Event", - "source.ip": "127.0.0.1", - "source.abuse_contact": "abuse@example.com", - "time.observation": "2017-01-01T00:00:00+00:00", - } +EXAMPLE_INPUT = { + "__type": "Event", + "source.ip": "127.0.0.1", + "source.abuse_contact": "abuse@example.com", + "time.observation": "2017-01-01T00:00:00+00:00", +} -EXAMPLE_MD5 = {"__type": "Event", - "malware.hash.md5": "0904631316551", - } +EXAMPLE_MD5 = { + "__type": "Event", + "malware.hash.md5": "0904631316551", +} @test.skip_exotic() @@ -33,34 +35,36 @@ def set_bot(cls): cls.bot_reference = SieveExpertBot def test_if_clause(self): - """ Test processing of subsequent if clauses. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_if_clause.sieve') + """Test processing of subsequent if clauses.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_if_clause.sieve" + ) # assert first if clause is matched event1 = EXAMPLE_INPUT.copy() - event1['comment'] = 'changeme' + event1["comment"] = "changeme" expected1 = EXAMPLE_INPUT.copy() - expected1['comment'] = 'changed' + expected1["comment"] = "changed" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, expected1) # assert second if clause is matched event2 = EXAMPLE_INPUT.copy() - event2['source.ip'] = '192.168.0.1' + event2["source.ip"] = "192.168.0.1" expected2 = EXAMPLE_INPUT.copy() - expected2['source.ip'] = '192.168.0.2' + expected2["source.ip"] = "192.168.0.2" self.input_message = event2 self.run_bot() self.assertMessageEqual(0, expected2) # assert both if clauses are matched event3 = EXAMPLE_INPUT.copy() - event3['comment'] = 'changeme' - event3['source.ip'] = '192.168.0.1' + event3["comment"] = "changeme" + event3["source.ip"] = "192.168.0.1" expected3 = EXAMPLE_INPUT.copy() - expected3['comment'] = 'changed' - expected3['source.ip'] = '192.168.0.2' + expected3["comment"] = "changed" + expected3["source.ip"] = "192.168.0.2" self.input_message = event3 self.run_bot() self.assertMessageEqual(0, expected3) @@ -72,145 +76,154 @@ def test_if_clause(self): self.assertMessageEqual(0, event4) def test_if_else_clause(self): - """ Test processing else clause. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_if_else_clause.sieve') + """Test processing else clause.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_if_else_clause.sieve" + ) # assert that event matches if clause event1 = EXAMPLE_INPUT.copy() - event1['comment'] = 'match' + event1["comment"] = "match" expected1 = EXAMPLE_INPUT.copy() - expected1['comment'] = 'matched' + expected1["comment"] = "matched" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, expected1) # assert that action in else clause is applied event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'foobar' + event2["comment"] = "foobar" expected2 = EXAMPLE_INPUT.copy() - expected2['comment'] = 'notmatched' + expected2["comment"] = "notmatched" self.input_message = event2 self.run_bot() self.assertMessageEqual(0, expected2) def test_if_elif_clause(self): - """ Test processing elif clauses. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_if_elif_clause.sieve') + """Test processing elif clauses.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_if_elif_clause.sieve" + ) # test match if clause event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = EXAMPLE_INPUT.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test match first elif clause - event['comment'] = 'match2' - expected['comment'] = 'changed2' + event["comment"] = "match2" + expected["comment"] = "changed2" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test match second elif clause - event['comment'] = 'match3' - expected['comment'] = 'changed3' + event["comment"] = "match3" + expected["comment"] = "changed3" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test no match - event['comment'] = 'foobar' + event["comment"] = "foobar" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_if_elif_else_clause(self): - """ Test processing if, elif, and else clause. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_if_elif_else_clause.sieve') + """Test processing if, elif, and else clause.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_if_elif_else_clause.sieve" + ) # test match if clause event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = EXAMPLE_INPUT.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test match elif clause - event['comment'] = 'match2' - expected['comment'] = 'changed2' + event["comment"] = "match2" + expected["comment"] = "changed2" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test match else clause - event['comment'] = 'match3' - expected['comment'] = 'changed3' + event["comment"] = "match3" + expected["comment"] = "changed3" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_or_match(self): - """ Test Or Operator in match""" - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_or_match.sieve') + """Test Or Operator in match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_or_match.sieve" + ) # Expressions: TRUE || TRUE => TRUE truetrue = EXAMPLE_INPUT.copy() - truetrue['comment'] = "I am TRUE in OR clause" + truetrue["comment"] = "I am TRUE in OR clause" truetrue_result = truetrue.copy() - truetrue_result['source.ip'] = "10.9.8.7" + truetrue_result["source.ip"] = "10.9.8.7" self.input_message = truetrue self.run_bot() self.assertMessageEqual(0, truetrue_result) # Expressions: TRUE || FALSE => TRUE truefalse = EXAMPLE_INPUT.copy() - truefalse['comment'] = "I am NOT True in OR clause" + truefalse["comment"] = "I am NOT True in OR clause" truefalse_result = truefalse.copy() - truefalse_result['source.ip'] = "10.9.8.7" + truefalse_result["source.ip"] = "10.9.8.7" self.input_message = truefalse self.run_bot() self.assertMessageEqual(0, truefalse_result) # Expressions: FALSE || TRUE => TRUE falsetrue = EXAMPLE_INPUT.copy() - falsetrue['source.abuse_contact'] = "test@test.eu" - falsetrue['comment'] = "I am TRUE in OR clause" + falsetrue["source.abuse_contact"] = "test@test.eu" + falsetrue["comment"] = "I am TRUE in OR clause" falsetrue_result = falsetrue.copy() - falsetrue_result['source.ip'] = "10.9.8.7" + falsetrue_result["source.ip"] = "10.9.8.7" self.input_message = falsetrue self.run_bot() self.assertMessageEqual(0, falsetrue_result) # Expressions: FALSE || FALSE => FALSE falsefalse = EXAMPLE_INPUT.copy() - falsefalse['source.abuse_contact'] = "test@test.eu" - falsefalse['comment'] = "I am NOT True in OR clause" + falsefalse["source.abuse_contact"] = "test@test.eu" + falsefalse["comment"] = "I am NOT True in OR clause" falsefalse_result = falsefalse.copy() self.input_message = falsefalse self.run_bot() self.assertMessageEqual(0, falsefalse_result) def test_and_match(self): - """ Test And Operator in match""" - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_and_match.sieve') + """Test And Operator in match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_and_match.sieve" + ) # Expressions: TRUE && TRUE => TRUE truetrue = EXAMPLE_INPUT.copy() - truetrue['comment'] = "I am TRUE in AND clause" + truetrue["comment"] = "I am TRUE in AND clause" truetrue_result = truetrue.copy() - truetrue_result['source.ip'] = "10.9.8.7" + truetrue_result["source.ip"] = "10.9.8.7" self.input_message = truetrue self.run_bot() self.assertMessageEqual(0, truetrue_result) # Expressions: TRUE && FALSE => FALSE truefalse = EXAMPLE_INPUT.copy() - truefalse['comment'] = "I am NOT True in AND clause" + truefalse["comment"] = "I am NOT True in AND clause" truefalse_result = truefalse.copy() self.input_message = truefalse self.run_bot() @@ -218,8 +231,8 @@ def test_and_match(self): # Expressions: FALSE && TRUE => FALSE falsetrue = EXAMPLE_INPUT.copy() - falsetrue['source.abuse_contact'] = "test@test.eu" - falsetrue['comment'] = "I am TRUE in AND clause" + falsetrue["source.abuse_contact"] = "test@test.eu" + falsetrue["comment"] = "I am TRUE in AND clause" falsetrue_result = falsetrue.copy() self.input_message = falsetrue self.run_bot() @@ -227,53 +240,56 @@ def test_and_match(self): # Expressions: FALSE && FALSE => FALSE falsefalse = EXAMPLE_INPUT.copy() - falsefalse['source.abuse_contact'] = "test@test.eu" - falsefalse['comment'] = "I am NOT True in AND clause" + falsefalse["source.abuse_contact"] = "test@test.eu" + falsefalse["comment"] = "I am NOT True in AND clause" falsefalse_result = falsefalse.copy() self.input_message = falsefalse self.run_bot() self.assertMessageEqual(0, falsefalse_result) def test_precedence(self): - """ Test precedence of operators """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_precedence.sieve') + """Test precedence of operators""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_precedence.sieve" + ) # test && has higher precedence than || event = EXAMPLE_INPUT.copy() - event['feed.provider'] = 'acme' + event["feed.provider"] = "acme" expected = event.copy() - expected['comment'] = 'match1' + expected["comment"] = "match1" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # test round braces to change precedence event = EXAMPLE_INPUT.copy() - event['source.abuse_contact'] = 'abuse@example.com' - event['source.ip'] = '5.6.7.8' + event["source.abuse_contact"] = "abuse@example.com" + event["source.ip"] = "5.6.7.8" expected = event.copy() - expected['comment'] = 'match2' + expected["comment"] = "match2" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_string_equal_match(self): - """ Test == string match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_equal_match.sieve') + """Test == string match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_string_equal_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'www.example.com' + event["source.fqdn"] = "www.example.com" expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test (key doesn't match) event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'www.hotmail.com' + event["source.fqdn"] = "www.hotmail.com" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) @@ -285,15 +301,17 @@ def test_string_equal_match(self): self.assertMessageEqual(0, event) def test_string_not_equal_match(self): - """ Test != string match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_not_equal_match.sieve') + """Test != string match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_string_not_equal_match.sieve", + ) # positive test (key mismatch) event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'mail.ru' + event["source.fqdn"] = "mail.ru" expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) @@ -301,35 +319,37 @@ def test_string_not_equal_match(self): # positive test (key undefined) event = EXAMPLE_INPUT.copy() expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'www.example.com' + event["source.fqdn"] = "www.example.com" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_string_contains_match(self): - """ Test :contains string match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_contains_match.sieve') + """Test :contains string match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_string_contains_match.sieve", + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.url'] = 'https://www.switch.ch/security/' + event["source.url"] = "https://www.switch.ch/security/" expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test (key mismatch) event = EXAMPLE_INPUT.copy() - event['source.url'] = 'https://www.ripe.net/' + event["source.url"] = "https://www.ripe.net/" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) @@ -341,22 +361,23 @@ def test_string_contains_match(self): self.assertMessageEqual(0, event) def test_string_regex_match(self): - """ Test =~ string match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_regex_match.sieve') + """Test =~ string match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_string_regex_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.url'] = 'https://www.switch.ch/security' + event["source.url"] = "https://www.switch.ch/security" expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test (key mismatch) event = EXAMPLE_INPUT.copy() - event['source.url'] = 'http://www.example.com' + event["source.url"] = "http://www.example.com" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) @@ -368,15 +389,17 @@ def test_string_regex_match(self): self.assertMessageEqual(0, event) def test_string_inverse_regex_match(self): - """ Test !~ string match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_inverse_regex_match.sieve') + """Test !~ string match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_string_inverse_regex_match.sieve", + ) # positive test (key mismatch) event = EXAMPLE_INPUT.copy() - event['source.url'] = 'http://www.example.com' + event["source.url"] = "http://www.example.com" expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) @@ -384,179 +407,195 @@ def test_string_inverse_regex_match(self): # positive test (key undefined) event = EXAMPLE_INPUT.copy() expected = event.copy() - expected['comment'] = 'match' + expected["comment"] = "match" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test (key match) event = EXAMPLE_INPUT.copy() - event['source.url'] = 'https://www.switch.ch/security' + event["source.url"] = "https://www.switch.ch/security" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_string_invalid_ipaddr(self): - """ Tests validation of harmonization for IP addresses. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_invalid_ipaddr.sieve') + """Tests validation of harmonization for IP addresses.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_string_invalid_ipaddr.sieve", + ) event = EXAMPLE_INPUT.copy() self.input_message = event with self.assertRaises(ValueError) as context: self.run_bot() exception = context.exception - self.assertRegex(str(exception), 'Invalid IP address:') + self.assertRegex(str(exception), "Invalid IP address:") def test_numeric_equal_match(self): - """ Test == numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_equal_match.sieve') + """Test == numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_numeric_equal_match.sieve" + ) # if match drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 100.0 + numeric_match_true["feed.accuracy"] = 100.0 self.input_message = numeric_match_true self.run_bot() self.assertOutputQueueLen(0) # if doesn't match keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 50.0 + numeric_match_false["feed.accuracy"] = 50.0 self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false) def test_numeric_not_equal_match(self): - """ Test != numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_not_equal_match.sieve') + """Test != numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_not_equal_match.sieve", + ) # if not equal drop numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 50.0 + numeric_match_false["feed.accuracy"] = 50.0 self.input_message = numeric_match_false self.run_bot() self.assertOutputQueueLen(0) # if equal keep numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 100 + numeric_match_true["feed.accuracy"] = 100 self.input_message = numeric_match_true self.run_bot() self.assertMessageEqual(0, numeric_match_true) def test_numeric_less_than_match(self): - """ Test < numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_less_than_match.sieve') + """Test < numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_less_than_match.sieve", + ) # if less than drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 50.0 + numeric_match_true["feed.accuracy"] = 50.0 self.input_message = numeric_match_true self.run_bot() self.assertOutputQueueLen(0) # if greater than keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 99.5 + numeric_match_false["feed.accuracy"] = 99.5 self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false) def test_numeric_less_than_or_equal_match(self): - """ Test <= numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_less_than_or_equal_match.sieve') + """Test <= numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_less_than_or_equal_match.sieve", + ) # if less than drop numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 40.0 + numeric_match_false["feed.accuracy"] = 40.0 self.input_message = numeric_match_false self.run_bot() self.assertOutputQueueLen(0) # if equal drop numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 90 + numeric_match_false["feed.accuracy"] = 90 self.input_message = numeric_match_false self.run_bot() self.assertOutputQueueLen(0) # if greater than keep numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 95.0 + numeric_match_true["feed.accuracy"] = 95.0 self.input_message = numeric_match_true self.run_bot() self.assertMessageEqual(0, numeric_match_true) def test_numeric_greater_than_match(self): - """ Test > numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_greater_than_match.sieve') + """Test > numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_greater_than_match.sieve", + ) # if greater than drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 50.0 + numeric_match_true["feed.accuracy"] = 50.0 self.input_message = numeric_match_true self.run_bot() self.assertOutputQueueLen(0) # if less than keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 35.5 + numeric_match_false["feed.accuracy"] = 35.5 self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false) def test_numeric_greater_than_or_equal_match(self): - """ Test >= numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_greater_than_or_equal_match.sieve') + """Test >= numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_greater_than_or_equal_match.sieve", + ) # if less than keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 40.0 + numeric_match_false["feed.accuracy"] = 40.0 self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false) # if equal drop numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['feed.accuracy'] = 90 + numeric_match_false["feed.accuracy"] = 90 self.input_message = numeric_match_false self.run_bot() self.assertOutputQueueLen(0) # if greater than drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['feed.accuracy'] = 95.0 + numeric_match_true["feed.accuracy"] = 95.0 self.input_message = numeric_match_true self.run_bot() self.assertOutputQueueLen(0) def test_numeric_invalid_key(self): - """ Tests validation of harmonization for numeric types. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_invalid_key.sieve') + """Tests validation of harmonization for numeric types.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_numeric_invalid_key.sieve" + ) event = EXAMPLE_INPUT.copy() self.input_message = event with self.assertRaises(ValueError) as context: self.run_bot() exception = context.exception - self.assertRegex(str(exception), r'.*Incompatible type: FQDN\.$') + self.assertRegex(str(exception), r".*Incompatible type: FQDN\.$") def test_exists_match(self): - """ Test :exists match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_exists_match.sieve') + """Test :exists match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_exists_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'www.example.com' + event["source.fqdn"] = "www.example.com" expected = event.copy() - expected['comment'] = 'I think therefore I am.' + expected["comment"] = "I think therefore I am." self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) @@ -568,59 +607,63 @@ def test_exists_match(self): self.assertMessageEqual(0, event) def test_not_exists_match(self): - """ Test :notexists match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_notexists_match.sieve') + """Test :notexists match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_notexists_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() expected = event.copy() - expected['comment'] = 'I think therefore I am.' + expected["comment"] = "I think therefore I am." self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'www.example.com' + event["source.fqdn"] = "www.example.com" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_string_match_value_list(self): - """ Test string match with StringValueList """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_string_match_value_list.sieve') + """Test string match with StringValueList""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_string_match_value_list.sieve", + ) # Match the first rule string_value_list_match_1 = EXAMPLE_INPUT.copy() - string_value_list_match_1['classification.type'] = 'infected-system' + string_value_list_match_1["classification.type"] = "infected-system" string_value_list_expected_result_1 = string_value_list_match_1.copy() - string_value_list_expected_result_1['comment'] = 'infected hosts' + string_value_list_expected_result_1["comment"] = "infected hosts" self.input_message = string_value_list_match_1 self.run_bot() self.assertMessageEqual(0, string_value_list_expected_result_1) # Match the second rule string_value_list_match_2 = EXAMPLE_INPUT.copy() - string_value_list_match_2['classification.type'] = 'c2-server' + string_value_list_match_2["classification.type"] = "c2-server" string_value_list_expected_result_2 = string_value_list_match_2.copy() - string_value_list_expected_result_2['comment'] = 'malicious server / service' + string_value_list_expected_result_2["comment"] = "malicious server / service" self.input_message = string_value_list_match_2 self.run_bot() self.assertMessageEqual(0, string_value_list_expected_result_2) # don't match any rule string_value_list_match_3 = EXAMPLE_INPUT.copy() - string_value_list_match_3['classification.type'] = 'blacklist' + string_value_list_match_3["classification.type"] = "blacklist" self.input_message = string_value_list_match_3 self.run_bot() self.assertMessageEqual(0, string_value_list_match_3) # match containsany, first match event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'matched.mx' + event["source.fqdn"] = "matched.mx" expected = event.copy() - expected['comment'] = 'containsany match' + expected["comment"] = "containsany match" self.input_message = event self.run_bot() @@ -628,9 +671,9 @@ def test_string_match_value_list(self): # match containsany, first match event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'matched.zz' + event["source.fqdn"] = "matched.zz" expected = event.copy() - expected['comment'] = 'containsany match' + expected["comment"] = "containsany match" self.input_message = event self.run_bot() @@ -638,7 +681,7 @@ def test_string_match_value_list(self): # do not match containsany event = EXAMPLE_INPUT.copy() - event['source.fqdn'] = 'matched.yy' + event["source.fqdn"] = "matched.yy" expected = event.copy() self.input_message = event @@ -647,9 +690,9 @@ def test_string_match_value_list(self): # match regexin, first match event = EXAMPLE_INPUT.copy() - event['extra.tag'] = 'xxee' + event["extra.tag"] = "xxee" expected = event.copy() - expected['comment'] = 'regexin match' + expected["comment"] = "regexin match" self.input_message = event self.run_bot() @@ -657,9 +700,9 @@ def test_string_match_value_list(self): # match regexin, second match event = EXAMPLE_INPUT.copy() - event['extra.tag'] = 'abcd' + event["extra.tag"] = "abcd" expected = event.copy() - expected['comment'] = 'regexin match' + expected["comment"] = "regexin match" self.input_message = event self.run_bot() @@ -667,7 +710,7 @@ def test_string_match_value_list(self): # do not match regexin event = EXAMPLE_INPUT.copy() - event['extra.tag'] = 'eeabcc' + event["extra.tag"] = "eeabcc" expected = event.copy() self.input_message = event @@ -675,38 +718,42 @@ def test_string_match_value_list(self): self.assertMessageEqual(0, expected) def test_numeric_match_value_list(self): - """ Test numeric match with NumericValueList """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_match_value_list.sieve') + """Test numeric match with NumericValueList""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_numeric_match_value_list.sieve", + ) # Match the first rule numeric_value_list_match_1 = EXAMPLE_INPUT.copy() - numeric_value_list_match_1['destination.asn'] = 6939 + numeric_value_list_match_1["destination.asn"] = 6939 numeric_value_list_expected_result_1 = numeric_value_list_match_1.copy() - numeric_value_list_expected_result_1['comment'] = 'Belongs to peering group' + numeric_value_list_expected_result_1["comment"] = "Belongs to peering group" self.input_message = numeric_value_list_match_1 self.run_bot() self.assertMessageEqual(0, numeric_value_list_expected_result_1) # Match the second rule numeric_value_list_match_2 = EXAMPLE_INPUT.copy() - numeric_value_list_match_2['destination.asn'] = 1930 + numeric_value_list_match_2["destination.asn"] = 1930 numeric_value_list_expected_result_2 = numeric_value_list_match_2.copy() - numeric_value_list_expected_result_2['comment'] = 'Belongs constituency group' + numeric_value_list_expected_result_2["comment"] = "Belongs constituency group" self.input_message = numeric_value_list_match_2 self.run_bot() self.assertMessageEqual(0, numeric_value_list_expected_result_2) # don't Match any rule numeric_value_list_match_3 = EXAMPLE_INPUT.copy() - numeric_value_list_match_3['destination.asn'] = 3356 + numeric_value_list_match_3["destination.asn"] = 3356 self.input_message = numeric_value_list_match_3 self.run_bot() self.assertMessageEqual(0, numeric_value_list_match_3) def test_drop_event(self): - """ Test if matched event is dropped and processing is stopped. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_drop_event.sieve') + """Test if matched event is dropped and processing is stopped.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_drop_event.sieve" + ) event1 = EXAMPLE_INPUT.copy() self.input_message = event1 @@ -714,34 +761,38 @@ def test_drop_event(self): self.assertMessageEqual(0, event1) event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'drop' + event2["comment"] = "drop" self.input_message = event2 self.run_bot() self.assertOutputQueueLen(0) def test_keep_event(self): - """ Test if matched event is kept and processing is stopped. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_keep_event.sieve') + """Test if matched event is kept and processing is stopped.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_keep_event.sieve" + ) event1 = EXAMPLE_INPUT.copy() - event1['comment'] = 'continue' + event1["comment"] = "continue" self.input_message = event1 self.run_bot() expected1 = EXAMPLE_INPUT.copy() - expected1['comment'] = 'changed' + expected1["comment"] = "changed" self.assertMessageEqual(0, expected1) event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'keep' + event2["comment"] = "keep" self.input_message = event2 self.run_bot() expected2 = EXAMPLE_INPUT.copy() - expected2['comment'] = 'keep' + expected2["comment"] = "keep" self.assertMessageEqual(0, expected2) def test_add(self): - """ Test adding key/value pairs """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_add.sieve') + """Test adding key/value pairs""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_add.sieve" + ) # If doesn't match, nothing should have changed event1 = EXAMPLE_INPUT.copy() @@ -750,16 +801,18 @@ def test_add(self): self.assertMessageEqual(0, event1) # If expression matches, destination.ip field is added - event1['comment'] = 'add field' + event1["comment"] = "add field" result = event1.copy() - result['destination.ip'] = '150.50.50.10' + result["destination.ip"] = "150.50.50.10" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, result) def test_add_force(self): - """ Test adding key/value pairs, overwriting existing key """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_add_force.sieve') + """Test adding key/value pairs, overwriting existing key""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_add_force.sieve" + ) # If doesn't match, nothing should have changed event1 = EXAMPLE_INPUT.copy() @@ -768,26 +821,28 @@ def test_add_force(self): self.assertMessageEqual(0, event1) # If expression matches, destination.ip field is added as new field - event1['comment'] = 'add force new field' + event1["comment"] = "add force new field" result = event1.copy() - result['destination.ip'] = '150.50.50.10' + result["destination.ip"] = "150.50.50.10" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, result) # If expression matches, destination.ip field is added as new field event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'add force existing fields' + event2["comment"] = "add force existing fields" result2 = event2.copy() - result2['destination.ip'] = '200.10.9.7' - result2['source.ip'] = "10.9.8.7" + result2["destination.ip"] = "200.10.9.7" + result2["source.ip"] = "10.9.8.7" self.input_message = event2 self.run_bot() self.assertMessageEqual(0, result2) def test_update(self): - """ Test updating key/value pairs """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_update.sieve') + """Test updating key/value pairs""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_update.sieve" + ) # If doesn't match, nothing should have changed event1 = EXAMPLE_INPUT.copy() @@ -796,7 +851,7 @@ def test_update(self): self.assertMessageEqual(0, event1) # If expression matches && parameter doesn't exists, nothing changes - event1['comment'] = 'update new parameter' + event1["comment"] = "update new parameter" result = event1.copy() self.input_message = event1 self.run_bot() @@ -804,16 +859,18 @@ def test_update(self): # If expression matches && parameter exists, source.ip changed event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'update existing parameter' + event2["comment"] = "update existing parameter" result2 = event2.copy() - result2['source.ip'] = '10.9.8.7' + result2["source.ip"] = "10.9.8.7" self.input_message = event2 self.run_bot() self.assertMessageEqual(0, result2) def test_remove(self): - """ Test removing keys """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_remove.sieve') + """Test removing keys""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_remove.sieve" + ) # If doesn't match, nothing should have changed event1 = EXAMPLE_INPUT.copy() @@ -822,138 +879,148 @@ def test_remove(self): self.assertMessageEqual(0, event1) # If expression matches && parameter exists, parameter is removed - event1['comment'] = 'remove parameter' + event1["comment"] = "remove parameter" result = event1.copy() - event1['destination.ip'] = '192.168.10.1' + event1["destination.ip"] = "192.168.10.1" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, result) # If expression matches && parameter doesn't exist, nothing happens event2 = EXAMPLE_INPUT.copy() - event2['comment'] = 'remove parameter' + event2["comment"] = "remove parameter" result2 = event2.copy() self.input_message = event2 self.run_bot() self.assertMessageEqual(0, result2) def test_multiple_actions(self): - """ Test applying multiple actions in one rule """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_multiple_actions.sieve') + """Test applying multiple actions in one rule""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_multiple_actions.sieve" + ) event = EXAMPLE_INPUT.copy() - event['classification.type'] = 'undetermined' + event["classification.type"] = "undetermined" self.input_message = event self.run_bot() expected_result = event.copy() - expected_result['source.ip'] = '127.0.0.2' - expected_result['comment'] = 'added' - del expected_result['classification.type'] + expected_result["source.ip"] = "127.0.0.2" + expected_result["comment"] = "added" + del expected_result["classification.type"] self.assertMessageEqual(0, expected_result) def test_ip_range_match(self): - """ Test IP range match operator. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_ip_range_match.sieve') + """Test IP range match operator.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_ip_range_match.sieve" + ) # match /24 network event = EXAMPLE_INPUT.copy() - event['source.ip'] = '192.0.0.1' + event["source.ip"] = "192.0.0.1" expected = event.copy() - expected['comment'] = 'bogon1' + expected["comment"] = "bogon1" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # match /16 network event = EXAMPLE_INPUT.copy() - event['source.ip'] = '192.0.200.1' + event["source.ip"] = "192.0.200.1" expected = event.copy() - expected['comment'] = 'bogon2' + expected["comment"] = "bogon2" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # no match event = EXAMPLE_INPUT.copy() - event['source.ip'] = '192.168.0.1' + event["source.ip"] = "192.168.0.1" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) # IPv6 address event = EXAMPLE_INPUT.copy() - event['source.ip'] = '2001:620:0:ff::56' + event["source.ip"] = "2001:620:0:ff::56" expected = event.copy() - expected['comment'] = 'SWITCH' + expected["comment"] = "SWITCH" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # invalid address in event should not match event = EXAMPLE_INPUT.copy() - event['comment'] = '300.300.300.300' + event["comment"] = "300.300.300.300" self.input_message = event self.allowed_warning_count = 1 self.run_bot() - self.assertLogMatches(pattern='^Could not parse IP address', levelname='WARNING') + self.assertLogMatches( + pattern="^Could not parse IP address", levelname="WARNING" + ) self.assertMessageEqual(0, event) def test_ip_range_list_match(self): - """ Test IP range list match operator. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_ip_range_list_match.sieve') + """Test IP range list match operator.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_ip_range_list_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.ip'] = '192.0.0.1' + event["source.ip"] = "192.0.0.1" expected = event.copy() - expected['comment'] = 'bogon' + expected["comment"] = "bogon" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test event = EXAMPLE_INPUT.copy() - event['source.ip'] = '8.8.8.8' + event["source.ip"] = "8.8.8.8" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_network_host_bits_list_match(self): - """ Test if range list of networks with host bits set match operator. """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_ip_range_list_match.sieve') + """Test if range list of networks with host bits set match operator.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_ip_range_list_match.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['source.ip'] = '169.254.2.1' + event["source.ip"] = "169.254.2.1" expected = event.copy() - expected['comment'] = 'bogon' + expected["comment"] = "bogon" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # positive test event = EXAMPLE_INPUT.copy() - event['source.ip'] = '169.254.3.1' + event["source.ip"] = "169.254.3.1" expected = event.copy() - expected['comment'] = 'bogon' + expected["comment"] = "bogon" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) # negative test event = EXAMPLE_INPUT.copy() - event['source.ip'] = '169.255.2.1' + event["source.ip"] = "169.255.2.1" self.input_message = event self.run_bot() self.assertMessageEqual(0, event) def test_date_match(self): - """ Test comparing absolute and relative to now dates. """ - self.sysconfig['file'] = Path(__file__).parent / 'test_sieve_files/test_date_match.sieve' + """Test comparing absolute and relative to now dates.""" + self.sysconfig["file"] = ( + Path(__file__).parent / "test_sieve_files/test_date_match.sieve" + ) def check(event, expected): self.input_message = event @@ -964,101 +1031,133 @@ def check(event, expected): expected = event.copy() event["time.observation"] = "2017-01-01T00:00:00+00:00" # past event with tz - expected['extra.list'] = ['before 1 week', 'before 2023-06-01', 'before 2023-06-01 15:00'] + expected["extra.list"] = [ + "before 1 week", + "before 2023-06-01", + "before 2023-06-01 15:00", + ] check(event, expected) event["time.observation"] = "2017-01-01T00:00:00" # past event without tz check(event, expected) - event["time.observation"] = "2023-06-01" # just date, neither before nor after the date's midnight - expected['extra.list'] = ['before 1 week', 'before 2023-06-01 15:00'] + event["time.observation"] = ( + "2023-06-01" # just date, neither before nor after the date's midnight + ) + expected["extra.list"] = ["before 1 week", "before 2023-06-01 15:00"] check(event, expected) event["time.observation"] = "2023-06-01 10:00" # time given - expected['extra.list'] = ['before 1 week', 'after 2023-06-01', 'before 2023-06-01 15:00'] + expected["extra.list"] = [ + "before 1 week", + "after 2023-06-01", + "before 2023-06-01 15:00", + ] check(event, expected) event["time.observation"] = "2023-06-01T10:00+00:00" # time including tz check(event, expected) event["time.observation"] = "2023-06-01T10:00-06:00" # tz changes - expected['extra.list'] = ['before 1 week', 'after 2023-06-01', 'after 2023-06-01 15:00'] + expected["extra.list"] = [ + "before 1 week", + "after 2023-06-01", + "after 2023-06-01 15:00", + ] check(event, expected) event["time.observation"] = str(datetime.now()) - expected['extra.list'] = ['after 1 week', 'after 2023-06-01', 'after 2023-06-01 15:00'] + expected["extra.list"] = [ + "after 1 week", + "after 2023-06-01", + "after 2023-06-01 15:00", + ] check(event, expected) event["time.observation"] = str(datetime.now() - timedelta(days=3)) check(event, expected) event["time.observation"] = str(datetime.now() - timedelta(days=8)) - expected['extra.list'] = ['before 1 week', 'after 2023-06-01', 'after 2023-06-01 15:00'] + expected["extra.list"] = [ + "before 1 week", + "after 2023-06-01", + "after 2023-06-01 15:00", + ] check(event, expected) event["time.observation"] = str(datetime.now() - timedelta(days=8)) - expected['extra.list'] = ['before 1 week', 'after 2023-06-01', 'after 2023-06-01 15:00'] + expected["extra.list"] = [ + "before 1 week", + "after 2023-06-01", + "after 2023-06-01 15:00", + ] check(event, expected) def test_comments(self): - """ Test comments in sieve file.""" - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_comments.sieve') + """Test comments in sieve file.""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_comments.sieve" + ) event = EXAMPLE_INPUT.copy() expected = event.copy() - expected['comment'] = 'hello' + expected["comment"] = "hello" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_named_queues(self): - """ Test named queues """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_named_queues.sieve') + """Test named queues""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_named_queues.sieve" + ) # if match drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['comment'] = "drop" + numeric_match_true["comment"] = "drop" self.input_message = numeric_match_true self.run_bot() self.assertOutputQueueLen(0) # if doesn't match keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['comment'] = "keep without path" + numeric_match_false["comment"] = "keep without path" self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false) # if doesn't match keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['comment'] = "keep with path" + numeric_match_false["comment"] = "keep with path" self.input_message = numeric_match_false self.prepare_bot(destination_queues={"_default", "other-way"}) self.run_bot(prepare=False) # if doesn't match keep numeric_match_false = EXAMPLE_INPUT.copy() - numeric_match_false['comment'] = "default path" + numeric_match_false["comment"] = "default path" self.input_message = numeric_match_false self.run_bot() self.assertMessageEqual(0, numeric_match_false, path="_default") def test_numeric_key(self): - """ Test == numeric match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_numeric_key.sieve') + """Test == numeric match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_numeric_key.sieve" + ) # if match drop numeric_match_true = EXAMPLE_INPUT.copy() - numeric_match_true['comment'] = "drop" + numeric_match_true["comment"] = "drop" self.input_message = numeric_match_true self.run_bot() self.assertMessageEqual(0, numeric_match_true) def test_parentheses(self): - """ Test if parenthesis work""" - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_parentheses.sieve') + """Test if parenthesis work""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_parentheses.sieve" + ) # If doesn't match, nothing should have changed event1 = EXAMPLE_INPUT.copy() @@ -1067,125 +1166,137 @@ def test_parentheses(self): self.assertMessageEqual(0, event1) # If expression matches, destination.ip field is added - event1['comment'] = 'add field' + event1["comment"] = "add field" result = event1.copy() - result['destination.ip'] = '150.50.50.10' + result["destination.ip"] = "150.50.50.10" self.input_message = event1 self.run_bot() self.assertMessageEqual(0, result) # If expression matches, destination.ip field is added event2 = EXAMPLE_INPUT.copy() - event2['classification.taxonomy'] = 'vulnerable' + event2["classification.taxonomy"] = "vulnerable" result = event2.copy() - result['destination.ip'] = '150.50.50.10' + result["destination.ip"] = "150.50.50.10" self.input_message = event2 self.run_bot() self.assertMessageEqual(0, result) def test_basic_math(self): - """ Test basic math operations""" - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_basic_math.sieve') + """Test basic math operations""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_basic_math.sieve" + ) event = EXAMPLE_INPUT.copy() - event['comment'] = "add_force" + event["comment"] = "add_force" test_add_force = event.copy() - test_add_force['comment'] = "add_force" - test_add_force['time.observation'] = '2017-01-01T01:00:00+00:00' + test_add_force["comment"] = "add_force" + test_add_force["time.observation"] = "2017-01-01T01:00:00+00:00" self.input_message = event self.run_bot() self.assertMessageEqual(0, test_add_force) test_minus_force = event.copy() - event['comment'] = "minus_force" - test_minus_force['comment'] = 'minus_force' - test_minus_force['time.observation'] = '2016-12-31T23:00:00+00:00' + event["comment"] = "minus_force" + test_minus_force["comment"] = "minus_force" + test_minus_force["time.observation"] = "2016-12-31T23:00:00+00:00" self.input_message = event self.run_bot() self.assertMessageEqual(0, test_minus_force) test_minus_normal = event.copy() - event['comment'] = "minus_normal" - test_minus_normal['comment'] = 'minus_normal' - test_minus_normal['time.observation'] = '2016-12-31T23:00:00+00:00' + event["comment"] = "minus_normal" + test_minus_normal["comment"] = "minus_normal" + test_minus_normal["time.observation"] = "2016-12-31T23:00:00+00:00" self.input_message = event self.allowed_error_count = 1 self.run_bot() self.assertMessageEqual(0, test_minus_normal) test_add_normal = event.copy() - event['comment'] = "add_normal" - test_add_normal['comment'] = 'add_normal' - test_add_normal['time.observation'] = '2017-01-01T01:00:00+00:00' + event["comment"] = "add_normal" + test_add_normal["comment"] = "add_normal" + test_add_normal["time.observation"] = "2017-01-01T01:00:00+00:00" self.input_message = event self.allowed_error_count = 1 self.run_bot() self.assertMessageEqual(0, test_add_normal) test_add_update = event.copy() - event['comment'] = "add_update" - test_add_update['comment'] = 'add_update' - test_add_update['time.observation'] = '2017-01-01T01:00:00+00:00' + event["comment"] = "add_update" + test_add_update["comment"] = "add_update" + test_add_update["time.observation"] = "2017-01-01T01:00:00+00:00" self.input_message = event self.run_bot() self.assertMessageEqual(0, test_add_update) test_minus_update = event.copy() - event['comment'] = "minus_update" - test_minus_update['comment'] = 'minus_update' - test_minus_update['time.observation'] = '2016-12-31T23:00:00+00:00' + event["comment"] = "minus_update" + test_minus_update["comment"] = "minus_update" + test_minus_update["time.observation"] = "2016-12-31T23:00:00+00:00" self.input_message = event self.run_bot() self.assertMessageEqual(0, test_minus_update) def test_multiple_paths(self): - """ Test path = ['one', 'two'] """ + """Test path = ['one', 'two']""" self.input_message = EXAMPLE_INPUT - self.prepare_bot(destination_queues={"_default", "one", "two"}, - parameters={'file': os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_named_queues_multi.sieve')}) + self.prepare_bot( + destination_queues={"_default", "one", "two"}, + parameters={ + "file": os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_named_queues_multi.sieve", + ) + }, + ) self.run_bot(prepare=False) - self.assertMessageEqual(0, EXAMPLE_INPUT, path='one') - self.assertMessageEqual(0, EXAMPLE_INPUT, path='two') + self.assertMessageEqual(0, EXAMPLE_INPUT, path="one") + self.assertMessageEqual(0, EXAMPLE_INPUT, path="two") def test_only_one_action(self): - """ Test only one action """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_only_one_action.sieve') + """Test only one action""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_only_one_action.sieve" + ) event = EXAMPLE_INPUT.copy() - event['comment'] = 'Test action only' + event["comment"] = "Test action only" self.input_message = EXAMPLE_INPUT self.run_bot() self.assertMessageEqual(0, event) def test_only_multiple_actions(self): - """ Test only multiple action """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), - 'test_sieve_files/test_only_multiple_actions.sieve') + """Test only multiple action""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_only_multiple_actions.sieve", + ) event = EXAMPLE_INPUT.copy() - event['comment'] = 'Test action only' - event['source.ip'] = '1.3.3.7' + event["comment"] = "Test action only" + event["source.ip"] = "1.3.3.7" self.input_message = EXAMPLE_INPUT self.run_bot() self.assertMessageEqual(0, event) def test_list_equals_match(self): - """ Test list-based :equals match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_list_equals_match.sieve') + """Test list-based :equals match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_list_equals_match.sieve" + ) base = EXAMPLE_INPUT.copy() - base['extra.list'] = ['a', 'b', 'c'] + base["extra.list"] = ["a", "b", "c"] # positive test event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1193,7 +1304,7 @@ def test_list_equals_match(self): # negative test event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() self.input_message = event @@ -1201,17 +1312,20 @@ def test_list_equals_match(self): self.assertMessageEqual(0, expected) def test_list_setequals_match(self): - """ Test list/set-based :setequals match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_list_setequals_match.sieve') + """Test list/set-based :setequals match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_list_setequals_match.sieve", + ) base = EXAMPLE_INPUT.copy() - base['extra.list'] = ['a', 'b', 'c'] + base["extra.list"] = ["a", "b", "c"] # positive test event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1219,7 +1333,7 @@ def test_list_setequals_match(self): # negative test event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() self.input_message = event @@ -1227,17 +1341,19 @@ def test_list_setequals_match(self): self.assertMessageEqual(0, expected) def test_list_overlaps_match(self): - """ Test list/set-based :overlaps match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_list_overlaps_match.sieve') + """Test list/set-based :overlaps match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_list_overlaps_match.sieve" + ) base = EXAMPLE_INPUT.copy() - base['extra.list'] = ['a', 'b', 'c'] + base["extra.list"] = ["a", "b", "c"] # positive test event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1245,7 +1361,7 @@ def test_list_overlaps_match(self): # negative test event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() self.input_message = event @@ -1253,17 +1369,19 @@ def test_list_overlaps_match(self): self.assertMessageEqual(0, expected) def test_list_subsetof_match(self): - """ Test list/set-based :subsetof match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_list_subsetof_match.sieve') + """Test list/set-based :subsetof match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_list_subsetof_match.sieve" + ) base = EXAMPLE_INPUT.copy() - base['extra.list'] = ['a', 'b', 'c'] + base["extra.list"] = ["a", "b", "c"] # positive test event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1271,7 +1389,7 @@ def test_list_subsetof_match(self): # negative test event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() self.input_message = event @@ -1279,17 +1397,20 @@ def test_list_subsetof_match(self): self.assertMessageEqual(0, expected) def test_list_supersetof_match(self): - """ Test list/set-based :supersetof match """ - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_list_supersetof_match.sieve') + """Test list/set-based :supersetof match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), + "test_sieve_files/test_list_supersetof_match.sieve", + ) base = EXAMPLE_INPUT.copy() - base['extra.list'] = ['a', 'b', 'c'] + base["extra.list"] = ["a", "b", "c"] # positive test event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1297,7 +1418,7 @@ def test_list_supersetof_match(self): # negative test event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() self.input_message = event @@ -1305,17 +1426,19 @@ def test_list_supersetof_match(self): self.assertMessageEqual(0, expected) def test_bool_match(self): - ''' Test bool match ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_bool_match.sieve') + """Test bool match""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_bool_match.sieve" + ) base = EXAMPLE_INPUT.copy() - base['extra.truthy'] = True - base['extra.falsy'] = False + base["extra.truthy"] = True + base["extra.falsy"] = False # positive test with true == true event = base.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = base.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1323,9 +1446,9 @@ def test_bool_match(self): # positive test with false == false event = base.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = base.copy() - expected['comment'] = 'changed2' + expected["comment"] = "changed2" self.input_message = event self.run_bot() @@ -1333,9 +1456,9 @@ def test_bool_match(self): # positive test with true != false event = base.copy() - event['comment'] = 'match3' + event["comment"] = "match3" expected = base.copy() - expected['comment'] = 'changed3' + expected["comment"] = "changed3" self.input_message = event self.run_bot() @@ -1343,9 +1466,9 @@ def test_bool_match(self): # positive test with false != true event = base.copy() - event['comment'] = 'match4' + event["comment"] = "match4" expected = base.copy() - expected['comment'] = 'changed4' + expected["comment"] = "changed4" self.input_message = event self.run_bot() @@ -1353,7 +1476,7 @@ def test_bool_match(self): # negative test with true == false event = base.copy() - event['comment'] = 'match5' + event["comment"] = "match5" expected = event.copy() self.input_message = event @@ -1362,7 +1485,7 @@ def test_bool_match(self): # negative test with false == true event = base.copy() - event['comment'] = 'match6' + event["comment"] = "match6" expected = event.copy() self.input_message = event @@ -1371,7 +1494,7 @@ def test_bool_match(self): # negative test with true != true event = base.copy() - event['comment'] = 'match7' + event["comment"] = "match7" expected = event.copy() self.input_message = event @@ -1380,7 +1503,7 @@ def test_bool_match(self): # negative test with false != false event = base.copy() - event['comment'] = 'match8' + event["comment"] = "match8" expected = event.copy() self.input_message = event @@ -1388,16 +1511,18 @@ def test_bool_match(self): self.assertMessageEqual(0, expected) def test_typed_values(self): - ''' Test typed values ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_typed_values.sieve') + """Test typed values""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_typed_values.sieve" + ) # test with list of values of mixed types event = EXAMPLE_INPUT.copy() - event['extra.list'] = [True, 2.1, 'three', 4] - event['comment'] = 'foo' + event["extra.list"] = [True, 2.1, "three", 4] + event["comment"] = "foo" expected = event.copy() - expected['comment'] = 'changed' + expected["comment"] = "changed" self.input_message = event self.run_bot() @@ -1405,9 +1530,9 @@ def test_typed_values(self): # test assigning a string event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = event.copy() - expected['extra.value'] = 'string' + expected["extra.value"] = "string" self.input_message = event self.run_bot() @@ -1415,9 +1540,9 @@ def test_typed_values(self): # test force-adding an int event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() - expected['extra.value'] = 100 + expected["extra.value"] = 100 self.input_message = event self.run_bot() @@ -1425,9 +1550,9 @@ def test_typed_values(self): # test updating to a string event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' + event["comment"] = "match3" expected = event.copy() - expected['extra.value'] = 1.5 + expected["extra.value"] = 1.5 self.input_message = event self.run_bot() @@ -1435,24 +1560,26 @@ def test_typed_values(self): # test assigning a bool event = EXAMPLE_INPUT.copy() - event['comment'] = 'match4' + event["comment"] = "match4" expected = event.copy() - expected['extra.value'] = True + expected["extra.value"] = True self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_append(self): - ''' Test append action ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_append.sieve') + """Test append action""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_append.sieve" + ) # positive test event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = event.copy() - event['extra.list'] = ['a', 'b'] - expected['extra.list'] = ['a', 'b', 'c'] + event["extra.list"] = ["a", "b"] + expected["extra.list"] = ["a", "b", "c"] self.input_message = event self.run_bot() @@ -1460,8 +1587,8 @@ def test_append(self): # negative test - non-list value event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' - event['extra.single'] = 'something' + event["comment"] = "match2" + event["extra.single"] = "something" expected = event.copy() self.input_message = event @@ -1470,24 +1597,26 @@ def test_append(self): # negative test - nonexistent value event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' + event["comment"] = "match3" expected = event.copy() - expected['extra.nonexistent'] = ['new'] + expected["extra.nonexistent"] = ["new"] self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_force_append(self): - ''' Test force append action ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_force_append.sieve') + """Test force append action""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_force_append.sieve" + ) # positive test with list event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = event.copy() - event['extra.list'] = ['a', 'b'] - expected['extra.list'] = ['a', 'b', 'c'] + event["extra.list"] = ["a", "b"] + expected["extra.list"] = ["a", "b", "c"] self.input_message = event self.run_bot() @@ -1495,10 +1624,10 @@ def test_force_append(self): # positive test - with non-list value event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() - event['extra.single'] = 'something' - expected['extra.single'] = ['something', 'more'] + event["extra.single"] = "something" + expected["extra.single"] = ["something", "more"] self.input_message = event self.run_bot() @@ -1506,23 +1635,25 @@ def test_force_append(self): # positive test with nonexistent value event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' + event["comment"] = "match3" expected = event.copy() - expected['extra.nonexistent'] = ['something'] + expected["extra.nonexistent"] = ["something"] self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_negation(self): - ''' Test expression negation ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_negation.sieve') + """Test expression negation""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_negation.sieve" + ) # positive test with single expression event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = event.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1530,9 +1661,9 @@ def test_negation(self): # positive test with single expression in braces event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() - expected['comment'] = 'changed2' + expected["comment"] = "changed2" self.input_message = event self.run_bot() @@ -1540,10 +1671,10 @@ def test_negation(self): # positive test with OR'ing of two negated expressions, first matches event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' - event['extra.text'] = 'test1' + event["comment"] = "match3" + event["extra.text"] = "test1" expected = event.copy() - expected['comment'] = 'changed3' + expected["comment"] = "changed3" self.input_message = event self.run_bot() @@ -1551,10 +1682,10 @@ def test_negation(self): # positive test with OR'ing of two negated expressions, second matches event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' - event['extra.text'] = 'test2' + event["comment"] = "match3" + event["extra.text"] = "test2" expected = event.copy() - expected['comment'] = 'changed3' + expected["comment"] = "changed3" self.input_message = event self.run_bot() @@ -1562,8 +1693,8 @@ def test_negation(self): # positive test with OR'ing of two negated expressions, neither match event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' - event['extra.text'] = 'test3' + event["comment"] = "match3" + event["extra.text"] = "test3" expected = event.copy() self.input_message = event @@ -1572,8 +1703,8 @@ def test_negation(self): # negative test with AND'ing of two negated expressions, first does not match event = EXAMPLE_INPUT.copy() - event['comment'] = 'match4' - event['extra.text'] = 'test1' + event["comment"] = "match4" + event["extra.text"] = "test1" expected = event.copy() self.input_message = event @@ -1582,8 +1713,8 @@ def test_negation(self): # negative test with AND'ing of two negated expressions, second does not match event = EXAMPLE_INPUT.copy() - event['comment'] = 'match4' - event['extra.text'] = 'test2' + event["comment"] = "match4" + event["extra.text"] = "test2" expected = event.copy() self.input_message = event @@ -1592,25 +1723,27 @@ def test_negation(self): # positive test with AND'ing of two negated expressions event = EXAMPLE_INPUT.copy() - event['comment'] = 'match4' - event['extra.text'] = 'test3' + event["comment"] = "match4" + event["extra.text"] = "test3" expected = event.copy() - expected['comment'] = 'changed4' + expected["comment"] = "changed4" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_nested_if(self): - ''' Test nested if statements ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_nested_if.sieve') + """Test nested if statements""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_nested_if.sieve" + ) # match outer if and inner if event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' - event['extra.text'] = 'test1' + event["comment"] = "match1" + event["extra.text"] = "test1" expected = event.copy() - expected['comment'] = 'changed1' + expected["comment"] = "changed1" self.input_message = event self.run_bot() @@ -1618,10 +1751,10 @@ def test_nested_if(self): # match outer if and inner elif event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' - event['extra.text'] = 'test2' + event["comment"] = "match1" + event["extra.text"] = "test2" expected = event.copy() - expected['comment'] = 'changed2' + expected["comment"] = "changed2" self.input_message = event self.run_bot() @@ -1629,9 +1762,9 @@ def test_nested_if(self): # match outer if and inner else event = EXAMPLE_INPUT.copy() - event['comment'] = 'match1' + event["comment"] = "match1" expected = event.copy() - expected['comment'] = 'changed3' + expected["comment"] = "changed3" self.input_message = event self.run_bot() @@ -1639,10 +1772,10 @@ def test_nested_if(self): # match outer elif and inner if event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' - event['extra.text'] = 'test4' + event["comment"] = "match2" + event["extra.text"] = "test4" expected = event.copy() - expected['comment'] = 'changed4' + expected["comment"] = "changed4" self.input_message = event self.run_bot() @@ -1650,10 +1783,10 @@ def test_nested_if(self): # match outer elif and inner elif event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' - event['extra.text'] = 'test5' + event["comment"] = "match2" + event["extra.text"] = "test5" expected = event.copy() - expected['comment'] = 'changed5' + expected["comment"] = "changed5" self.input_message = event self.run_bot() @@ -1661,9 +1794,9 @@ def test_nested_if(self): # match outer elif and inner else event = EXAMPLE_INPUT.copy() - event['comment'] = 'match2' + event["comment"] = "match2" expected = event.copy() - expected['comment'] = 'changed6' + expected["comment"] = "changed6" self.input_message = event self.run_bot() @@ -1671,10 +1804,10 @@ def test_nested_if(self): # match outer else and inner if event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' - event['extra.text'] = 'test7' + event["comment"] = "match3" + event["extra.text"] = "test7" expected = event.copy() - expected['comment'] = 'changed7' + expected["comment"] = "changed7" self.input_message = event self.run_bot() @@ -1682,10 +1815,10 @@ def test_nested_if(self): # match outer else and inner elif event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' - event['extra.text'] = 'test8' + event["comment"] = "match3" + event["extra.text"] = "test8" expected = event.copy() - expected['comment'] = 'changed8' + expected["comment"] = "changed8" self.input_message = event self.run_bot() @@ -1693,24 +1826,26 @@ def test_nested_if(self): # match outer else and inner else event = EXAMPLE_INPUT.copy() - event['comment'] = 'match3' + event["comment"] = "match3" expected = event.copy() - expected['comment'] = 'changed9' + expected["comment"] = "changed9" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_mixed_if_and_actions(self): - ''' Test mixed if statements and actions ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_mixed_if.sieve') + """Test mixed if statements and actions""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_mixed_if.sieve" + ) # pass unconditional and conditional statement event = EXAMPLE_INPUT.copy() - event['comment'] = 'match' + event["comment"] = "match" expected = event.copy() - expected['comment'] = 'changed' - expected['extra.tag'] = 'matched' + expected["comment"] = "changed" + expected["extra.tag"] = "matched" self.input_message = event self.run_bot() @@ -1719,33 +1854,51 @@ def test_mixed_if_and_actions(self): # pass unconditional, but not conditional statement event = EXAMPLE_INPUT.copy() expected = event.copy() - expected['extra.tag'] = 'matched' + expected["extra.tag"] = "matched" self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_empty_list(self): - ''' Test whether empty lists function ''' - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_empty_list.sieve') + """Test whether empty lists function""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_empty_list.sieve" + ) event = EXAMPLE_INPUT.copy() - event['extra.list'] = [] + event["extra.list"] = [] expected = event.copy() - expected['extra.list_empty'] = True + expected["extra.list_empty"] = True self.input_message = event self.run_bot() self.assertMessageEqual(0, expected) def test_extra_dict(self): - self.sysconfig['file'] = os.path.join(os.path.dirname(__file__), 'test_sieve_files/test_extra_dict.sieve') + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_extra_dict.sieve" + ) event = EXAMPLE_INPUT.copy() - event['extra.some_dict'] = {'key': []} + event["extra.some_dict"] = {"key": []} self.input_message = event self.run_bot() self.assertOutputQueueLen(0) + def test_variables(self): + """Test operations with variables""" + self.sysconfig["file"] = os.path.join( + os.path.dirname(__file__), "test_sieve_files/test_basic_variables.sieve" + ) -if __name__ == '__main__': # pragma: no cover + event = EXAMPLE_INPUT.copy() + expected = event.copy() + expected["extra.email"] = event["source.abuse_contact"] + + self.input_message = event + self.run_bot() + self.assertMessageEqual(0, expected) + + +if __name__ == "__main__": # pragma: no cover unittest.main() diff --git a/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve b/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve new file mode 100644 index 000000000..11dab9417 --- /dev/null +++ b/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve @@ -0,0 +1,2 @@ +$foo = source.abuse_contact +add! extra.email = $foo diff --git a/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve.license b/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve.license new file mode 100644 index 000000000..4a7fe1a6a --- /dev/null +++ b/intelmq/tests/bots/experts/sieve/test_sieve_files/test_basic_variables.sieve.license @@ -0,0 +1,2 @@ +SPDX-FileCopyrightText: 2024 Mikk Margus Möll +SPDX-License-Identifier: AGPL-3.0-or-later