From fcea7895e926cc24bacf7895ffc66032cda0755a Mon Sep 17 00:00:00 2001 From: Ori Hoch Date: Sun, 28 Apr 2024 14:48:07 +0300 Subject: [PATCH] add graceful termination --skip-tests --- cwm_worker_operator/daemon.py | 14 +++++++++++++- cwm_worker_operator/kafka_streamer.py | 4 ++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/cwm_worker_operator/daemon.py b/cwm_worker_operator/daemon.py index 8138c04..8950de4 100644 --- a/cwm_worker_operator/daemon.py +++ b/cwm_worker_operator/daemon.py @@ -1,4 +1,5 @@ import time +import signal import prometheus_client @@ -22,6 +23,7 @@ def __init__(self, name, sleep_time_between_iterations_seconds, self.sleep_time_between_iterations_seconds = sleep_time_between_iterations_seconds self.run_single_iteration_extra_kwargs = {} if run_single_iteration_extra_kwargs is None else run_single_iteration_extra_kwargs self.deployments_manager = deployments_manager if deployments_manager else DeploymentsManager() + self.terminate_requested = False def start(self, once=False, with_prometheus=None): if with_prometheus is None: @@ -41,15 +43,25 @@ def start(self, once=False, with_prometheus=None): else: self.start_main_loop() + def on_sigterm(self, *args, **kwargs): + logs.debug_info(f"SIGTERM received for daemon {self.name}") + self.terminate_requested = True + def start_main_loop(self): - while True: + signal.signal(signal.SIGTERM, self.on_sigterm) + while not self.terminate_requested: self.run_single_iteration(**self.run_single_iteration_extra_kwargs) + if self.terminate_requested: + break time.sleep(self.sleep_time_between_iterations_seconds) + assert self.terminate_requested, f'Unexpected termination of daemon {self.name}' + logs.debug_info(f"Graceful termination of daemon {self.name}") def run_single_iteration(self, **kwargs): self.run_single_iteration_callback( domains_config=self.domains_config, metrics=self.metrics, deployments_manager=self.deployments_manager, + daemon=self, **kwargs ) diff --git a/cwm_worker_operator/kafka_streamer.py b/cwm_worker_operator/kafka_streamer.py index febf014..9b9bde8 100644 --- a/cwm_worker_operator/kafka_streamer.py +++ b/cwm_worker_operator/kafka_streamer.py @@ -92,7 +92,7 @@ def delete_records(topic, latest_partition_offset): ], env={**os.environ, 'DEBUG': ''}) -def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_commit=False, no_kafka_delete=False, **_): +def run_single_iteration(domains_config: DomainsConfig, topic, daemon, no_kafka_commit=False, no_kafka_delete=False, **_): start_time = common.now() assert topic, "topic is required" logs.debug(f"running iteration for topic: {topic}", 8) @@ -105,7 +105,7 @@ def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_com latest_partition_offset = {} try: agg_data = {} - while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS: + while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS and not daemon.terminate_requested: msg = consumer.poll(timeout=config.KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS) if msg is None: # logs.debug("Waiting for messages...", 10)