Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor stale operations #630

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
4 changes: 1 addition & 3 deletions tdp/cli/commands/plan/reconfigure.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ def reconfigure(
with Dao(db_engine, commit_on_exit=True) as dao:
deployment = DeploymentModel.from_stale_hosted_entities(
collections=collections,
stale_hosted_entity_statuses=dao.get_hosted_entity_statuses(
filter_stale=True
),
hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True),
rolling_interval=rolling_interval,
)
if preview:
Expand Down
6 changes: 3 additions & 3 deletions tdp/core/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def graph(self) -> nx.DiGraph:
"""DAG graph."""
return self._graph

def node_to_operation(
def _node_to_operation(
self, node: str, restart: bool = False, stop: bool = False
) -> Operation:
# ? Restart operations are now stored in collections.operations they can be
Expand Down Expand Up @@ -152,7 +152,7 @@ def topological_sort(
"""
return list(
map(
lambda node: self.node_to_operation(node, restart=restart, stop=stop),
lambda node: self._node_to_operation(node, restart=restart, stop=stop),
self.topological_sort_key(nodes),
)
)
Expand Down Expand Up @@ -234,7 +234,7 @@ def get_operation_descendants(
nodes_filtered = filter(lambda node: node not in nodes, nodes_set)
return list(
map(
lambda node: self.node_to_operation(node, restart=restart, stop=stop),
lambda node: self._node_to_operation(node, restart=restart, stop=stop),
nodes_filtered,
)
)
Expand Down
4 changes: 1 addition & 3 deletions tdp/core/deployment/deployment_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ def __init__(
self._reconfigure_operations = _group_hosts_by_operation(
DeploymentModel.from_stale_hosted_entities(
collections=self._collections,
stale_hosted_entity_statuses=[
status for status in cluster_status.values() if status.is_stale
],
hosted_entity_statuses=list(cluster_status.values()),
)
)
except NothingToReconfigureError:
Expand Down
88 changes: 36 additions & 52 deletions tdp/core/models/deployment_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,31 +329,46 @@ def from_operations_hosts_vars(
@staticmethod
def from_stale_hosted_entities(
collections: Collections,
stale_hosted_entity_statuses: list[HostedEntityStatus],
hosted_entity_statuses: list[HostedEntityStatus],
rolling_interval: Optional[int] = None,
) -> DeploymentModel:
"""Generate a deployment plan for stale components.

Args:
collections: Collections to retrieve the operations from.
stale_hosted_entity_statuses: List of stale hosted entity statuses.
hosted_entity_statuses: List of hosted entity statuses.
rolling_interval: Number of seconds to wait between component restart.

Raises:
NothingToReconfigureError: If no component needs to be reconfigured.
"""
operation_hosts = _get_reconfigure_operation_hosts(stale_hosted_entity_statuses)

# Sort operations using DAG topological sort. Convert operation name to
# Operation instance and replace "start" action by "restart".
dag = Dag(collections)
reconfigure_operations_sorted = list(
map(
lambda x: (dag.node_to_operation(x[0], restart=True), x[1]),
dag.topological_sort_key(operation_hosts, key=lambda x: x[0]),
)
operation_hosts: set[OperationHostTuple] = set()
for status in hosted_entity_statuses:
if status.to_config:
operation_hosts.add(
OperationHostTuple(
f"{status.entity.name}_config",
status.entity.host,
)
)
if status.to_restart:
operation_hosts.add(
OperationHostTuple(
f"{status.entity.name}_restart",
status.entity.host,
)
)
if len(operation_hosts) == 0:
raise NothingToReconfigureError("No component needs to be reconfigured.")
# Sort by hosts to improve readability
operation_hosts_sorted = sorted(
operation_hosts, key=lambda x: f"{x.operation_name}_{x.host_name}"
)
# Sort operations using DAG topological sort.
operation_hosts_sorted = Dag(collections).topological_sort_key(
operation_hosts_sorted,
key=lambda x: x.operation_name.replace("_restart", "_start"),
)

# Generate deployment
deployment = DeploymentModel(
deployment_type=DeploymentTypeEnum.RECONFIGURE,
Expand All @@ -367,18 +382,21 @@ def from_stale_hosted_entities(
state=DeploymentStateEnum.PLANNED,
)
operation_order = 1
for operation, host in reconfigure_operations_sorted:
for operation, host in operation_hosts_sorted:
deployment.operations.append(
OperationModel(
operation=operation.name,
operation=operation,
operation_order=operation_order,
host=host,
extra_vars=None,
state=OperationStateEnum.PLANNED,
)
)
# Add sleep operation after each "restart"
if rolling_interval is not None and operation.action_name == "restart":
if (
rolling_interval is not None
and Operation(operation).action_name == "restart"
):
operation_order += 1
deployment.operations.append(
OperationModel(
Expand All @@ -389,7 +407,6 @@ def from_stale_hosted_entities(
state=OperationStateEnum.PLANNED,
)
)

operation_order += 1
return deployment

Expand Down Expand Up @@ -471,40 +488,7 @@ def _filter_falsy_options(options: dict) -> dict:


class OperationHostTuple(NamedTuple):
"""Association of an operation string and its optional host."""

operation_name: str
host_name: Optional[str]


def _get_reconfigure_operation_hosts(
stale_hosted_entity_statuses: list[HostedEntityStatus],
) -> list[OperationHostTuple]:
"""Get the list of reconfigure operations from a list of hosted entities statuses.

Args:
stale_hosted_entity_statuses: List of stale hosted entities statuses.

Returns: List of tuple (operation, host) ordered <operation-name>_<host>.
"""
operation_hosts: set[OperationHostTuple] = set()
for status in stale_hosted_entity_statuses:
if status.to_config:
operation_hosts.add(
OperationHostTuple(
f"{status.entity.name}_config",
status.entity.host,
)
)
if status.to_restart:
operation_hosts.add(
OperationHostTuple(
f"{status.entity.name}_start",
status.entity.host,
)
)
if len(operation_hosts) == 0:
raise NothingToReconfigureError("No component needs to be reconfigured.")
# Sort by hosts to improve readability
return sorted(
operation_hosts,
key=lambda x: f"{x[0]}_{x[1]}", # order by <operation-name>_<host-name>
)
2 changes: 1 addition & 1 deletion test_dag_order/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def plan_reconfigure(
# return the deployment plan (it is neither persisted in the database nor executed)
return DeploymentModel.from_stale_hosted_entities(
collections=collections,
stale_hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True),
hosted_entity_statuses=dao.get_hosted_entity_statuses(filter_stale=True),
)


Expand Down
Loading