Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create OperationName entity #632

Merged
merged 11 commits into from
Nov 22, 2024
Merged
12 changes: 6 additions & 6 deletions scripts/playbooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from tdp.cli.params import collections_option
from tdp.core.constants import DEFAULT_SERVICE_PRIORITY, SERVICE_PRIORITY
from tdp.core.dag import Dag
from tdp.core.operation import Operation
from tdp.core.entities.operation import OperationName


@click.command()
Expand Down Expand Up @@ -51,14 +51,14 @@ def playbooks(services, output_dir, for_collection, collections):
# For each service, get all operations with DAG topological_sort order
dag_service_operations = {}
for operation in dag.get_all_operations():
dag_services.add_node(operation.service_name)
dag_services.add_node(operation.name.service)
for dependency in operation.depends_on:
dependency_operation = Operation(dependency)
if dependency_operation.service_name != operation.service_name:
dependency_operation = OperationName.from_name(dependency)
if dependency_operation.service != operation.name.service:
dag_services.add_edge(
dependency_operation.service_name, operation.service_name
dependency_operation.service, operation.name.service
)
dag_service_operations.setdefault(operation.service_name, []).append(operation)
dag_service_operations.setdefault(operation.name.service, []).append(operation)

if not nx.is_directed_acyclic_graph(dag_services):
raise RuntimeError("dag_services is not a DAG.")
Expand Down
10 changes: 8 additions & 2 deletions tdp/cli/commands/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,18 @@ def dag(
if color_to:
nodes_to_color.update(
list(
map(lambda o: o.name, dag.get_operations_to_nodes(color_to.split(",")))
map(
lambda o: o.name.name,
dag.get_operations_to_nodes(color_to.split(",")),
)
)
)
if color_from:
nodes_from = list(
map(lambda o: o.name, dag.get_operations_from_nodes(color_from.split(",")))
map(
lambda o: o.name.name,
dag.get_operations_from_nodes(color_from.split(",")),
)
)
if nodes_to_color:
nodes_to_color = nodes_to_color.intersection(nodes_from)
Expand Down
24 changes: 18 additions & 6 deletions tdp/cli/commands/ops.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Copyright 2022 TOSIT.IO
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from collections.abc import Iterable
from typing import TYPE_CHECKING

import click
from tabulate import tabulate

from tdp.cli.params import collections_option, hosts_option
from tdp.core.collections import Collections
from tdp.core.dag import Dag
from tdp.core.operation import Operation

if TYPE_CHECKING:
from tdp.core.collections import Collections
from tdp.core.entities.operation import Operation


@click.command()
Expand Down Expand Up @@ -49,26 +54,33 @@ def ops(
]
if topo_sort:
sorted_operations = dag.topological_sort_key(
operations, key=lambda operation: operation.name
operations, key=lambda operation: operation.name.name
)
else:
sorted_operations = sorted(operations, key=lambda operation: operation.name)
sorted_operations = sorted(
operations, key=lambda operation: operation.name.name
)
_print_operations(sorted_operations)
else:
operations = [
operation
for operation in collections.operations.values()
if len(hosts) == 0 or bool(set(operation.host_names) & set(hosts))
]
sorted_operations = sorted(operations, key=lambda operation: operation.name)
sorted_operations = sorted(
operations, key=lambda operation: operation.name.name
)
_print_operations(sorted_operations)


def _print_operations(operations: Iterable[Operation], /):
"""Prints a list of operations."""
click.echo(
tabulate(
[[operation.name, operation.host_names or ""] for operation in operations],
[
[operation.name.name, operation.host_names or ""]
for operation in operations
],
headers=["Operation name", "Hosts"],
)
)
2 changes: 1 addition & 1 deletion tdp/cli/commands/vars/edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def edit(
# Check if component exists
entity_name = parse_entity_name(variables_file.stem)
if isinstance(entity_name, ServiceComponentName):
if entity_name not in collections.hostable_entities[service_name]:
if entity_name not in collections.entities[service_name]:
raise click.ClickException(
f"Error unknown component '{entity_name.component}' for service '{entity_name.service}'"
)
Expand Down
2 changes: 1 addition & 1 deletion tdp/cli/params/status/component_argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def _check_component(
collections: Collections = ctx.params["collections"]
service: str = ctx.params["service"]
if value and value not in [
sc_name.component for sc_name in collections.hostable_entities[service]
sc_name.component for sc_name in collections.entities[service]
]:
raise click.UsageError(
f"Component '{value}' does not exists in service '{service}'."
Expand Down
16 changes: 8 additions & 8 deletions tdp/core/cluster_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ def generate_stale_sch_logs(

# Add the config and start operations to the set to get their descendants
if config_operation:
source_reconfigure_operations.add(config_operation.name)
source_reconfigure_operations.add(config_operation.name.name)
if start_operation:
source_reconfigure_operations.add(start_operation.name)
source_reconfigure_operations.add(start_operation.name.name)

# Create a log to update the stale status of the entity if a config and/or
# restart operations are available
Expand Down Expand Up @@ -129,28 +129,28 @@ def generate_stale_sch_logs(
nodes=list(source_reconfigure_operations), restart=True
):
# Only create a log when config or restart operation is available
if operation.action_name not in ["config", "restart"]:
if operation.name.action not in ["config", "restart"]:
continue

# Create a log for each host where the entity is deployed
for host in operation.host_names:
log = logs.setdefault(
create_hosted_entity(
create_entity_name(
operation.service_name, operation.component_name
operation.name.service, operation.name.component
),
host,
),
SCHStatusLogModel(
service=operation.service_name,
component=operation.component_name,
service=operation.name.service,
component=operation.name.component,
host=host,
source=SCHStatusLogSourceEnum.STALE,
),
)
if operation.action_name == "config":
if operation.name.action == "config":
log.to_config = True
elif operation.action_name == "restart":
elif operation.name.action == "restart":
log.to_restart = True

return set(logs.values())
Expand Down
18 changes: 7 additions & 11 deletions tdp/core/collections/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
from typing import TYPE_CHECKING, Optional

from tdp.core.entities.entity_name import ServiceComponentName
from tdp.core.entities.operation import Operations, Playbook
from tdp.core.entities.operation import Operation, Operations, Playbook
from tdp.core.inventory_reader import InventoryReader
from tdp.core.operation import Operation
from tdp.core.variables.schema.service_schema import ServiceSchema

from .collection_reader import CollectionReader
Expand Down Expand Up @@ -52,7 +51,7 @@ def __init__(self, collections: Iterable[CollectionReader]):
self._dag_operations, self._other_operations = self._init_operations()
self._default_var_directories = self._init_default_vars_dirs()
self._schemas = self._init_schemas()
self._services_components = self._init_hostable_entities()
self._services_components = self._init_entities()

@staticmethod
def from_collection_paths(
Expand Down Expand Up @@ -102,7 +101,7 @@ def schemas(self) -> dict[str, ServiceSchema]:
# ? The mapping is using service name as a string for convenience. Should we keep
# ? this or change it to ServiceName?
@property
def hostable_entities(self) -> dict[str, set[ServiceComponentName]]:
def entities(self) -> dict[str, set[ServiceComponentName]]:
"""Mapping of services to their set of components."""
return self._services_components

Expand Down Expand Up @@ -232,13 +231,10 @@ def _init_schemas(self) -> dict[str, ServiceSchema]:
schemas.setdefault(schema.service, ServiceSchema()).add_schema(schema)
return schemas

def _init_hostable_entities(self) -> dict[str, set[ServiceComponentName]]:
def _init_entities(self) -> dict[str, set[ServiceComponentName]]:
services_components: dict[str, set[ServiceComponentName]] = {}
for operation in self.operations.values():
service = services_components.setdefault(operation.service_name, set())
if not operation.component_name:
continue
service.add(
ServiceComponentName(operation.service_name, operation.component_name)
)
service = services_components.setdefault(operation.name.service, set())
if isinstance(operation.name, ServiceComponentName):
service.add(operation.name)
return services_components
24 changes: 12 additions & 12 deletions tdp/core/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

from tdp.core.constants import DEFAULT_SERVICE_PRIORITY, SERVICE_PRIORITY
from tdp.core.entities.operation import Operations
from tdp.core.operation import Operation

if TYPE_CHECKING:
from tdp.core.collections import Collections
from tdp.core.entities.operation import Operation

T = TypeVar("T")

Expand Down Expand Up @@ -122,7 +122,7 @@ def topological_sort_key(
def priority_key(node: str) -> str:
operation = self.operations[node]
operation_priority = SERVICE_PRIORITY.get(
operation.service_name, DEFAULT_SERVICE_PRIORITY
operation.name.service, DEFAULT_SERVICE_PRIORITY
)
return f"{operation_priority:02d}_{node}"

Expand Down Expand Up @@ -278,13 +278,13 @@ def warning(collection_name: str, message: str) -> None:
c_warning = functools.partial(warning, operation.collection_name)
for dependency in operation.depends_on:
# *_start operations can only be required from within its own service
dependency_service = nodes[dependency].service_name
dependency_service = nodes[dependency].name.service
if (
dependency.endswith("_start")
and dependency_service != operation.service_name
and dependency_service != operation.name.service
):
c_warning(
f"Operation '{operation_name}' is in service '{operation.service_name}', depends on "
f"Operation '{operation_name}' is in service '{operation.name.service}', depends on "
f"'{dependency}' which is a start action in service '{dependency_service}' and should "
f"only depends on start action within its own service"
)
Expand All @@ -301,22 +301,22 @@ def warning(collection_name: str, message: str) -> None:
# Each service (HDFS, HBase, Hive, etc) should have *_install, *_config, *_init and *_start actions
# even if they are "empty" (tagged with noop)
# Part 1
service_actions = services_actions.setdefault(operation.service_name, set())
service_actions = services_actions.setdefault(operation.name.service, set())
if operation.is_service_operation():
service_actions.add(operation.action_name)
service_actions.add(operation.name.action)

# Each service action (config, start, init) except the first (install) must have an explicit
# dependency with the previous service action within the same service
actions_order = ["install", "config", "start", "init"]
# Check only if the action is in actions_order and is not the first
if (
operation.action_name in actions_order
and operation.action_name != actions_order[0]
operation.name.action in actions_order
and operation.name.action != actions_order[0]
):
previous_action = actions_order[
actions_order.index(operation.action_name) - 1
actions_order.index(operation.name.action) - 1
]
previous_service_action = f"{operation.service_name}_{previous_action}"
previous_service_action = f"{operation.name.service}_{previous_action}"
previous_service_action_found = False
# Loop over dependency and check if the service previous action is found
for dependency in operation.depends_on:
Expand All @@ -325,7 +325,7 @@ def warning(collection_name: str, message: str) -> None:
if not previous_service_action_found:
c_warning(
f"Operation '{operation_name}' is a service action and has to depend on "
f"'{operation.service_name}_{previous_action}'"
f"'{operation.name.service}_{previous_action}'"
)

# Operations tagged with the noop flag should not have a playbook defined in the collection
Expand Down
12 changes: 6 additions & 6 deletions tdp/core/dag_dot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import networkx as nx

from tdp.core.operation import Operation
from tdp.core.entities.operation import OperationName


# Needed :
Expand Down Expand Up @@ -54,15 +54,15 @@ def to_pydot(
for dot_node in dot_nodes:
# Dot node name can be quoted, remove it
operation_name = dot_node.get_name().strip('"')
operation = Operation(operation_name)
operation_name = OperationName.from_name(operation_name)
subgraphs.setdefault(
operation.service_name,
operation_name.service,
pydot.Cluster(
operation.service_name,
label=operation.service_name,
operation_name.service,
label=operation_name.service,
fontname="Roboto",
),
).add_node(pydot.Node(operation_name))
).add_node(pydot.Node(operation_name.name))

for service_name, subgraph in sorted(subgraphs.items()):
pydot_graph.add_subgraph(subgraph)
Expand Down
8 changes: 4 additions & 4 deletions tdp/core/deployment/deployment_iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ def _process_operation_fn(
# ===== Update the cluster status if success =====

# Skip sleep operation
if operation.name == OPERATION_SLEEP_NAME:
if operation.name.name == OPERATION_SLEEP_NAME:
return

sch_status_logs: list[SCHStatusLogModel] = []
entity_name = create_entity_name(
operation.service_name, operation.component_name
operation.name.service, operation.name.component
)

if self._cluster_status.is_sc_stale(entity_name, hosts=operation.host_names):
Expand Down Expand Up @@ -205,8 +205,8 @@ def _process_operation_fn(
for host in hosts:
sch_status_log = self._cluster_status.update_hosted_entity(
create_hosted_entity(entity_name, host),
action_name=operation.action_name,
version=self._cluster_variables[operation.service_name].version,
action_name=operation.name.action,
version=self._cluster_variables[operation.name.service].version,
can_update_stale=can_update_stale,
)
if sch_status_log:
Expand Down
2 changes: 1 addition & 1 deletion tdp/core/deployment/deployment_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _run_operation(self, operation_rec: OperationModel) -> None:
return

# Execute the operation
playbook_file = self._collections.playbooks[operation.name].path
playbook_file = self._collections.playbooks[operation.name.name].path
state, logs = self._executor.execute(
playbook=playbook_file,
host=operation_rec.host,
Expand Down
Loading
Loading