diff --git a/tdp/cli/commands/status/edit.py b/tdp/cli/commands/status/edit.py index 03397693..d72f9311 100644 --- a/tdp/cli/commands/status/edit.py +++ b/tdp/cli/commands/status/edit.py @@ -16,7 +16,10 @@ from tdp.cli.session import get_session from tdp.cli.utils import check_services_cleanliness from tdp.core.cluster_status import ClusterStatus -from tdp.core.models.sch_status_log import SCHStatusLog, SCHStatusLogSourceEnum +from tdp.core.models.sch_status_log_model import ( + SCHStatusLogModel, + SCHStatusLogSourceEnum, +) from tdp.core.variables import ClusterVariables if TYPE_CHECKING: @@ -81,7 +84,7 @@ def edit( # Create a new SCHStatusLog for each host for host in hosts: session.add( - SCHStatusLog( + SCHStatusLogModel( service=service, component=component, host=host, diff --git a/tdp/cli/queries.py b/tdp/cli/queries.py index f36962b5..9d5db2f9 100644 --- a/tdp/cli/queries.py +++ b/tdp/cli/queries.py @@ -11,7 +11,7 @@ from tdp.core.models import ( DeploymentModel, OperationLog, - SCHStatusLog, + SCHStatusLogModel, ) if TYPE_CHECKING: @@ -37,78 +37,82 @@ def get_sch_status( # combination. latest_configured_version_timestamp_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, func.max( case( ( - SCHStatusLog.configured_version != None, - SCHStatusLog.event_time, + SCHStatusLogModel.configured_version != None, + SCHStatusLogModel.event_time, ) ) ).label("latest_configured_version_timestamp"), ) .group_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .subquery() ) latest_running_version_timestamp_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, func.max( case( ( - SCHStatusLog.running_version != None, - SCHStatusLog.event_time, + SCHStatusLogModel.running_version != None, + SCHStatusLogModel.event_time, ) ) ).label("latest_running_version_timestamp"), ) .group_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .subquery() ) latest_to_config_timestamp_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, func.max( - case((SCHStatusLog.to_config != None, SCHStatusLog.event_time)) + case( + (SCHStatusLogModel.to_config != None, SCHStatusLogModel.event_time) + ) ).label("latest_to_config_timestamp"), ) .group_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .subquery() ) latest_to_restart_timestamp_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, func.max( - case((SCHStatusLog.to_restart != None, SCHStatusLog.event_time)) + case( + (SCHStatusLogModel.to_restart != None, SCHStatusLogModel.event_time) + ) ).label("latest_to_restart_timestamp"), ) .group_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .subquery() ) @@ -116,31 +120,31 @@ def get_sch_status( # Get the latest values for each (service, component, host) combination. latest_configured_version_value_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, - SCHStatusLog.configured_version, - SCHStatusLog.event_time, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, + SCHStatusLogModel.configured_version, + SCHStatusLogModel.event_time, ) .join( latest_configured_version_timestamp_subquery, and_( - SCHStatusLog.service + SCHStatusLogModel.service == latest_configured_version_timestamp_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_configured_version_timestamp_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host + SCHStatusLogModel.host == None, + SCHStatusLogModel.host == latest_configured_version_timestamp_subquery.c.host, ), # Join based on matching timestamps for all the columns in the subquery. - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_configured_version_timestamp_subquery.c.latest_configured_version_timestamp, ), ) @@ -149,31 +153,31 @@ def get_sch_status( latest_running_version_value_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, - SCHStatusLog.running_version, - SCHStatusLog.event_time, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, + SCHStatusLogModel.running_version, + SCHStatusLogModel.event_time, ) .join( latest_running_version_timestamp_subquery, and_( - SCHStatusLog.service + SCHStatusLogModel.service == latest_running_version_timestamp_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_running_version_timestamp_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host + SCHStatusLogModel.host == None, + SCHStatusLogModel.host == latest_running_version_timestamp_subquery.c.host, ), # Join based on matching timestamps for all the columns in the subquery. - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_running_version_timestamp_subquery.c.latest_running_version_timestamp, ), ) @@ -182,29 +186,31 @@ def get_sch_status( latest_to_config_value_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, - SCHStatusLog.to_config, - SCHStatusLog.event_time, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, + SCHStatusLogModel.to_config, + SCHStatusLogModel.event_time, ) .join( latest_to_config_timestamp_subquery, and_( - SCHStatusLog.service == latest_to_config_timestamp_subquery.c.service, + SCHStatusLogModel.service + == latest_to_config_timestamp_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_to_config_timestamp_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host == latest_to_config_timestamp_subquery.c.host, + SCHStatusLogModel.host == None, + SCHStatusLogModel.host + == latest_to_config_timestamp_subquery.c.host, ), # Join based on matching timestamps for all the columns in the subquery. - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_to_config_timestamp_subquery.c.latest_to_config_timestamp, ), ) @@ -213,29 +219,31 @@ def get_sch_status( latest_to_restart_value_subquery = ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, - SCHStatusLog.to_restart, - SCHStatusLog.event_time, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, + SCHStatusLogModel.to_restart, + SCHStatusLogModel.event_time, ) .join( latest_to_restart_timestamp_subquery, and_( - SCHStatusLog.service == latest_to_restart_timestamp_subquery.c.service, + SCHStatusLogModel.service + == latest_to_restart_timestamp_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_to_restart_timestamp_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host == latest_to_restart_timestamp_subquery.c.host, + SCHStatusLogModel.host == None, + SCHStatusLogModel.host + == latest_to_restart_timestamp_subquery.c.host, ), # Join based on matching timestamps for all the columns in the subquery. - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_to_restart_timestamp_subquery.c.latest_to_restart_timestamp, ), ) @@ -264,9 +272,9 @@ def get_sch_status( return ( session.query( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, max_running_version, max_configured_version, max_to_config, @@ -275,89 +283,92 @@ def get_sch_status( .outerjoin( latest_running_version_value_subquery, and_( - SCHStatusLog.service == latest_running_version_value_subquery.c.service, + SCHStatusLogModel.service + == latest_running_version_value_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_running_version_value_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host == latest_running_version_value_subquery.c.host, + SCHStatusLogModel.host == None, + SCHStatusLogModel.host + == latest_running_version_value_subquery.c.host, ), - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_running_version_value_subquery.c.event_time, ), ) .outerjoin( latest_configured_version_value_subquery, and_( - SCHStatusLog.service + SCHStatusLogModel.service == latest_configured_version_value_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_configured_version_value_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host + SCHStatusLogModel.host == None, + SCHStatusLogModel.host == latest_configured_version_value_subquery.c.host, ), - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_configured_version_value_subquery.c.event_time, ), ) .outerjoin( latest_to_config_value_subquery, and_( - SCHStatusLog.service == latest_to_config_value_subquery.c.service, + SCHStatusLogModel.service == latest_to_config_value_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_to_config_value_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host == latest_to_config_value_subquery.c.host, + SCHStatusLogModel.host == None, + SCHStatusLogModel.host == latest_to_config_value_subquery.c.host, ), - SCHStatusLog.event_time == latest_to_config_value_subquery.c.event_time, + SCHStatusLogModel.event_time + == latest_to_config_value_subquery.c.event_time, ), ) .outerjoin( latest_to_restart_value_subquery, and_( - SCHStatusLog.service == latest_to_restart_value_subquery.c.service, + SCHStatusLogModel.service == latest_to_restart_value_subquery.c.service, # Check for null component or if they are equal or_( - SCHStatusLog.component == None, - SCHStatusLog.component + SCHStatusLogModel.component == None, + SCHStatusLogModel.component == latest_to_restart_value_subquery.c.component, ), # Check for null host or if they are equal or_( - SCHStatusLog.host == None, - SCHStatusLog.host == latest_to_restart_value_subquery.c.host, + SCHStatusLogModel.host == None, + SCHStatusLogModel.host == latest_to_restart_value_subquery.c.host, ), - SCHStatusLog.event_time + SCHStatusLogModel.event_time == latest_to_restart_value_subquery.c.event_time, ), ) .group_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .order_by( - SCHStatusLog.service, - SCHStatusLog.component, - SCHStatusLog.host, + SCHStatusLogModel.service, + SCHStatusLogModel.component, + SCHStatusLogModel.host, ) .all() ) diff --git a/tdp/cli/test_get_sch_status.py b/tdp/cli/test_get_sch_status.py index b1ca1771..a2782fe0 100644 --- a/tdp/cli/test_get_sch_status.py +++ b/tdp/cli/test_get_sch_status.py @@ -10,7 +10,7 @@ from tdp.cli.queries import get_sch_status from tdp.core.models import ( - SCHStatusLog, + SCHStatusLogModel, SCHStatusLogSourceEnum, ServiceComponentHostStatus, ) @@ -38,13 +38,13 @@ def _mock_sch_status_log( host: Optional[str], n: int = 50, seed: Optional[str] = None, -) -> List["SCHStatusLog"]: +) -> List["SCHStatusLogModel"]: """Generate n mock SCHStatusLog entries.""" _set_seed(seed) logs = [] for _ in range(n): logs.append( - SCHStatusLog( + SCHStatusLogModel( service=service, component=component, host=host, @@ -63,7 +63,7 @@ def _mock_sch_status_log( def _last_values( - logs: List["SCHStatusLog"], + logs: List["SCHStatusLogModel"], ) -> ServiceComponentHostStatus: """Return an SCHStatusLog holding the last non None value for each column from a list of logs.""" return ( diff --git a/tdp/core/cluster_status.py b/tdp/core/cluster_status.py index 796a1c87..ca195516 100644 --- a/tdp/core/cluster_status.py +++ b/tdp/core/cluster_status.py @@ -8,7 +8,10 @@ from typing import TYPE_CHECKING, Any, Optional from tdp.core.dag import Dag -from tdp.core.models.sch_status_log import SCHStatusLog, SCHStatusLogSourceEnum +from tdp.core.models.sch_status_log_model import ( + SCHStatusLogModel, + SCHStatusLogSourceEnum, +) from tdp.core.service_component_host_name import ServiceComponentHostName from tdp.core.service_component_name import ServiceComponentName @@ -106,7 +109,7 @@ def update( configured_version: Optional[str] = None, to_config: Optional[bool] = None, to_restart: Optional[bool] = None, - ) -> Optional[SCHStatusLog]: + ) -> Optional[SCHStatusLogModel]: """Update the status of a service component host, returns a SCHStatusLog if the status was updated. Args: @@ -128,7 +131,7 @@ def update( return # Base log - log = SCHStatusLog( + log = SCHStatusLogModel( service=self.service, component=self.component, host=self.host, @@ -259,7 +262,7 @@ def generate_stale_sch_logs( *, cluster_variables: ClusterVariables, collections: Collections, - ) -> set[SCHStatusLog]: + ) -> set[SCHStatusLogModel]: """Generate SCHStatusLog(s) for components that need to be configured or restarted. This method identifies components that have undergone changes in their @@ -275,7 +278,7 @@ def generate_stale_sch_logs( Returns: Set of SCHStatusLog. """ - stale_sch_logs_dict: dict[ServiceComponentHostName, SCHStatusLog] = {} + stale_sch_logs_dict: dict[ServiceComponentHostName, SCHStatusLogModel] = {} source_reconfigure_operations: set[str] = set() modified_sch = cluster_variables.get_modified_sch(self.values()) @@ -300,7 +303,7 @@ def generate_stale_sch_logs( # Create SCHStatusLog for modified sch if config_operation or restart_operation: - stale_sch_log = stale_sch_logs_dict[sch] = SCHStatusLog( + stale_sch_log = stale_sch_logs_dict[sch] = SCHStatusLogModel( service=sc.service_name, component=sc.component_name, host=sch.host_name, @@ -332,7 +335,7 @@ def generate_stale_sch_logs( ), host_name=host, ), - SCHStatusLog( + SCHStatusLogModel( service=operation.service_name, component=operation.component_name, host=host, @@ -390,7 +393,7 @@ def update_sch( action_name: str, version: Optional[str] = None, can_update_stale: bool = False, - ) -> Optional[SCHStatusLog]: + ) -> Optional[SCHStatusLogModel]: """Update the status of a sch, returns a log if the status was updated. Args: diff --git a/tdp/core/deployment/deployment_iterator.py b/tdp/core/deployment/deployment_iterator.py index 936e4b8c..fa0eb8e2 100644 --- a/tdp/core/deployment/deployment_iterator.py +++ b/tdp/core/deployment/deployment_iterator.py @@ -17,7 +17,7 @@ NothingToReconfigureError, OperationLog, OperationStateEnum, - SCHStatusLog, + SCHStatusLogModel, SCHStatusLogSourceEnum, ) from tdp.core.service_component_host_name import ServiceComponentHostName @@ -56,7 +56,7 @@ def _group_hosts_by_operation( return operation_to_hosts_set -class DeploymentIterator(Iterator[Optional[list[SCHStatusLog]]]): +class DeploymentIterator(Iterator[Optional[list[SCHStatusLogModel]]]): """Iterator that runs an operation at each iteration. Attributes: @@ -99,7 +99,7 @@ def __init__( except NothingToReconfigureError: self._reconfigure_operations = None - def __next__(self) -> Optional[list[SCHStatusLog]]: + def __next__(self) -> Optional[list[SCHStatusLogModel]]: try: while True: operation_log: OperationLog = next(self._iter) @@ -131,7 +131,7 @@ def __next__(self) -> Optional[list[SCHStatusLog]]: if operation.name == OPERATION_SLEEP_NAME: return - sch_status_logs: list[SCHStatusLog] = [] + sch_status_logs: list[SCHStatusLogModel] = [] sc_name = ServiceComponentName( service_name=operation.service_name, component_name=operation.component_name, diff --git a/tdp/core/models/__init__.py b/tdp/core/models/__init__.py index c10c525c..141e66b6 100644 --- a/tdp/core/models/__init__.py +++ b/tdp/core/models/__init__.py @@ -15,7 +15,10 @@ NothingToResumeError, ) from tdp.core.models.operation_log import OperationLog -from tdp.core.models.sch_status_log import SCHStatusLog, SCHStatusLogSourceEnum +from tdp.core.models.sch_status_log_model import ( + SCHStatusLogModel, + SCHStatusLogSourceEnum, +) from tdp.core.models.state_enum import DeploymentStateEnum, OperationStateEnum ServiceComponentHostStatus = tuple[ diff --git a/tdp/core/models/sch_status_log.py b/tdp/core/models/sch_status_log_model.py similarity index 98% rename from tdp/core/models/sch_status_log.py rename to tdp/core/models/sch_status_log_model.py index c223c35b..963ab93f 100644 --- a/tdp/core/models/sch_status_log.py +++ b/tdp/core/models/sch_status_log_model.py @@ -26,7 +26,7 @@ class SCHStatusLogSourceEnum(BaseEnum): MANUAL = "Manual" -class SCHStatusLog(BaseModel): +class SCHStatusLogModel(BaseModel): """Hold what component version are deployed.""" __tablename__ = "sch_status_log" diff --git a/tdp/core/models/test_models.py b/tdp/core/models/test_models.py index c5f772d3..a8bf550a 100644 --- a/tdp/core/models/test_models.py +++ b/tdp/core/models/test_models.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import Session -from tdp.core.models import DeploymentModel, OperationLog, SCHStatusLog +from tdp.core.models import DeploymentModel, OperationLog, SCHStatusLogModel logger = logging.getLogger(__name__) @@ -27,7 +27,7 @@ def test_create_deployment(db_session: Session): status="SUCCESS", deployment_type="Dag", ) - component_version_log = SCHStatusLog( + component_version_log = SCHStatusLogModel( deployment_id=deployment.id, service="service1", component="component1",