Jobflow engine - Findings and recommendations #5044

Open
Tracked by #5116
mhamra opened this issue Feb 27, 2024 · 3 comments

Comments

@mhamra
Contributor

mhamra commented Feb 27, 2024

| Target version | Related issue | Related PR/dev branch |
|---|---|---|
| 4.9 | #4989 | https://github.com/wazuh/wazuh-qa/tree/4996-dtt1-iteration-3-workflow-engine-unit-tests |
| 4.9 | #4996 | https://github.com/wazuh/wazuh-qa/tree/4989-dtt1-iteration-3-workflow-engine-module-release-workflow-module |

Description

While developing the unit tests for DTT1 iteration 3 - Workflow engine, I had the opportunity to review the workflow engine source code thoroughly. This issue lists my findings for discussion and triage; the team will assess each request with the goal of improving code quality, robustness, and maintainability.

Code Style Remarks and Best Practices

  • I propose following the style guides used by the Wazuh Framework team for Python code.
  • I would like to discuss whether type hints will be used for instance variables and parameters, and whether they will cover the whole code base or only part of it.

Exception Handling Best Practices

  • Use exception chaining: raise NewException(...) from e.
  • Don't raise the generic Exception class. Prefer letting the original exception bubble up.
  • Define custom exceptions for domain errors (for example, when cycles are found in a workflow configuration file).
  • Include the Wazuh copyright headers in all files.
  • Ideally, we should identify the known exceptions we want to handle. For example, when a DAG is instantiated, a graphlib.TopologicalSorter is created. Its add method can raise a ValueError (for example, when called after prepare), and its prepare method raises a CycleError when the graph contains a cycle. These errors should be handled so they can be logged as errors in the workflow log file. The user should receive the original exception (through chaining) together with a more informative message whenever possible.
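A minimal sketch of the chaining and custom-exception points, assuming each task is a dict with a task key and an optional depends-on list, as in the workflow file snippets below. WorkflowError, WorkflowCycleError, and build_sorter are hypothetical names; graphlib.CycleError, and the cycle it stores in args[1], are standard library behavior.

```python
import graphlib
import logging

logger = logging.getLogger("workflow_engine")


class WorkflowError(Exception):
    """Hypothetical base class for workflow domain errors."""


class WorkflowCycleError(WorkflowError):
    """Hypothetical error raised when task dependencies form a cycle."""


def build_sorter(task_collection: list[dict]) -> graphlib.TopologicalSorter:
    """Build and prepare a TopologicalSorter, translating graphlib errors."""
    sorter = graphlib.TopologicalSorter()
    for task in task_collection:
        # Each dependency becomes a predecessor of the task.
        sorter.add(task["task"], *task.get("depends-on", []))
    try:
        sorter.prepare()
    except graphlib.CycleError as e:
        # e.args[1] is the list of nodes that form the cycle.
        cycle = " -> ".join(e.args[1])
        logger.error("Cycle detected in workflow: %s", cycle)
        # Chain the original error so the user still sees it.
        raise WorkflowCycleError(f"Cycle detected in workflow: {cycle}") from e
    return sorter
```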

Import Statements

  • Reorder the import statements: standard library packages first, then third-party libraries, and finally local imports.
  • Within each group, order the imports alphabetically.
  • Remove import statements for unused classes, functions, etc.

Suggestions and Ideas

  • I recommend using the Click library instead of the standard argparse module to handle command-line arguments. It is more powerful and simpler to use: parameters are declared with decorators. Documentation: https://click.palletsprojects.com/en/8.1.x/

Requested Fixes: Cosmetics and Bugs

  • The SchemaValidator.validateSchema method name does not follow the snake_case convention for function names; it should be validate_schema.
  • Remove the pass statements from abstract methods that already have a docstring; pylint flags them as unnecessary (unnecessary-pass).
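For the pass statement, a sketch of an abstract method whose docstring is its only body. The Task base class name is an assumption for illustration; ProcessTask here only stands in for a concrete task.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Hypothetical base class for workflow tasks."""

    @abstractmethod
    def execute(self) -> None:
        """Execute the task."""
        # The docstring is a valid body on its own, so no `pass` is needed.


class ProcessTask(Task):
    """Concrete task used only to show that subclasses still work."""

    def execute(self) -> None:
        """Run the process (omitted in this sketch)."""
```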

Design and Architecture

  • Review the concurrency library used to execute tasks. The WorkflowProcessor class uses threads, but multithreaded Python code is not truly parallel in CPython because of the Global Interpreter Lock (GIL): only one thread executes Python bytecode at a time. The GIL is released while a thread is blocked on an input/output operation, so another thread can run in the meantime. We must consider whether this meets our requirements. If the workflow only spawns processes, as ProcessTask.execute does (it calls subprocess.run to run a script with parameters and captures stdout and stderr), threads may be enough, because each thread spends most of its time waiting on its child process.
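A small experiment, under the assumption that tasks mostly wait on child processes as ProcessTask.execute does: several subprocess.run calls dispatched through a ThreadPoolExecutor overlap in time, because a thread waiting on its child process does not hold the GIL. The sleeping child script is only a placeholder for a real task script, and run_child and run_in_threads are hypothetical names.

```python
import concurrent.futures
import subprocess
import sys
import time


def run_child(seconds: float) -> int:
    """Start a child Python process that sleeps, standing in for a task script."""
    result = subprocess.run(
        [sys.executable, "-c", f"import time; time.sleep({seconds})"],
        capture_output=True,
        text=True,
    )
    return result.returncode


def run_in_threads(count: int, seconds: float, max_workers: int) -> tuple[list[int], float]:
    """Run `count` child processes through a thread pool and measure wall time."""
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return_codes = list(executor.map(run_child, [seconds] * count))
    return return_codes, time.perf_counter() - start
```

With four workers, four half-second children should finish in roughly half a second plus interpreter start-up, rather than two seconds. If tasks ever run CPU-bound Python code in-process, concurrent.futures.ProcessPoolExecutor offers the same executor interface backed by separate processes.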

  • WorkflowProcessor constructor

    def __init__(self, workflow_file: str, dry_run: bool, threads: int, log_level: str = 'INFO', schema_file: Path | str = None):
        """
        Initialize WorkflowProcessor.

        Args:
            workflow_file (str): Path to the workflow file (YAML format).
            dry_run (bool): Display the plan without executing tasks.
            threads (int): Number of threads to use for parallel execution.
            log_level (str): Log level.
            schema_file (Path | str): Path to the schema file (YAML format).
        """
        logger.setLevel(log_level)
        # Initialize the instance variables.
        self.task_collection = WorkflowFile(workflow_file, schema_file).task_collection
        self.dry_run = dry_run
        self.threads = threads

    • I recommend removing the WorkflowProcessor.dry_run instance variable and changing the signature of WorkflowProcessor.run to accept an optional dry_run parameter.
      def run(self) -> None:
          """Main entry point."""
          try:
              if not self.dry_run:

    • I recommend renaming the WorkflowProcessor.threads instance variable to WorkflowProcessor.max_threads_count to clarify its purpose.
    • I recommend removing the log_level parameter from the WorkflowProcessor constructor and moving the logging configuration outside the class. This reduces the class's responsibilities. Moreover, if many WorkflowProcessor instances are created, each new instance redefines the log level, although the log level is global to the program rather than specific to each instance.
    • The WorkflowProcessor constructor creates a WorkflowFile instance, which couples the two classes. Options:
      • Use a creational pattern (for example, a factory): another class builds the WorkflowProcessor instances.
      • Change the constructor so that it receives a task_collection.
      • Add a dag instance variable and change WorkflowProcessor.run to use it.
        args = parse_arguments()
        processor = WorkflowProcessor(**dict(InputPayload(**vars(args))))
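One possible shape for the constructor changes above, shown as a sketch rather than a drop-in replacement: the task collection is injected, threads becomes max_threads_count, dry_run moves to run(), and logging is configured by the caller. DAG handling and real task execution are deliberately omitted; _execute_task is a hypothetical placeholder.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("workflow_engine")


class WorkflowProcessor:
    """Sketch: collaborators are injected and dry_run is a run() argument."""

    def __init__(self, task_collection: list[dict], max_threads_count: int = 1):
        # The caller builds the task collection (e.g. from a WorkflowFile),
        # so this class no longer depends on WorkflowFile directly.
        self.task_collection = task_collection
        self.max_threads_count = max_threads_count

    def run(self, dry_run: bool = False) -> list[str]:
        """Run (or, with dry_run=True, only plan) the tasks; return their names."""
        names = [task["task"] for task in self.task_collection]
        if dry_run:
            logger.info("Execution plan: %s", names)
            return names
        with ThreadPoolExecutor(max_workers=self.max_threads_count) as executor:
            list(executor.map(self._execute_task, names))
        return names

    def _execute_task(self, task_name: str) -> None:
        # Hypothetical placeholder for the real task execution logic.
        logger.info("[%s] Executing task.", task_name)


# The log level is configured once at the entry point, not per instance, e.g.:
# logging.basicConfig(level="INFO")
```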

I propose creating a global configuration object loaded from a YAML configuration file. In the current design, the workflow_engine main.py entry point parses the command-line arguments and converts them into a dictionary that is passed directly to the WorkflowProcessor constructor, so every new parameter added in the future requires changing the constructor. A global configuration object could instead be accessed by any class that needs to be parameterized.
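A sketch of such a configuration object, assuming the YAML file is parsed elsewhere (for example with PyYAML's yaml.safe_load) into a plain dict. EngineConfig and its fields are hypothetical; they mirror the current constructor parameters.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class EngineConfig:
    """Hypothetical global configuration object for the workflow engine."""

    workflow_file: str
    schema_file: Optional[str] = None
    max_threads_count: int = 1
    log_level: str = "INFO"

    @classmethod
    def from_dict(cls, data: dict) -> "EngineConfig":
        """Build the config from a parsed YAML mapping, ignoring unknown keys."""
        known = {field.name for field in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})
```

Ignoring unknown keys lets older code read newer configuration files; rejecting them instead would catch typos earlier. Which trade-off fits better is open for discussion.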

  • I recommend renaming the logger.py file to logging.py. The import from workflow_engine.logger.logger import logger is long and repetitive. The logger can also be imported in the subpackage __init__.py file, which reduces the import to from workflow_engine.logging import logger.
  • I recommend moving the validation of ProcessTask.task_parameters from ProcessTask.execute to the ProcessTask constructor.
    def execute(self) -> None:
        """Execute the process task."""
        task_args = []
        if self.task_parameters.get('args') is None:
            raise ValueError(f'Not argument found in {self.task_name}')
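A sketch of the constructor-side validation, assuming args is a list of strings and path is a string (the real task_parameters schema may differ). Validating when the task is created makes a malformed workflow fail before any task runs. build_command is a hypothetical helper, not part of the current class.

```python
class ProcessTask:
    """Sketch: task_parameters are validated once, when the task is created."""

    def __init__(self, task_name: str, task_parameters: dict):
        if not task_parameters.get("path"):
            raise ValueError(f"No path found in task {task_name}.")
        if task_parameters.get("args") is None:
            raise ValueError(f"No arguments found in task {task_name}.")
        self.task_name = task_name
        self.task_parameters = task_parameters

    def build_command(self) -> list[str]:
        """Build the command line; the parameters are already known to be valid."""
        return [self.task_parameters["path"], *map(str, self.task_parameters["args"])]
```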
  • ProcessTask.execute has a bug: the statement if "KeyboardInterrupt" in error_msg: raises a TypeError because error_msg is None. The stderr argument must be passed when the CalledProcessError is raised.
            if result.returncode != 0:
                raise subprocess.CalledProcessError(returncode=result.returncode, cmd=result.args, output=result.stdout)
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr
            if "KeyboardInterrupt" in error_msg:
                raise KeyboardInterrupt(f"Error executing process task with keyboard interrupt.")
            raise Exception(f"Error executing process task {e.stderr}")
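A sketch of the fix, written as standalone functions rather than the real ProcessTask.execute: stderr is passed to CalledProcessError, and the generic Exception is replaced by a RuntimeError chained to the original error, following the exception guidelines above. run_process and execute_process_task are hypothetical names.

```python
import subprocess


def run_process(cmd: list[str]) -> str:
    """Run a command and raise CalledProcessError with stderr populated."""
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Passing stderr makes e.stderr a string instead of None.
        raise subprocess.CalledProcessError(
            returncode=result.returncode,
            cmd=result.args,
            output=result.stdout,
            stderr=result.stderr,
        )
    return result.stdout


def execute_process_task(cmd: list[str]) -> str:
    """Hypothetical stand-in for ProcessTask.execute with the fix applied."""
    try:
        return run_process(cmd)
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr or ""  # defensive: never test membership on None
        if "KeyboardInterrupt" in error_msg:
            raise KeyboardInterrupt("Error executing process task with keyboard interrupt.") from e
        raise RuntimeError(f"Error executing process task: {error_msg}") from e
```

Alternatively, subprocess.run(..., check=True) raises a CalledProcessError with stdout and stderr already attached.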
  • I recommend validating the parameters of DAG.set_status: task_name must be in task_collection, and status must be one of failed, canceled, or successful.
    def set_status(self, task_name: str, status: str):
        """Set the status of a task."""
        self.finished_tasks_status[status].add(task_name)
        self.dag.done(task_name)
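A minimal sketch of the validation, assuming tasks are dicts with a task key and an optional depends-on list. This DAG is a reduced stand-in for the real class, kept only large enough to exercise set_status.

```python
import graphlib


class DAG:
    """Reduced stand-in for the real DAG class."""

    VALID_STATUSES = ("failed", "canceled", "successful")

    def __init__(self, task_collection: list[dict]):
        self.task_collection = task_collection
        self.finished_tasks_status = {status: set() for status in self.VALID_STATUSES}
        self.dag = graphlib.TopologicalSorter()
        for task in task_collection:
            self.dag.add(task["task"], *task.get("depends-on", []))
        self.dag.prepare()

    def set_status(self, task_name: str, status: str) -> None:
        """Set the status of a task after validating both arguments."""
        task_names = {task["task"] for task in self.task_collection}
        if task_name not in task_names:
            raise ValueError(f"Unknown task '{task_name}'.")
        if status not in self.VALID_STATUSES:
            raise ValueError(
                f"Invalid status '{status}'. Expected one of: {', '.join(self.VALID_STATUSES)}."
            )
        self.finished_tasks_status[status].add(task_name)
        self.dag.done(task_name)
```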
  • I recommend creating enumerations for task statuses, cancel policies, etc. For example, in the DAG constructor:
    class DAG():
        """Class for creating a dependency graph."""
        def __init__(self, task_collection: list, reverse: bool = False):
            self.task_collection = task_collection
            self.reverse = reverse
            self.dag, self.dependency_tree = self.__build_dag()
            self.to_be_canceled = set()
            self.finished_tasks_status = {
                'failed': set(),
                'canceled': set(),
                'successful': set(),
            }
            self.execution_plan = self.__create_execution_plan(self.dependency_tree)
            self.dag.prepare()
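A sketch of the enumerations, using the status values from this constructor and the cancel policies handled by cancel_dependant_tasks. The class names are hypothetical. Mixing in str keeps members equal to the plain strings already written in workflow files, and CancelPolicy(value) rejects unknown values with a ValueError.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Final status of a task."""

    FAILED = "failed"
    CANCELED = "canceled"
    SUCCESSFUL = "successful"


class CancelPolicy(str, Enum):
    """Values accepted by a task's on-error key."""

    CONTINUE = "continue"
    ABORT_ALL = "abort-all"
    ABORT_RELATED_FLOWS = "abort-related-flows"


# Replaces the hand-written {'failed': set(), ...} dictionary in DAG.__init__.
finished_tasks_status = {status: set() for status in TaskStatus}
```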
  • I recommend moving the validation of the cancel_policy parameter of DAG.cancel_dependant_tasks from the for loop to the beginning of the function.
    def cancel_dependant_tasks(self, task_name, cancel_policy) -> None:
        """Cancel all tasks that depend on a failed task."""
        def get_all_task_set(tasks):
            task_set = set()
            for task, sub_tasks in tasks.items():
                task_set.add(task)
                task_set.update(get_all_task_set(sub_tasks))
            return task_set

        if cancel_policy == 'continue':
            return

        not_cancelled_tasks = self.finished_tasks_status['failed'].union(self.finished_tasks_status['successful'])
        for root_task, sub_tasks in self.execution_plan.items():
            task_set = get_all_task_set({root_task: sub_tasks})
            if cancel_policy == 'abort-all':
                self.to_be_canceled.update(task_set)
            elif cancel_policy == 'abort-related-flows':
                if task_name in task_set:
                    self.to_be_canceled.update(task_set - not_cancelled_tasks)
            else:
                raise ValueError(f"Unknown cancel policy '{cancel_policy}'.")
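A sketch of the reordered logic as a standalone function (tasks_to_cancel is a hypothetical name; the real method updates self.to_be_canceled instead of returning a set). Checking the policy first means an unknown value is reported even when execution_plan is empty, whereas the current check inside the loop never runs in that case.

```python
VALID_CANCEL_POLICIES = ("continue", "abort-all", "abort-related-flows")


def get_all_task_set(tasks: dict) -> set:
    """Flatten a nested {task: {sub_task: {...}}} plan into a set of names."""
    task_set = set()
    for task, sub_tasks in tasks.items():
        task_set.add(task)
        task_set.update(get_all_task_set(sub_tasks))
    return task_set


def tasks_to_cancel(execution_plan: dict, task_name: str, cancel_policy: str,
                    finished_tasks: set) -> set:
    """Return the tasks to cancel; the policy is validated before any work."""
    if cancel_policy not in VALID_CANCEL_POLICIES:
        raise ValueError(f"Unknown cancel policy '{cancel_policy}'.")
    if cancel_policy == "continue":
        return set()
    to_cancel = set()
    for root_task, sub_tasks in execution_plan.items():
        task_set = get_all_task_set({root_task: sub_tasks})
        if cancel_policy == "abort-all":
            to_cancel.update(task_set)
        elif task_name in task_set:  # abort-related-flows
            to_cancel.update(task_set - finished_tasks)
    return to_cancel
```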
  • WorkflowProcessor.execute_task logs the elapsed time only in the normal flow. I recommend logging the elapsed time when an exception occurs, too.
    logger.info("[%s] Starting task.", task_name)
    start_time = time.time()
    task_object.execute()
    logger.info("[%s] Finished task in %.2f seconds.", task_name, time.time() - start_time)
    dag.set_status(task_name, 'successful')
    except KeyboardInterrupt as e:
        logger.error("[%s] Task failed with error: %s.", task_name, e)
        dag.set_status(task_name, 'failed')
        dag.cancel_dependant_tasks(task_name, task.get('on-error', 'abort-related-flows'))
        raise KeyboardInterrupt
    except Exception as e:
        logger.error("[%s] Task failed with error: %s.", task_name, e)
        dag.set_status(task_name, 'failed')
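A sketch using try/finally so the elapsed time is logged on success and on failure, including KeyboardInterrupt. The DAG status updates are omitted, and task_callable is a hypothetical stand-in for task_object.execute.

```python
import logging
import time
from typing import Callable

logger = logging.getLogger("workflow_engine")


def execute_task(task_name: str, task_callable: Callable[[], None]) -> None:
    """Run a task and log its elapsed time whether it succeeds or fails."""
    logger.info("[%s] Starting task.", task_name)
    start_time = time.time()
    try:
        task_callable()
    except Exception as e:
        logger.error("[%s] Task failed with error: %s.", task_name, e)
        raise
    finally:
        # Runs on success, on Exception and on KeyboardInterrupt alike.
        logger.info("[%s] Task ended after %.2f seconds.", task_name, time.time() - start_time)
```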
  • I suggest reviewing the exception handler of WorkflowProcessor.execute_tasks_parallel. The handler calls the same function recursively with reverse=True; if a KeyboardInterrupt is raised again during that call, the function recurses again, which could repeat indefinitely. I think the recursive call should be made only when reverse is False.
    def execute_tasks_parallel(self, dag: DAG, reverse: bool = False) -> None:
        """Execute tasks in parallel."""
        logger.info("Executing tasks in parallel.")
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
                futures = self.generate_futures(dag, executor, reverse)
                concurrent.futures.wait(futures.values())
        except KeyboardInterrupt:
            logger.error("User interrupt detected. Aborting execution...")
            self.execute_tasks_parallel(dag, reverse=True)
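A sketch of the guarded handler, where run_tasks is a hypothetical callable standing in for the thread pool and futures logic. On the first interrupt it runs the reverse pass once; a second interrupt during the reverse pass is propagated instead of triggering another recursive call.

```python
import logging
from typing import Callable

logger = logging.getLogger("workflow_engine")


def execute_tasks_parallel(run_tasks: Callable[[bool], None], reverse: bool = False) -> None:
    """Run the tasks; on a user interrupt, run the reverse pass at most once."""
    try:
        run_tasks(reverse)
    except KeyboardInterrupt:
        logger.error("User interrupt detected. Aborting execution...")
        if reverse:
            # Already running the reverse pass: propagate instead of recursing.
            raise
        execute_tasks_parallel(run_tasks, reverse=True)
```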
  • I suggest removing the WorkflowProcessor.abort_execution method because no other file in the workflow_engine module references it.
  • I suggest fixing the type of the element parameter of WorkflowFile.__replace_placeholders. It is declared as str, but it should be Any, as the method's own docstring states.
    def __replace_placeholders(self, element: str, values: dict, parent_key: str = None):
        """
        Recursively replace placeholders in a dictionary or list.

        Args:
            element (Any): The element to process.
            values (dict): The values to replace placeholders.
            parent_key (str): The parent key for nested replacements.

        Returns:
            Any: The processed element.
        """

    I suggest reviewing the use of the parent_key parameter of WorkflowFile.__replace_placeholders, since it doesn't affect the function's return value. If it is kept, I suggest declaring it as parent_key: Optional[str] = None.
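A sketch of the signature without parent_key, assuming placeholders are written as {name} inside strings (the real placeholder syntax may differ). Because dicts, lists, strings, and other values are all accepted, Any is the accurate type for element. replace_placeholders is written as a plain function here for brevity.

```python
from typing import Any


def replace_placeholders(element: Any, values: dict) -> Any:
    """Recursively replace {name} placeholders in strings, lists and dicts."""
    if isinstance(element, dict):
        return {key: replace_placeholders(value, values) for key, value in element.items()}
    if isinstance(element, list):
        return [replace_placeholders(item, values) for item in element]
    if isinstance(element, str):
        for name, value in values.items():
            element = element.replace(f"{{{name}}}", str(value))
        return element
    # Numbers, booleans, None, etc. are returned unchanged.
    return element
```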
  • I suggest improving WorkflowFile.__static_workflow_validation.check_not_existing_tasks. The function raises a ValueError as soon as it finds one task whose depends-on key references tasks missing from task_collection. It would be better to check all tasks first and then raise a single ValueError listing every missing dependency.
    def check_not_existing_tasks(self):
        """Validate task existance."""
        task_names = {task['task'] for task in self.task_collection}
        for dependencies in [task.get('depends-on', []) for task in self.task_collection]:
            non_existing_dependencies = [dependency for dependency in dependencies if dependency not in task_names]
            if non_existing_dependencies:
                raise ValueError(f"Tasks do not exist: {', '.join(non_existing_dependencies)}")
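A sketch that collects every missing dependency before raising, written as a standalone function over the task list (the real method reads self.task_collection).

```python
def check_not_existing_tasks(task_collection: list[dict]) -> None:
    """Raise a single ValueError listing every missing dependency."""
    task_names = {task["task"] for task in task_collection}
    missing = sorted({
        dependency
        for task in task_collection
        for dependency in task.get("depends-on", [])
        if dependency not in task_names
    })
    if missing:
        raise ValueError(f"Tasks do not exist: {', '.join(missing)}")
```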
  • SchemaValidator.preprocess_data checks that the 'path' entry exists for process tasks but doesn't check whether it is an empty string. I suggest adding this validation.
    def preprocess_data(self) -> None:
        """
        Preprocess the YAML data to be validated.

        Raises:
            ValidationError: If the YAML data is not valid.
        """
        for task in self.yaml_data.get('tasks', []):
            do_with = task.get('do', {}).get('with', {})
            this_value = task.get('do', {}).get('this', '')
            if this_value == 'process':
                if 'path' not in do_with or 'args' not in do_with:
                    raise ValidationError(f"Missing required properties in 'with' for task: {task}")
            do_with = task.get('cleanup', {}).get('with', {})
            this_value = task.get('cleanup', {}).get('this', '')
            if this_value == 'process':
                if 'path' not in do_with or 'args' not in do_with:
                    raise ValidationError(f"Missing required properties in 'with' for task: {task}")
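A sketch that adds the empty-path check and also removes the duplicated do/cleanup code by looping over both sections. ValidationError is defined locally as a stand-in for jsonschema's exception so the example runs with the standard library only; in the real module, the jsonschema class would be used.

```python
class ValidationError(ValueError):
    """Stand-in for jsonschema.exceptions.ValidationError in this sketch."""


def preprocess_data(yaml_data: dict) -> None:
    """Check process tasks in both the 'do' and 'cleanup' sections."""
    for task in yaml_data.get("tasks", []):
        for section in ("do", "cleanup"):
            block = task.get(section, {})
            if block.get("this", "") != "process":
                continue
            with_block = block.get("with", {})
            if "path" not in with_block or "args" not in with_block:
                raise ValidationError(f"Missing required properties in 'with' for task: {task}")
            path = with_block["path"]
            # New check: reject empty or whitespace-only paths.
            if not isinstance(path, str) or not path.strip():
                raise ValidationError(f"Empty 'path' in '{section}' for task: {task.get('task')}")
```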
  • SchemaValidator.validateSchema logs ValidationError and other exceptions but does not re-raise them to the caller. I suggest re-raising them.
    try:
        jsonschema.validate(self.yaml_data, self.schema_data)
    except ValidationError as e:
        self.logger.error(f"Schema validation error: {e}")
    except Exception as e:
        self.logger.error(f"Unexpected error at schema validation: {e}")
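A sketch of the re-raising behavior. The validate parameter stands in for jsonschema.validate so the example needs only the standard library; ValidationError and SchemaValidationError are local, hypothetical stand-ins. Known validation errors are re-raised unchanged after logging, and unexpected ones are chained into a domain error.

```python
import logging
from typing import Callable

logger = logging.getLogger("workflow_engine")


class ValidationError(ValueError):
    """Stand-in for jsonschema.exceptions.ValidationError in this sketch."""


class SchemaValidationError(Exception):
    """Hypothetical domain error for unexpected validation failures."""


def validate_schema(yaml_data: dict, schema_data: dict,
                    validate: Callable[[dict, dict], None]) -> None:
    """Validate the data, log any failure, and re-raise it to the caller."""
    try:
        validate(yaml_data, schema_data)
    except ValidationError as e:
        logger.error("Schema validation error: %s", e)
        raise  # re-raise the original exception unchanged
    except Exception as e:
        logger.error("Unexpected error at schema validation: %s", e)
        raise SchemaValidationError("Unexpected error at schema validation.") from e
```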
@rauldpm
Member

rauldpm commented Mar 8, 2024

We will discuss this in the daily meeting of 2024/03/11 or 2024/03/13 -> Blocked internal

@rauldpm rauldpm changed the title DTT1 iteration 3 - Workflow engine. Braimstorming, review and Call to Action. DTT1 - Iteration 3 - Workflow engine. Braimstorming, review and Call to Action. Mar 18, 2024
@rauldpm
Member

rauldpm commented Mar 18, 2024

Move this issue to #4495 as epic

@fcaffieri fcaffieri added level/task Task issue and removed level/subtask Subtask issue labels Mar 18, 2024
@fcaffieri fcaffieri added level/epic and removed level/task Task issue labels Mar 18, 2024
@fcaffieri fcaffieri changed the title DTT1 - Iteration 3 - Workflow engine. Braimstorming, review and Call to Action. DTT2 - Iteration 1 - Workflow engine. Braimstorming, review and Call to Action. Mar 18, 2024
@fcaffieri
Member

This issue will be included in DTT Tier 2

@fcaffieri fcaffieri changed the title DTT2 - Iteration 1 - Workflow engine. Braimstorming, review and Call to Action. DTT2 - Iteration 1 - Workflow engine best practice Mar 18, 2024
@fcaffieri fcaffieri added level/task Task issue and removed level/epic labels Mar 18, 2024
@mhamra mhamra changed the title DTT2 - Iteration 1 - Workflow engine best practice DTT2 - Iteration 1 - Workflow engine - Findings and recommendations Mar 19, 2024
@wazuhci wazuhci moved this to Blocked in Release 4.9.0 Mar 20, 2024
@rauldpm rauldpm changed the title DTT2 - Iteration 1 - Workflow engine - Findings and recommendations DTT2 - Workflow engine - Findings and recommendations Mar 21, 2024
@wazuhci wazuhci removed this from Release 4.9.0 Apr 8, 2024
@wazuhci wazuhci moved this to Blocked in Release 4.9.0 Apr 23, 2024
@wazuhci wazuhci removed this from Release 4.9.0 Apr 25, 2024
@juliamagan juliamagan changed the title DTT2 - Workflow engine - Findings and recommendations Jobflow engine - Findings and recommendations Aug 30, 2024
@wazuhci wazuhci moved this to Backlog in XDR+SIEM/Release 5.0.0 Aug 30, 2024
@wazuhci wazuhci moved this from Backlog to Blocked in XDR+SIEM/Release 5.0.0 Aug 30, 2024
Projects
Status: Blocked
Development

No branches or pull requests

3 participants