diff --git a/scripts/playbooks.py b/scripts/playbooks.py index 6cb8ed76..96465982 100755 --- a/scripts/playbooks.py +++ b/scripts/playbooks.py @@ -53,7 +53,7 @@ def playbooks(services, output_dir, for_collection, collections): for operation in dag.get_all_operations(): dag_services.add_node(operation.name.service) for dependency in operation.depends_on: - dependency_operation = OperationName.from_name(dependency) + dependency_operation = OperationName.from_str(dependency) if dependency_operation.service != operation.name.service: dag_services.add_edge( dependency_operation.service, operation.name.service diff --git a/tdp/cli/commands/ops.py b/tdp/cli/commands/ops.py index 473bd9f2..315576ba 100644 --- a/tdp/cli/commands/ops.py +++ b/tdp/cli/commands/ops.py @@ -11,10 +11,10 @@ from tdp.cli.params import collections_option, hosts_option from tdp.core.dag import Dag +from tdp.core.entities.operation import Operation, PlaybookOperation if TYPE_CHECKING: from tdp.core.collections import Collections - from tdp.core.entities.operation import Operation @click.command() @@ -50,7 +50,11 @@ def ops( operations = [ operation for operation in dag.get_all_operations() - if len(hosts) == 0 or bool(set(operation.host_names) & set(hosts)) + if len(hosts) == 0 + or ( + not isinstance(operation, PlaybookOperation) + or bool(set(operation.playbook.hosts) & set(hosts)) + ) ] if topo_sort: sorted_operations = dag.topological_sort_key( @@ -65,7 +69,11 @@ def ops( operations = [ operation for operation in collections.operations.values() - if len(hosts) == 0 or bool(set(operation.host_names) & set(hosts)) + if len(hosts) == 0 + or ( + not isinstance(operation, PlaybookOperation) + or bool(set(operation.playbook.hosts) & set(hosts)) + ) ] sorted_operations = sorted( operations, key=lambda operation: operation.name.name @@ -78,7 +86,14 @@ def _print_operations(operations: Iterable[Operation], /): click.echo( tabulate( [ - [operation.name.name, operation.host_names or ""] + [ + operation.name.name, + ( + ", ".join(operation.playbook.hosts) + if isinstance(operation, PlaybookOperation) + else "" + ), + ] for operation in operations ], headers=["Operation name", "Hosts"], diff --git a/tdp/core/cluster_status.py b/tdp/core/cluster_status.py index d1f6aec1..82be415a 100644 --- a/tdp/core/cluster_status.py +++ b/tdp/core/cluster_status.py @@ -18,6 +18,7 @@ create_hosted_entity, ) from tdp.core.entities.hosted_entity_status import HostedEntityStatus +from tdp.core.entities.operation import PlaybookOperation from tdp.core.models.sch_status_log_model import ( SCHStatusLogModel, SCHStatusLogSourceEnum, @@ -133,7 +134,11 @@ def generate_stale_sch_logs( continue # Create a log for each host where the entity is deployed - for host in operation.host_names: + for host in ( + operation.playbook.hosts + if isinstance(operation, PlaybookOperation) + else [] + ): log = logs.setdefault( create_hosted_entity( create_entity_name( diff --git a/tdp/core/collections/collection_reader.py b/tdp/core/collections/collection_reader.py index 65a140db..c8ad4e04 100644 --- a/tdp/core/collections/collection_reader.py +++ b/tdp/core/collections/collection_reader.py @@ -19,7 +19,7 @@ SCHEMA_VARS_DIRECTORY_NAME, YML_EXTENSION, ) -from tdp.core.entities.operation import DagOperation, Playbook +from tdp.core.entities.operation import Playbook from tdp.core.inventory_reader import InventoryReader from tdp.core.types import PathLike from tdp.core.variables.schema import ServiceCollectionSchema @@ -51,6 +51,32 @@ class MissingMandatoryDirectoryError(Exception): pass +class TDPLibDagNodeModel(BaseModel): + """Model for a DAG node read from a DAG file. + + Meant to be used in a DagNodeBuilder. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + noop: Whether the operation is a noop. + """ + + model_config = ConfigDict(extra="ignore") + + name: str + depends_on: frozenset[str] = frozenset() + noop: Optional[bool] = False + + +class TDPLibDagModel(BaseModel): + """Model for a TDP DAG defined in a tdp_lib_dag file.""" + + model_config = ConfigDict(extra="ignore") + + operations: list[TDPLibDagNodeModel] + + class CollectionReader: """An enriched version of an Ansible collection. @@ -122,24 +148,9 @@ def schema_directory(self) -> Path: """Path to the variables schema directory.""" return self._path / SCHEMA_VARS_DIRECTORY_NAME - def read_dag_nodes(self) -> Generator[DagOperation, None, None]: + def read_dag_nodes(self) -> Generator[TDPLibDagNodeModel, None, None]: """Read the DAG nodes stored in the dag_directory.""" - class TDPLibDagNodeModel(BaseModel): - """Model for a TDP operation defined in a tdp_lib_dag file.""" - - model_config = ConfigDict(extra="ignore") - - name: str - depends_on: frozenset[str] = frozenset() - - class TDPLibDagModel(BaseModel): - """Model for a TDP DAG defined in a tdp_lib_dag file.""" - - model_config = ConfigDict(extra="ignore") - - operations: list[TDPLibDagNodeModel] - for dag_file in (self.dag_directory).glob("*" + YML_EXTENSION): with dag_file.open("r") as operations_file: file_content = yaml.load(operations_file, Loader=Loader) @@ -147,9 +158,7 @@ class TDPLibDagModel(BaseModel): try: tdp_lib_dag = TDPLibDagModel(operations=file_content) for operation in tdp_lib_dag.operations: - yield DagOperation.from_name( - name=operation.name, depends_on=operation.depends_on - ) + yield operation except ValidationError as e: logger.error(f"Error while parsing tdp_lib_dag file {dag_file}: {e}") raise @@ -201,7 +210,7 @@ def _check_collection_structure(self, path: Path) -> None: def read_hosts_from_playbook( playbook_path: Path, inventory_reader: Optional[InventoryReader] -) -> set[str]: +) -> frozenset[str]: """Read the hosts from a playbook. Args: diff --git a/tdp/core/collections/collections.py b/tdp/core/collections/collections.py index 47ff6ae6..7ab3092e 100644 --- a/tdp/core/collections/collections.py +++ b/tdp/core/collections/collections.py @@ -18,7 +18,14 @@ from typing import TYPE_CHECKING, Optional from tdp.core.entities.entity_name import ServiceComponentName -from tdp.core.entities.operation import Operation, Operations, Playbook +from tdp.core.entities.operation import ( + DagOperationBuilder, + ForgedDagOperation, + OperationName, + Operations, + OtherPlaybookOperation, + Playbook, +) from tdp.core.inventory_reader import InventoryReader from tdp.core.variables.schema.service_schema import ServiceSchema @@ -47,8 +54,8 @@ def __init__(self, collections: Iterable[CollectionReader]): A Collections object.""" self._collection_readers = list(collections) - self._init_playbooks() - self._dag_operations, self._other_operations = self._init_operations() + self._playbooks = self._read_playbooks() + self._operations = self._generate_operations() self._default_var_dirs = self._init_default_vars_dirs() self._schemas = self._init_schemas() self._services_components = self._init_entities() @@ -63,31 +70,16 @@ def from_collection_paths( ] return Collections(collection_readers) - @property - def dag_operations(self) -> Operations: - """Mapping of operation name that are defined in dag files to their Operation instance.""" - return self._dag_operations - - @property - def other_operations(self) -> Operations: - """Mapping of operation name that aren't in dag files to their Operation instance.""" - return self._other_operations - - @property - def operations(self) -> Operations: - """Mapping of all operation name to Operation instance.""" - operations = Operations() - if self._dag_operations: - operations.update(self._dag_operations) - if self._other_operations: - operations.update(self._other_operations) - return operations - @property def playbooks(self) -> dict[str, Playbook]: """Mapping of playbook name to Playbook instance.""" return self._playbooks + @property + def operations(self) -> Operations: + """Mapping of operation name to Operation instance.""" + return self._operations + @property def default_vars_dirs(self) -> dict[str, Path]: """Mapping of collection name to their default vars directory.""" @@ -103,126 +95,65 @@ def entities(self) -> dict[str, set[ServiceComponentName]]: """Mapping of service names to their set of components.""" return self._services_components - def _init_playbooks(self) -> None: - """Initialize the playbooks from the collections. - - If a playbook is defined in multiple collections, the last one will take - precedence over the previous ones. - """ - logger.debug("Initializing playbooks") - self._playbooks: dict[str, Playbook] = {} + def _read_playbooks(self) -> dict[str, Playbook]: + playbooks: dict[str, Playbook] = {} for collection in self._collection_readers: for playbook in collection.read_playbooks(): - if playbook.path.stem in self._playbooks: + if playbook.name in playbooks: logger.debug( f"'{playbook.name}' defined in " - f"'{self._playbooks[playbook.name].collection_name}' " + f"'{playbooks[playbook.name].collection_name}' " f"is overridden by '{collection.name}'" ) else: logger.debug(f"Adding playbook '{playbook.path}'") - self._playbooks[playbook.name] = playbook - logger.debug("Playbooks initialized") - - def _init_operations(self) -> tuple[Operations, Operations]: - dag_operations = Operations() - other_operations = Operations() + playbooks[playbook.name] = playbook + return playbooks + def _generate_operations(self) -> Operations: + # Create DagOperationBuilders to merge dag nodes with the same name + dag_operation_builders: dict[str, DagOperationBuilder] = {} for collection in self._collection_readers: - # Load DAG operations from the dag files for dag_node in collection.read_dag_nodes(): - existing_operation = dag_operations.get(dag_node.name.name) - - # The read_operation is associated with a playbook defined in the - # current collection - if playbook := self.playbooks.get(dag_node.name.name): - # TODO: would be nice to dissociate the Operation class from the playbook and store the playbook in the Operation - dag_operation_to_register = Operation( - name=dag_node.name.name, - collection_name=collection.name, - host_names=playbook.hosts, # TODO: do not store the hosts in the Operation object - depends_on=list(dag_node.depends_on), - ) - # If the operation is already registered, merge its dependencies - if existing_operation: - dag_operation_to_register.depends_on.extend( - dag_operations[dag_node.name.name].depends_on + if dag_node.name in dag_operation_builders: + dag_operation_builders[dag_node.name].extends(dag_node) + else: + dag_operation_builders[dag_node.name] = ( + DagOperationBuilder.from_read_dag_node( + dag_node=dag_node, + playbook=self._playbooks.get(dag_node.name), ) - # Register the operation - dag_operations[dag_node.name.name] = dag_operation_to_register - continue - - # The read_operation is already registered - if existing_operation: - logger.debug( - f"'{dag_node.name}' defined in " - f"'{existing_operation.collection_name}' " - f"is extended by '{collection.name}'" - ) - existing_operation.depends_on.extend(dag_node.depends_on) - continue - - # From this point, the read_operation is a noop as it is not defined - # in the current nor the previous collections - - # Create and register the operation - dag_operations[dag_node.name.name] = Operation( - name=dag_node.name.name, - collection_name=collection.name, - depends_on=list(dag_node.depends_on), - noop=True, - host_names=None, - ) - # 'restart' and 'stop' operations are not defined in the DAG file - # for noop, they need to be generated from the start operations - if dag_node.name.name.endswith("_start"): - logger.debug( - f"'{dag_node.name}' is noop, creating the associated " - "restart and stop operations" - ) - # Create and register the restart operation - restart_operation_name = dag_node.name.name.replace( - "_start", "_restart" - ) - other_operations[restart_operation_name] = Operation( - name=restart_operation_name, - collection_name="replace_restart_noop", - depends_on=list(dag_node.depends_on), - noop=True, - host_names=None, ) - # Create and register the stop operation - stop_operation_name = dag_node.name.name.replace("_start", "_stop") - other_operations[stop_operation_name] = Operation( - name=stop_operation_name, - collection_name="replace_stop_noop", - depends_on=list(dag_node.depends_on), - noop=True, - host_names=None, + # Generate the operations + operations = Operations() + for dag_operation_builder in dag_operation_builders.values(): + # 1. Build the DAG operation from the defined dag nodes + operation = dag_operation_builder.build() + operations.add(operation) + # 2. Forge restart and stop operations from start operations + if operation.name.action == "start": + restart_operation_name = operation.name.clone("restart") + operations.add( + ForgedDagOperation.create( + operation_name=restart_operation_name, + source_operation=operation, + playbook=self._playbooks.get(str(restart_operation_name)), ) - - # We can't merge the two for loops to handle the case where a playbook operation - # is defined in a first collection but not used in the DAG and then used in - # the DAG in a second collection. - for collection in self._collection_readers: - # Load playbook operations to complete the operations list with the - # operations that are not defined in the DAG files - for operation_name, playbook in self.playbooks.items(): - if operation_name in dag_operations: - continue - if operation_name in other_operations: - logger.debug( - f"'{operation_name}' defined in " - f"'{other_operations[operation_name].collection_name}' " - f"is overridden by '{collection.name}'" + ) + stop_operation_name = operation.name.clone("stop") + operations.add( + ForgedDagOperation.create( + operation_name=stop_operation_name, + source_operation=operation, + playbook=self._playbooks.get(str(stop_operation_name)), ) - other_operations[operation_name] = Operation( - name=operation_name, - host_names=playbook.hosts, # TODO: do not store the hosts in the Operation object - collection_name=collection.name, ) - - return dag_operations, other_operations + # 3. Parse the remaining playbooks (that are not part of the DAG) as operations + for playbook in self._playbooks.values(): + operation_name = OperationName.from_str(playbook.name) + if operation_name not in operations: + operations.add(OtherPlaybookOperation(operation_name, playbook)) + return operations def _init_default_vars_dirs(self) -> dict[str, Path]: """Initialize the default vars directories from the collections.""" diff --git a/tdp/core/dag.py b/tdp/core/dag.py index 53d30c40..0d287b78 100644 --- a/tdp/core/dag.py +++ b/tdp/core/dag.py @@ -20,7 +20,14 @@ import networkx as nx from tdp.core.constants import DEFAULT_SERVICE_PRIORITY, SERVICE_PRIORITY -from tdp.core.entities.operation import Operations +from tdp.core.entities.entity_name import ServiceName +from tdp.core.entities.operation import ( + DagOperation, + ForgedDagOperation, + OperationName, + OperationNoop, + PlaybookOperation, +) if TYPE_CHECKING: from tdp.core.collections import Collections @@ -46,12 +53,15 @@ def __init__(self, collections: Collections): collections: Collections instance. """ self._collections = collections - self._operations = self._collections.dag_operations + self._operations = { + operation.name: operation + for operation in collections.operations.get_by_class(DagOperation) + } validate_dag_nodes(self._operations, self._collections) self._graph = self._generate_graph(self.operations) @property - def operations(self) -> Operations: + def operations(self) -> dict[OperationName, DagOperation]: """DAG operations dictionary.""" return self._operations @@ -120,7 +130,7 @@ def topological_sort_key( # Define a priority function for nodes based on service priority. def priority_key(node: str) -> str: - operation = self.operations[node] + operation = self.operations[OperationName.from_str(node)] operation_priority = SERVICE_PRIORITY.get( operation.name.service, DEFAULT_SERVICE_PRIORITY ) @@ -240,16 +250,18 @@ def get_operation_descendants( ) # TODO: can take a list of operations instead of a dict - def _generate_graph(self, nodes: Operations) -> nx.DiGraph: + def _generate_graph(self, nodes: dict[OperationName, DagOperation]) -> nx.DiGraph: DG = nx.DiGraph() for operation_name, operation in nodes.items(): - DG.add_node(operation_name) + if isinstance(operation, ForgedDagOperation): + continue + DG.add_node(str(operation_name)) for dependency in operation.depends_on: if dependency not in nodes: raise ValueError( f'Dependency "{dependency}" does not exist for operation "{operation_name}"' ) - DG.add_edge(dependency, operation_name) + DG.add_edge(str(dependency), str(operation_name)) if nx.is_directed_acyclic_graph(DG): return DG @@ -257,9 +269,10 @@ def _generate_graph(self, nodes: Operations) -> nx.DiGraph: raise ValueError("Not a DAG") -# TODO: call this method inside of Collections._init_operations instead of the Dag constructor # TODO: remove Collections dependency -def validate_dag_nodes(nodes: Operations, collections: Collections) -> None: +def validate_dag_nodes( + nodes: dict[OperationName, DagOperation], collections: Collections +) -> None: r"""Validation rules : - \*_start operations can only be required from within its own service - \*_install operations should only depend on other \*_install operations @@ -269,18 +282,27 @@ def validate_dag_nodes(nodes: Operations, collections: Collections) -> None: """ # key: service_name # value: set of available actions for the service + # e.g. {'HDFS': {'install', 'config', 'init', 'start'}} services_actions = {} - def warning(collection_name: str, message: str) -> None: - logger.warning(message + f", collection: {collection_name}") + def warning(operation: DagOperation, message: str) -> None: + if isinstance(operation, PlaybookOperation): + collection_name = operation.playbook.collection_name + logger.warning(message + f", collection: {collection_name}") + else: + logger.warning(message) for operation_name, operation in nodes.items(): - c_warning = functools.partial(warning, operation.collection_name) + # No test are performed on forged operations + if isinstance(operation, ForgedDagOperation): + continue + + c_warning = functools.partial(warning, operation) for dependency in operation.depends_on: # *_start operations can only be required from within its own service dependency_service = nodes[dependency].name.service if ( - dependency.endswith("_start") + dependency.action == "start" and dependency_service != operation.name.service ): c_warning( @@ -290,8 +312,9 @@ def warning(collection_name: str, message: str) -> None: ) # *_install operations should only depend on other *_install operations - if operation_name.endswith("_install") and not dependency.endswith( - "_install" + if ( + operation.name.action == "install" + and not dependency.action == "install" ): c_warning( f"Operation '{operation_name}' is an install action, depends on '{dependency}' which is " @@ -302,7 +325,7 @@ def warning(collection_name: str, message: str) -> None: # even if they are "empty" (tagged with noop) # Part 1 service_actions = services_actions.setdefault(operation.name.service, set()) - if operation.is_service_operation(): + if isinstance(operation.name.entity, ServiceName): service_actions.add(operation.name.action) # Each service action (config, start, init) except the first (install) must have an explicit @@ -330,13 +353,14 @@ def warning(collection_name: str, message: str) -> None: # Operations tagged with the noop flag should not have a playbook defined in the collection - if operation_name in collections.playbooks: - if operation.noop: + #! This case can't happen because no operation inherits both PlaybookOperation and NoOp + if str(operation_name) in collections.playbooks: + if isinstance(operation, OperationNoop): c_warning( f"Operation '{operation_name}' is noop and the playbook should not exist" ) else: - if not operation.noop: + if not isinstance(operation, OperationNoop): c_warning(f"Operation '{operation_name}' should have a playbook") # Each service (HDFS, HBase, Hive, etc) should have *_install, *_config, *_init and *_start actions diff --git a/tdp/core/dag_dot.py b/tdp/core/dag_dot.py index de79cdcd..56bb5dfa 100644 --- a/tdp/core/dag_dot.py +++ b/tdp/core/dag_dot.py @@ -54,7 +54,7 @@ def to_pydot( for dot_node in dot_nodes: # Dot node name can be quoted, remove it operation_name = dot_node.get_name().strip('"') - operation_name = OperationName.from_name(operation_name) + operation_name = OperationName.from_str(operation_name) subgraphs.setdefault( operation_name.service, pydot.Cluster( diff --git a/tdp/core/deployment/deployment_iterator.py b/tdp/core/deployment/deployment_iterator.py index bae3e17b..3aa2e89d 100644 --- a/tdp/core/deployment/deployment_iterator.py +++ b/tdp/core/deployment/deployment_iterator.py @@ -13,6 +13,7 @@ from tdp.core.constants import OPERATION_SLEEP_NAME from tdp.core.entities.entity_name import create_entity_name from tdp.core.entities.hosted_entity import create_hosted_entity +from tdp.core.entities.operation import OperationNoop, PlaybookOperation from tdp.core.models import ( DeploymentModel, NothingToReconfigureError, @@ -145,7 +146,7 @@ def _process_operation_fn( operation = self._collections.operations[operation_rec.operation] # Run the operation - if operation.noop: + if isinstance(operation, OperationNoop): # A noop operation is always successful operation_rec.state = OperationStateEnum.SUCCESS else: @@ -169,7 +170,12 @@ def _process_operation_fn( operation.name.service, operation.name.component ) - if self._cluster_status.is_sc_stale(entity_name, hosts=operation.host_names): + hosts = ( + operation.playbook.hosts + if isinstance(operation, PlaybookOperation) + else None + ) + if self._cluster_status.is_sc_stale(entity_name, hosts=hosts): # Get the first reconfigure operation if any if self._reconfigure_operations: try: @@ -195,9 +201,9 @@ def _process_operation_fn( # fmt: off hosts = ( - [None] if operation.noop # A noop operation doesn't have any host + [None] if not isinstance(operation, PlaybookOperation) # A noop operation doesn't have any host else [operation_rec.host] if operation_rec.host # Only one operation is launched on a single host - else operation.host_names # Host is not specified, hence the operation is launched on all host + else operation.playbook.hosts # Host is not specified, hence the operation is launched on all host ) # fmt: on diff --git a/tdp/core/deployment/deployment_runner.py b/tdp/core/deployment/deployment_runner.py index 53ed0e10..11a2255f 100644 --- a/tdp/core/deployment/deployment_runner.py +++ b/tdp/core/deployment/deployment_runner.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING from tdp.core.deployment.deployment_iterator import DeploymentIterator +from tdp.core.entities.operation import PlaybookOperation from tdp.core.models.enums import DeploymentStateEnum, OperationStateEnum from tdp.core.variables import ClusterVariables @@ -54,7 +55,10 @@ def _run_operation(self, operation_rec: OperationModel) -> None: operation = self._collections.operations[operation_rec.operation] # Check if the operation is available for the given host - if operation_rec.host and operation_rec.host not in operation.host_names: + if operation_rec.host and ( + not isinstance(operation, PlaybookOperation) + or operation_rec.host not in operation.playbook.hosts + ): logs = ( f"Operation '{operation_rec.operation}' not available for host " + f"'{operation_rec.host}'" diff --git a/tdp/core/entities/operation.py b/tdp/core/entities/operation.py index cf7c9be2..8307a160 100644 --- a/tdp/core/entities/operation.py +++ b/tdp/core/entities/operation.py @@ -4,10 +4,10 @@ from __future__ import annotations from abc import ABC -from collections.abc import MutableMapping +from collections.abc import Generator, Iterable, Iterator, MutableMapping from dataclasses import dataclass from pathlib import Path -from typing import Any, Optional, Union +from typing import TYPE_CHECKING, Optional, TypeVar, Union, overload from tdp.core.constants import ( ACTION_NAME_MAX_LENGTH, @@ -20,6 +20,9 @@ parse_entity_name, ) +if TYPE_CHECKING: + from tdp.core.collections.collection_reader import TDPLibDagNodeModel + @dataclass(frozen=True) class OperationName: @@ -53,91 +56,34 @@ def name(self) -> str: return f"{self.entity.name}_{self.action}" @classmethod - def from_name(cls, name: str) -> OperationName: + def from_str(cls, name: str) -> OperationName: entity_name, action_name = name.rsplit("_", 1) entity = parse_entity_name(entity_name) return cls(entity, action_name) - def __repr__(self): - return self.name - - def __str__(self): - return self.name - - -class Operation: - """A task that can be executed by Ansible. - - The name of the operation is composed of the service name, the component name and - the action name (__). The component name is optional. - - Args: - action: Name of the action. - name: Name of the operation. - collection_name: Name of the collection where the operation is defined. - component: Name of the component. - depends_on: List of operations that must be executed before this one. - noop: If True, the operation will not be executed. - service: Name of the service. - host_names: Set of host names where the operation can be launched. - """ - - def __init__( - self, - name: str, - collection_name: Optional[str] = None, - depends_on: Optional[list[str]] = None, - noop: bool = False, - host_names: Optional[set[str]] = None, - ): - """Create a new Operation. + def clone(self, action: str) -> OperationName: + """Clone the operation name with a new action. Args: - name: Name of the operation. - collection_name: Name of the collection where the operation is defined. - depends_on: List of operations that must be executed before this one. - noop: If True, the operation will not be executed. - host_names: Set of host names where the operation can be launched. - """ - self.name = OperationName.from_name(name) - self.collection_name = collection_name - self.depends_on = depends_on or [] - self.noop = noop - self.host_names = host_names or set() - - for host_name in self.host_names: - if len(host_name) > HOST_NAME_MAX_LENGTH: - raise ValueError( - f"host {host_name} is longer than {HOST_NAME_MAX_LENGTH}" - ) + action: The new action name. - def is_service_operation(self) -> bool: - """Return True if the operation is about a service, False otherwise.""" - return isinstance(self.name.entity, ServiceName) + Returns: + A new OperationName object with the same entity and the new action. + """ + return OperationName(entity=self.entity, action=action) def __repr__(self): - return ( - f"Operation(name={self.name}, " - f"collection_name={self.collection_name}, " - f"depends_on={self.depends_on}, " - f"noop={self.noop}, " - f"host_names={self.host_names})" - ) - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, Operation): - return NotImplemented - return repr(self) == repr(other) + return self.name - def __hash__(self) -> int: - return hash(repr(self)) + def __str__(self): + return self.name @dataclass(frozen=True) class Playbook: path: Path collection_name: str - hosts: set[str] # TODO: would be better to use a frozenset + hosts: frozenset[str] def __post_init__(self): for host_name in self.hosts: @@ -153,78 +99,313 @@ def name(self) -> str: @dataclass(frozen=True) -class BaseOperation(ABC): +class Operation(ABC): + """An operation. + + Not meant to be instantiated directly. + + Args: + name: Name of the operation. + """ name: OperationName - @classmethod - def from_name( - cls, - name: str, - ) -> BaseOperation: - return cls(OperationName.from_name(name)) + def __post_init__(self): + if type(self) is Operation: + raise TypeError("Operation class cannot be instantiated directly.") + + +@dataclass(frozen=True) +class PlaybookOperation(Operation): + """An operation that is linked to a playbook. + + Not meant to be instantiated directly. + + Args: + name: Name of the operation. + playbook: The playbook that defines the operation. + """ + + playbook: Playbook + + def __post_init__(self): + if type(self) is PlaybookOperation: + raise TypeError("PlaybookOperation class cannot be instantiated directly.") + + +@dataclass(frozen=True) +class OperationNoop(Operation, ABC): + """An operation that does nothing. + + Args: + name: Name of the operation. + """ + + def __post_init__(self): + if type(self) is OperationNoop: + raise TypeError("OperationNoop class cannot be instantiated directly.") + + def __init_subclass__(cls) -> None: + super().__init_subclass__() + if issubclass(cls, PlaybookOperation): + raise TypeError( + f"{cls.__name__} cannot inherit both OperationNoop and PlaybookOperation." + ) + + +@dataclass(frozen=True) +class DagOperation(Operation, ABC): + """An operation that is part of the DAG. + + Not meant to be instantiated directly. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + """ + + depends_on: frozenset[OperationName] + + def __post_init__(self): + if type(self) is DagOperation: + raise TypeError("DagOperation class cannot be instantiated directly.") + + +@dataclass(frozen=True) +class DagOperationNoop(DagOperation, OperationNoop): + """An operation that is part of the DAG and does nothing. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + """ + + pass @dataclass(frozen=True) -class DagOperation(BaseOperation): - """A DAG node. +class DagOperationWithPlaybook(DagOperation, PlaybookOperation): + """An operation that is part of the DAG associated with a playbook. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + playbook: The playbook that defines the operation. + """ + + pass + + +@dataclass +class DagOperationBuilder: + """A builder for a DAG Operation. + + Meant to be short-lived. Allows to aggregate multiple ReadDagNode. Args: name: Name of the operation. depends_on: List of operations that must be executed before this one. - definitions: Set of paths to the playbooks that define the node. + playbook: The playbook that defines the operation. """ - depends_on: frozenset[str] + name: str + depends_on: set[OperationName] + playbook: Optional[Playbook] = None @classmethod - def from_name( - cls, - name: str, - depends_on: Optional[frozenset[str]] = None, - ) -> DagOperation: + def from_read_dag_node( + cls, dag_node: TDPLibDagNodeModel, playbook: Optional[Playbook] = None + ) -> DagOperationBuilder: return cls( - name=OperationName.from_name(name), - depends_on=depends_on or frozenset(), + name=dag_node.name, + depends_on=set( + OperationName.from_str(dependency) for dependency in dag_node.depends_on + ), + playbook=playbook, + ) + + def extends(self, dag_node: TDPLibDagNodeModel) -> None: + self.depends_on.update( + OperationName.from_str(dependency) for dependency in dag_node.depends_on + ) + + def build(self) -> Union[DagOperationNoop, DagOperationWithPlaybook]: + if self.playbook: + return DagOperationWithPlaybook( + name=OperationName.from_str(self.name), + depends_on=frozenset(self.depends_on), + playbook=self.playbook, + ) + return DagOperationNoop( + name=OperationName.from_str(self.name), + depends_on=frozenset(self.depends_on), ) -class Operations(MutableMapping[str, Operation]): +@dataclass(frozen=True) +class ForgedDagOperation(DagOperation, ABC): + """An operation that is part of the DAG. + + Not meant to be instantiated directly. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + forged_from: The operation that was forged. + """ + + forged_from: DagOperation + + def __post_init__(self): + if type(self) is ForgedDagOperation: + raise TypeError("ForgedDagOperation class cannot be instantiated directly.") + + @staticmethod + def create( + operation_name: OperationName, + source_operation: DagOperation, + playbook: Optional[Playbook] = None, + ) -> Union[ForgedDagOperationNoop, ForgedDagOperationWithPlaybook]: + """Forge a DAG operation.""" + if not isinstance(source_operation, DagOperation): + raise ValueError(f"Operation {source_operation} cannot be forged.") + if playbook: + return ForgedDagOperationWithPlaybook( + name=operation_name, + playbook=playbook, + depends_on=source_operation.depends_on, + forged_from=source_operation, + ) + return ForgedDagOperationNoop( + name=operation_name, + depends_on=source_operation.depends_on, + forged_from=source_operation, + ) + + +@dataclass(frozen=True) +class ForgedDagOperationNoop(DagOperationNoop, ForgedDagOperation): + """An operation that is part of the DAG and does nothing. + + Created from a start operation. + + Args: + name: Name of the operation. + depends_on: List of operations that must be executed before this one. + forged_from: The operation that was forged. + """ + + pass + + +@dataclass(frozen=True) +class ForgedDagOperationWithPlaybook(DagOperationWithPlaybook, ForgedDagOperation): + """An operation that is part of the DAG associated with a playbook. + + Created from a start operation. + + Args: + name: Name of the operation. + playbook: The playbook that defines the operation. + depends_on: List of operations that must be executed before this one. + forged_from: The operation that was forged. + """ + + pass + + +@dataclass(frozen=True) +class OtherPlaybookOperation(PlaybookOperation): + """An operation that is not part of the DAG. + + Args: + name: Name of the operation. + playbook: The playbook that defines the operation. + """ + + pass + - def __init__(self): - self._operations: dict[str, Operation] = {} +T = TypeVar("T", bound=Operation) - def __getitem__(self, key: str): + +class Operations(MutableMapping[Union[OperationName, str], Operation]): + + def __init__(self) -> None: + self._inner = {} + + def __getitem__(self, key: Union[OperationName, str]): + if isinstance(key, str): + key = OperationName.from_str(key) try: - return self._operations[key] + return self._inner[key] except KeyError: raise KeyError(f"Operation '{key}' not found") - def __setitem__(self, key: str, value: Operation): - if key != value.name.name: + def __setitem__(self, key: Union[OperationName, str], value: Operation): + if isinstance(key, str): + key = OperationName.from_str(key) + if key != value.name: raise ValueError( f"Operation name '{value.name}' does not match key '{key}'" ) - self._operations[key] = value + self._inner[key] = value - def __delitem__(self, key): - del self._operations[key] + def __delitem__(self, key: Union[OperationName, str]): + if isinstance(key, str): + key = OperationName.from_str(key) + del self._inner[key] - def __iter__(self): - return iter(self._operations) + def __iter__(self) -> Iterator[OperationName]: + return iter(self._inner) - def __len__(self): - return len(self._operations) + def __len__(self) -> int: + return len(self._inner) - def __repr__(self): - return f"{self.__class__.__name__}({self._operations})" + def add(self, operation: Operation) -> None: + """Add an operation.""" + self[operation.name] = operation - def __str__(self): - return f"{self.__class__.__name__}({self._operations})" - - def get(self, key: str, default=None, *, restart: bool = False, stop: bool = False): - if restart and key.endswith("_start"): - key = key.replace("_start", "_restart") - elif stop and key.endswith("_start"): - key = key.replace("_start", "_stop") - return self._operations.get(key, default) + @overload + def get_by_class( + self, include: Optional[None] = None, exclude: Optional[None] = None + ) -> Generator[Operation, None, None]: ... + + @overload + def get_by_class( + self, + include: Optional[Union[type[T], Iterable[type[T]]]] = None, + exclude: Optional[None] = None, + ) -> Generator[T, None, None]: ... + + @overload + def get_by_class( + self, + include: Optional[None] = None, + exclude: Optional[Union[type[Operation], Iterable[type[Operation]]]] = None, + ) -> Generator[Operation, None, None]: ... + + @overload + def get_by_class( + self, + include: Optional[Union[type[T], Iterable[type[T]]]] = None, + exclude: Optional[Union[type[T], Iterable[type[T]]]] = None, + ) -> Generator[T, None, None]: ... + + def get_by_class( + self, + include: Optional[Union[type[T], Iterable[type[T]]]] = None, + exclude: Optional[Union[type[Operation], Iterable[type[Operation]]]] = None, + ) -> Generator[Operation, None, None]: + # Normalize include and exclude into sets + include_set = {include} if isinstance(include, type) else set(include or []) + exclude_set = {exclude} if isinstance(exclude, type) else set(exclude or []) + + for operation in self.values(): + if include_set and not any( + isinstance(operation, inc) for inc in include_set + ): + continue + if any(isinstance(operation, exc) for exc in exclude_set): + continue + yield operation diff --git a/tdp/core/inventory_reader.py b/tdp/core/inventory_reader.py index 7e58041c..e7ccfcc0 100644 --- a/tdp/core/inventory_reader.py +++ b/tdp/core/inventory_reader.py @@ -56,7 +56,7 @@ def get_hosts(self, *args, **kwargs) -> list[str]: # so we convert them to "str" return [str(host) for host in self.inventory.get_hosts(*args, **kwargs)] - def get_hosts_from_playbook(self, fd: TextIO) -> set[str]: + def get_hosts_from_playbook(self, fd: TextIO) -> frozenset[str]: """Takes a playbook content, read all plays inside and return a set of matching host like "ansible-playbook --list-hosts playbook.yml". @@ -81,4 +81,4 @@ def get_hosts_from_playbook(self, fd: TextIO) -> set[str]: ) hosts.update(self.get_hosts(play["hosts"])) - return hosts + return frozenset(hosts) diff --git a/tdp/core/models/deployment_model.py b/tdp/core/models/deployment_model.py index b037e3dd..2bc61f14 100644 --- a/tdp/core/models/deployment_model.py +++ b/tdp/core/models/deployment_model.py @@ -14,6 +14,7 @@ from tdp.core.constants import OPERATION_SLEEP_NAME, OPERATION_SLEEP_VARIABLE from tdp.core.dag import Dag +from tdp.core.entities.operation import PlaybookOperation from tdp.core.filters import FilterFactory from tdp.core.models.base_model import BaseModel from tdp.core.models.enums import ( @@ -60,9 +61,12 @@ class MissingHostForOperationError(Exception): def __init__(self, operation: Operation, host_name: str): self.operation = operation self.host_name = host_name + available_hosts = [] + if isinstance(operation, PlaybookOperation): + available_hosts = operation.playbook.hosts super().__init__( f"Host {host_name} not found for operation {operation.name}." - f"Available hosts are {operation.host_names}." + f"Available hosts are {available_hosts}." ) @@ -175,8 +179,9 @@ def from_dag( for operation in operations: can_perform_rolling_restart = ( rolling_interval is not None + and isinstance(operation, PlaybookOperation) and operation.name.action == "restart" - and operation.host_names + and len(operation.playbook.hosts) > 0 ) deployment.operations.append( OperationModel( @@ -225,7 +230,9 @@ def from_operations( operations = [collections.operations[o] for o in operation_names] for host in host_names or []: for operation in operations: - if host not in operation.host_names: + if not isinstance(operation, PlaybookOperation) or ( + host not in operation.playbook.hosts + ): raise MissingHostForOperationError(operation, host) deployment = DeploymentModel( deployment_type=DeploymentTypeEnum.OPERATIONS, @@ -245,13 +252,14 @@ def from_operations( for operation in operations: can_perform_rolling_restart = ( rolling_interval is not None + and isinstance(operation, PlaybookOperation) and operation.name.action == "restart" - and operation.host_names + and len(operation.playbook.hosts) > 0 ) for host_name in host_names or ( # if restart operation with rolling and no host is specified, # run on all hosts - operation.host_names + operation.playbook.hosts # type: ignore : operation is a PlaybookOperation if can_perform_rolling_restart else [None] ): @@ -304,13 +312,12 @@ def from_operations_hosts_vars( operation_host_vars_names, start=1 ): operation_name, host_name, var_names = operation_host_vars - if ( - host_name - and host_name not in collections.operations[operation_name].host_names + operation = collections.operations[operation_name] + if host_name and ( + not isinstance(operation, PlaybookOperation) + or host_name not in operation.playbook.hosts ): - raise MissingHostForOperationError( - collections.operations[operation_name], host_name - ) + raise MissingHostForOperationError(operation, host_name) else: if operation_name not in collections.operations: raise MissingOperationError(operation_name) diff --git a/test_dag_order/conftest.py b/test_dag_order/conftest.py index 344bc49e..320f9f6c 100644 --- a/test_dag_order/conftest.py +++ b/test_dag_order/conftest.py @@ -210,5 +210,5 @@ def stale_sc(plan_reconfigure: DeploymentModel) -> set[str]: """Set of stale service_components""" sc: set[str] = set() for operation in plan_reconfigure.operations: - sc.add(OperationName.from_name(operation.operation).entity.name) + sc.add(OperationName.from_str(operation.operation).entity.name) return sc diff --git a/tests/unit/core/test_collection_reader.py b/tests/unit/core/test_collection_reader.py index 4d2c0633..dc5087d0 100644 --- a/tests/unit/core/test_collection_reader.py +++ b/tests/unit/core/test_collection_reader.py @@ -18,7 +18,6 @@ DEFAULT_VARS_DIRECTORY_NAME, PLAYBOOKS_DIRECTORY_NAME, ) -from tdp.core.entities.operation import DagOperation from tests.conftest import generate_collection_at_path from tests.unit.core.models.test_deployment_log import ( MockInventoryReader, @@ -148,14 +147,10 @@ def test_collection_reader_read_dag_nodes(mock_empty_collection_reader: Path): ) dag_nodes = list(collection_reader.read_dag_nodes()) assert len(dag_nodes) == 2 - assert ( - DagOperation.from_name(name="s1_c1_a", depends_on=frozenset({"sx_cx_a"})) - in dag_nodes - ) - assert ( - DagOperation.from_name(name="s2_c2_a", depends_on=frozenset({"s1_c1_a"})) - in dag_nodes - ) + assert dag_nodes[0].name == "s1_c1_a" + assert dag_nodes[0].depends_on == frozenset({"sx_cx_a"}) + assert dag_nodes[1].name == "s2_c2_a" + assert dag_nodes[1].depends_on == frozenset({"s1_c1_a"}) def test_collection_reader_read_dag_nodes_empty_file( diff --git a/tests/unit/core/test_collections.py b/tests/unit/core/test_collections.py index e3234375..08ec1cb0 100644 --- a/tests/unit/core/test_collections.py +++ b/tests/unit/core/test_collections.py @@ -4,6 +4,7 @@ import pytest from tdp.core.collections import Collections +from tdp.core.entities.operation import DagOperation, OperationName from tests.conftest import generate_collection_at_path @@ -48,17 +49,28 @@ def test_collections_from_collection_list(tmp_path_factory: pytest.TempPathFacto [collection_path_1, collection_path_2] ) - assert collections.dag_operations is not None - assert "service1_install" in collections.dag_operations - assert "service1_config" in collections.dag_operations - assert "service2_install" in collections.dag_operations - assert "service2_config" in collections.dag_operations + dag_operations = { + str(op.name): op for op in collections.operations.get_by_class(DagOperation) + } + + assert len(dag_operations) != 0 + assert "service1_install" in dag_operations + assert "service1_config" in dag_operations + assert "service2_install" in dag_operations + assert "service2_config" in dag_operations - assert [] == collections.dag_operations["service1_install"].depends_on - assert sorted(["service1_install", "service2_install"]) == sorted( - collections.dag_operations["service1_config"].depends_on + assert len(dag_operations["service1_install"].depends_on) == 0 + assert ( + frozenset( + [ + OperationName.from_str("service1_install"), + OperationName.from_str("service2_install"), + ] + ) + == dag_operations["service1_config"].depends_on + ) + assert len(dag_operations["service2_install"].depends_on) == 0 + assert ( + frozenset([OperationName.from_str("service2_install")]) + == dag_operations["service2_config"].depends_on ) - assert [] == collections.dag_operations["service2_install"].depends_on - assert ["service2_install"] == collections.dag_operations[ - "service2_config" - ].depends_on diff --git a/tests/unit/core/test_filters.py b/tests/unit/core/test_filters.py index ac74b2e2..004248d2 100644 --- a/tests/unit/core/test_filters.py +++ b/tests/unit/core/test_filters.py @@ -1,20 +1,27 @@ # Copyright 2022 TOSIT.IO # SPDX-License-Identifier: Apache-2.0 +from dataclasses import dataclass + import pytest -from tdp.core.entities.operation import Operation +from tdp.core.entities.operation import Operation, OperationName from tdp.core.filters import FilterFactory, GlobFilterStrategy, RegexFilterStrategy from tdp.core.models.enums import FilterTypeEnum +@dataclass(frozen=True) +class MockOperation(Operation): + pass + + @pytest.fixture def operations(): return [ - Operation("service1_component1_config"), - Operation("service1_component1_start"), - Operation("service1_component2_config"), - Operation("service2_component1_start"), + MockOperation(OperationName.from_str("service1_component1_config")), + MockOperation(OperationName.from_str("service1_component1_start")), + MockOperation(OperationName.from_str("service1_component2_config")), + MockOperation(OperationName.from_str("service2_component1_start")), ]