diff --git a/tdp/cli/commands/plan/reconfigure.py b/tdp/cli/commands/plan/reconfigure.py index e1378eb4..1605e6ed 100644 --- a/tdp/cli/commands/plan/reconfigure.py +++ b/tdp/cli/commands/plan/reconfigure.py @@ -38,9 +38,7 @@ def reconfigure( with Dao(db_engine, commit_on_exit=True) as dao: deployment = DeploymentModel.from_stale_hosted_entities( collections=collections, - stale_hosted_entity_statuses=dao.get_hosted_entity_statuses( - filter_stale=True - ), + hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True), rolling_interval=rolling_interval, ) if preview: diff --git a/tdp/core/dag.py b/tdp/core/dag.py index 84584a00..2398cee6 100644 --- a/tdp/core/dag.py +++ b/tdp/core/dag.py @@ -60,7 +60,7 @@ def graph(self) -> nx.DiGraph: """DAG graph.""" return self._graph - def node_to_operation( + def _node_to_operation( self, node: str, restart: bool = False, stop: bool = False ) -> Operation: # ? Restart operations are now stored in collections.operations they can be @@ -152,7 +152,7 @@ def topological_sort( """ return list( map( - lambda node: self.node_to_operation(node, restart=restart, stop=stop), + lambda node: self._node_to_operation(node, restart=restart, stop=stop), self.topological_sort_key(nodes), ) ) @@ -234,7 +234,7 @@ def get_operation_descendants( nodes_filtered = filter(lambda node: node not in nodes, nodes_set) return list( map( - lambda node: self.node_to_operation(node, restart=restart, stop=stop), + lambda node: self._node_to_operation(node, restart=restart, stop=stop), nodes_filtered, ) ) diff --git a/tdp/core/deployment/deployment_iterator.py b/tdp/core/deployment/deployment_iterator.py index 241bd018..6d415296 100644 --- a/tdp/core/deployment/deployment_iterator.py +++ b/tdp/core/deployment/deployment_iterator.py @@ -103,9 +103,7 @@ def __init__( self._reconfigure_operations = _group_hosts_by_operation( DeploymentModel.from_stale_hosted_entities( collections=self._collections, - stale_hosted_entity_statuses=[ - status for status in cluster_status.values() if status.is_stale - ], + hosted_entity_statuses=list(cluster_status.values()), ) ) except NothingToReconfigureError: diff --git a/tdp/core/models/deployment_model.py b/tdp/core/models/deployment_model.py index c64fbb1f..af752675 100644 --- a/tdp/core/models/deployment_model.py +++ b/tdp/core/models/deployment_model.py @@ -329,31 +329,46 @@ def from_operations_hosts_vars( @staticmethod def from_stale_hosted_entities( collections: Collections, - stale_hosted_entity_statuses: list[HostedEntityStatus], + hosted_entity_statuses: list[HostedEntityStatus], rolling_interval: Optional[int] = None, ) -> DeploymentModel: """Generate a deployment plan for stale components. Args: collections: Collections to retrieve the operations from. - stale_hosted_entity_statuses: List of stale hosted entity statuses. + hosted_entity_statuses: List of hosted entity statuses. rolling_interval: Number of seconds to wait between component restart. Raises: NothingToReconfigureError: If no component needs to be reconfigured. """ - operation_hosts = _get_reconfigure_operation_hosts(stale_hosted_entity_statuses) - - # Sort operations using DAG topological sort. Convert operation name to - # Operation instance and replace "start" action by "restart". - dag = Dag(collections) - reconfigure_operations_sorted = list( - map( - lambda x: (dag.node_to_operation(x[0], restart=True), x[1]), - dag.topological_sort_key(operation_hosts, key=lambda x: x[0]), - ) + operation_hosts: set[OperationHostTuple] = set() + for status in hosted_entity_statuses: + if status.to_config: + operation_hosts.add( + OperationHostTuple( + f"{status.entity.name}_config", + status.entity.host, + ) + ) + if status.to_restart: + operation_hosts.add( + OperationHostTuple( + f"{status.entity.name}_restart", + status.entity.host, + ) + ) + if len(operation_hosts) == 0: + raise NothingToReconfigureError("No component needs to be reconfigured.") + # Sort by hosts to improve readability + operation_hosts_sorted = sorted( + operation_hosts, key=lambda x: f"{x.operation_name}_{x.host_name}" + ) + # Sort operations using DAG topological sort. + operation_hosts_sorted = Dag(collections).topological_sort_key( + operation_hosts_sorted, + key=lambda x: x.operation_name.replace("_restart", "_start"), ) - # Generate deployment deployment = DeploymentModel( deployment_type=DeploymentTypeEnum.RECONFIGURE, @@ -367,10 +382,10 @@ def from_stale_hosted_entities( state=DeploymentStateEnum.PLANNED, ) operation_order = 1 - for operation, host in reconfigure_operations_sorted: + for operation, host in operation_hosts_sorted: deployment.operations.append( OperationModel( - operation=operation.name, + operation=operation, operation_order=operation_order, host=host, extra_vars=None, @@ -378,7 +393,10 @@ def from_stale_hosted_entities( ) ) # Add sleep operation after each "restart" - if rolling_interval is not None and operation.action_name == "restart": + if ( + rolling_interval is not None + and Operation(operation).action_name == "restart" + ): operation_order += 1 deployment.operations.append( OperationModel( @@ -389,7 +407,6 @@ def from_stale_hosted_entities( state=OperationStateEnum.PLANNED, ) ) - operation_order += 1 return deployment @@ -471,40 +488,7 @@ def _filter_falsy_options(options: dict) -> dict: class OperationHostTuple(NamedTuple): + """Association of an operation string and its optional host.""" + operation_name: str host_name: Optional[str] - - -def _get_reconfigure_operation_hosts( - stale_hosted_entity_statuses: list[HostedEntityStatus], -) -> list[OperationHostTuple]: - """Get the list of reconfigure operations from a list of hosted entities statuses. - - Args: - stale_hosted_entity_statuses: List of stale hosted entities statuses. - - Returns: List of tuple (operation, host) ordered _. - """ - operation_hosts: set[OperationHostTuple] = set() - for status in stale_hosted_entity_statuses: - if status.to_config: - operation_hosts.add( - OperationHostTuple( - f"{status.entity.name}_config", - status.entity.host, - ) - ) - if status.to_restart: - operation_hosts.add( - OperationHostTuple( - f"{status.entity.name}_start", - status.entity.host, - ) - ) - if len(operation_hosts) == 0: - raise NothingToReconfigureError("No component needs to be reconfigured.") - # Sort by hosts to improve readability - return sorted( - operation_hosts, - key=lambda x: f"{x[0]}_{x[1]}", # order by _ - ) diff --git a/test_dag_order/conftest.py b/test_dag_order/conftest.py index 5bcb9313..3163e3e5 100644 --- a/test_dag_order/conftest.py +++ b/test_dag_order/conftest.py @@ -200,7 +200,7 @@ def plan_reconfigure( # return the deployment plan (it is neither persisted in the database nor executed) return DeploymentModel.from_stale_hosted_entities( collections=collections, - stale_hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True), + hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True), )