From 9fe1122ec80ffc7b501ef094c3dc393a827c4de1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Sat, 25 Jan 2025 16:50:52 +0100 Subject: [PATCH] Add max_worker option --- oonipipeline/src/oonipipeline/settings.py | 2 ++ oonipipeline/src/oonipipeline/tasks/observations.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/oonipipeline/src/oonipipeline/settings.py b/oonipipeline/src/oonipipeline/settings.py index 82fa6ee0..72d294d1 100644 --- a/oonipipeline/src/oonipipeline/settings.py +++ b/oonipipeline/src/oonipipeline/settings.py @@ -18,6 +18,8 @@ class Settings(BaseSettings): clickhouse_url: str = "clickhouse://localhost" clickhouse_write_batch_size: int = 200_000 + max_workers = int(os.cpu_count() * 0.7) + telemetry_endpoint: Optional[str] = None prometheus_bind_address: Optional[str] = None diff --git a/oonipipeline/src/oonipipeline/tasks/observations.py b/oonipipeline/src/oonipipeline/tasks/observations.py index 2e1a4a0e..27fa87ab 100644 --- a/oonipipeline/src/oonipipeline/tasks/observations.py +++ b/oonipipeline/src/oonipipeline/tasks/observations.py @@ -224,7 +224,7 @@ def make_observations(params: MakeObservationsParams) -> MakeObservationsResult: bucket_date=params.bucket_date, ) - with concurrent.futures.ProcessPoolExecutor() as executor: + with concurrent.futures.ProcessPoolExecutor(max_workers=config.max_workers) as executor: futures = [ executor.submit( make_observations_for_file_entry_batch,