Skip to content

Commit

Permalink
add graceful termination --skip-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
OriHoch committed Apr 28, 2024
1 parent 3ab17d7 commit fcea789
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
14 changes: 13 additions & 1 deletion cwm_worker_operator/daemon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import signal

import prometheus_client

Expand All @@ -22,6 +23,7 @@ def __init__(self, name, sleep_time_between_iterations_seconds,
self.sleep_time_between_iterations_seconds = sleep_time_between_iterations_seconds
self.run_single_iteration_extra_kwargs = {} if run_single_iteration_extra_kwargs is None else run_single_iteration_extra_kwargs
self.deployments_manager = deployments_manager if deployments_manager else DeploymentsManager()
self.terminate_requested = False

def start(self, once=False, with_prometheus=None):
if with_prometheus is None:
Expand All @@ -41,15 +43,25 @@ def start(self, once=False, with_prometheus=None):
else:
self.start_main_loop()

def on_sigterm(self, *args, **kwargs):
logs.debug_info(f"SIGTERM received for daemon {self.name}")
self.terminate_requested = True

def start_main_loop(self):
while True:
signal.signal(signal.SIGTERM, self.on_sigterm)
while not self.terminate_requested:
self.run_single_iteration(**self.run_single_iteration_extra_kwargs)
if self.terminate_requested:
break
time.sleep(self.sleep_time_between_iterations_seconds)
assert self.terminate_requested, f'Unexpected termination of daemon {self.name}'
logs.debug_info(f"Graceful termination of daemon {self.name}")

def run_single_iteration(self, **kwargs):
self.run_single_iteration_callback(
domains_config=self.domains_config,
metrics=self.metrics,
deployments_manager=self.deployments_manager,
daemon=self,
**kwargs
)
4 changes: 2 additions & 2 deletions cwm_worker_operator/kafka_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def delete_records(topic, latest_partition_offset):
], env={**os.environ, 'DEBUG': ''})


def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_commit=False, no_kafka_delete=False, **_):
def run_single_iteration(domains_config: DomainsConfig, topic, daemon, no_kafka_commit=False, no_kafka_delete=False, **_):
start_time = common.now()
assert topic, "topic is required"
logs.debug(f"running iteration for topic: {topic}", 8)
Expand All @@ -105,7 +105,7 @@ def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_com
latest_partition_offset = {}
try:
agg_data = {}
while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS:
while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS and not daemon.terminate_requested:
msg = consumer.poll(timeout=config.KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS)
if msg is None:
# logs.debug("Waiting for messages...", 10)
Expand Down

0 comments on commit fcea789

Please sign in to comment.