diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 84e804f7042..f6d3bc59b5a 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -1547,6 +1547,7 @@ def highlight_submit_ignore_url(): # @todo handle ctrl break ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() threading.Thread(target=notification_runner).start() + threading.Thread(target=thread_maintain_worker_thread_pool).start() # Check for new release version, but not when running in test/build or pytest if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True: @@ -1629,23 +1630,73 @@ def notification_runner(): # Trim the log length notification_debug_log = notification_debug_log[-100:] + +def thread_maintain_worker_thread_pool(): + from changedetectionio import update_worker + + n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) + + while not app.config.exit.is_set(): + needed_threads = n_workers if not running_update_threads else 0 + how_many_running_now = 0 + dead_threads = [] + + for i, t in enumerate(running_update_threads): + if t.is_alive(): + how_many_running_now += 1 + else: + dead_threads.append(i) + + for i in dead_threads: + del running_update_threads[i] + + for _ in range(needed_threads - how_many_running_now): + logger.info("Adding new worker thread") + new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) + running_update_threads.append(new_worker) + new_worker.start() + + app.config.exit.wait(2) + + + +def thread_maintain_worker_thread_pool(): + from changedetectionio import update_worker + + logger.info("Starting thread pool worker maintainer thread") + n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) + + while not app.config.exit.is_set(): + needed_threads = n_workers if not running_update_threads else 0 + how_many_running_now = 0 + dead_threads = [] + + for i, t in enumerate(running_update_threads): + if t.is_alive(): + how_many_running_now += 1 + else: + dead_threads.append(i) + + for i in dead_threads: + del running_update_threads[i] + + for _ in range(needed_threads - how_many_running_now): + logger.info("Adding new worker thread") + new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) + running_update_threads.append(new_worker) + new_worker.start() + + app.config.exit.wait(2) + # Thread runner to check every minute, look for new watches to feed into the Queue. def ticker_thread_check_time_launch_checks(): import random - from changedetectionio import update_worker proxy_last_called_time = {} recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}") - # Spin up Workers that do the fetching - # Can be overriden by ENV or use the default settings - n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) - for _ in range(n_workers): - new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) - running_update_threads.append(new_worker) - new_worker.start() while not app.config.exit.is_set(): @@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks(): priority = int(time.time()) logger.debug( f"> Queued watch UUID {uuid} " - f"last checked at {watch['last_checked']} " + f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} " f"queued at {now:0.2f} priority {priority} " f"jitter {watch.jitter_seconds:0.2f}s, " f"{now - watch['last_checked']:0.2f}s since last checked")