From 921cd96d579b584dd8dcf370c02b5c72ddfa922c Mon Sep 17 00:00:00 2001
From: "min.tian"
Date: Fri, 24 Jan 2025 11:06:56 +0800
Subject: [PATCH] fix rate_runner memory insufficiency

Address the memory insufficiency problem that occurs when the vector
database cannot keep up with a high insert_rate and vdbbench ends up
holding too many pending insert tasks.

Now, once vdbbench has accumulated more than 200 unfinished insert
tasks, it stops fetching data from the dataset until the backlog drops
back to 200 or fewer. This ensures that the vdbbench client does not
consume excessive memory.

Signed-off-by: min.tian
---
 vectordb_bench/backend/runner/rate_runner.py | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py
index ac7081c4..47e72069 100644
--- a/vectordb_bench/backend/runner/rate_runner.py
+++ b/vectordb_bench/backend/runner/rate_runner.py
@@ -72,8 +72,6 @@ def check_and_send_signal(wait_interval: float, finished: bool = False):
                     _ = [fut.result() for fut in done]
                     if len(not_done) > 0:
                         self.executing_futures = list(not_done)
-                        if len(not_done) > 100:
-                            log.warning(f"[{len(not_done)}] tasks are not done, trying to wait in the next round")
                     else:
                         self.executing_futures = []
 
@@ -94,28 +92,33 @@ def check_and_send_signal(wait_interval: float, finished: bool = False):
             time_per_batch = config.TIME_PER_BATCH
             with self.db.init():
                 start_time = time.perf_counter()
-                inserted_batch_cnt = 0
+                round_idx = 0
                 while True:
-                    finished, elapsed_time = submit_by_rate()
-                    if finished is True:
-                        log.info(f"End of dataset, left unfinished={len(self.executing_futures)}")
-                        break
-                    if elapsed_time >= 1.5:
-                        log.warning(
-                            f"Submit insert tasks took {elapsed_time}s, expected 1s, "
-                            f"indicating potential resource limitations on the client machine.",
-                        )
-
-                    wait_interval = 0.001
-                    check_and_send_signal(wait_interval, finished=False)
-
-                    dur = time.perf_counter() - start_time - inserted_batch_cnt * time_per_batch
+                    if len(self.executing_futures) > 200:
+                        log.warning("Skip data insertion this round. There are 200+ unfinished insertion tasks.")
+                    else:
+                        finished, elapsed_time = submit_by_rate()
+                        if finished is True:
+                            log.info(
+                                f"End of dataset, left unfinished={len(self.executing_futures)}, num_round={round_idx}"
+                            )
+                            break
+                        if elapsed_time >= 1.5:
+                            log.warning(
+                                f"Submit insert tasks took {elapsed_time}s, expected 1s, "
+                                f"indicating potential resource limitations on the client machine.",
+                            )
+
+                    check_and_send_signal(wait_interval=0.001, finished=False)
+                    dur = time.perf_counter() - start_time - round_idx * time_per_batch
                     if dur < time_per_batch:
                         time.sleep(time_per_batch - dur)
-                    inserted_batch_cnt += 1
+                    round_idx += 1
 
                 # wait for all tasks in executing_futures to complete
-                wait_interval = 1
                 while len(self.executing_futures) > 0:
-                    check_and_send_signal(wait_interval, finished=True)
+                    check_and_send_signal(wait_interval=1, finished=True)
+                    round_idx += 1
+
+                log.info(f"Finish all streaming insertion, num_round={round_idx}")
 
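
For context, the core idea of the fix is a simple backpressure loop: submit insert tasks at the target rate, but skip a round of submission whenever too many previously submitted tasks are still unfinished. The standalone sketch below shows that pattern in isolation, assuming a generic insert function; the names (rate_limited_insert, insert_fn, max_pending) are illustrative and do not exist in vectordb_bench.

# Standalone sketch (not part of the patch): bounded-backlog, rate-paced
# task submission with concurrent.futures. Names such as
# `rate_limited_insert`, `insert_fn`, and `max_pending` are illustrative.
import concurrent.futures
import time


def rate_limited_insert(insert_fn, batches, insert_rate=5, time_per_batch=1.0, max_pending=200):
    """Submit up to `insert_rate` batches per `time_per_batch` seconds, but
    skip a round whenever more than `max_pending` submitted tasks are still
    in flight, so the client never buffers an unbounded backlog in memory."""
    pending: list[concurrent.futures.Future] = []
    data_iter = iter(batches)
    exhausted = False

    with concurrent.futures.ThreadPoolExecutor() as executor:
        start = time.perf_counter()
        round_idx = 0
        while not exhausted:
            if len(pending) > max_pending:
                # Backpressure: the database is behind, so do not fetch or
                # submit any new batches this round.
                print(f"round {round_idx}: {len(pending)} pending tasks, skipping submission")
            else:
                for _ in range(insert_rate):
                    batch = next(data_iter, None)
                    if batch is None:
                        exhausted = True
                        break
                    pending.append(executor.submit(insert_fn, batch))

            # Reap finished tasks without blocking the pacing loop.
            done, not_done = concurrent.futures.wait(pending, timeout=0.001)
            _ = [fut.result() for fut in done]  # re-raise insert errors, if any
            pending = list(not_done)

            # Sleep the remainder of this round to hold the target rate.
            dur = time.perf_counter() - start - round_idx * time_per_batch
            if dur < time_per_batch:
                time.sleep(time_per_batch - dur)
            round_idx += 1

        # Drain whatever is still in flight before returning.
        concurrent.futures.wait(pending)
        _ = [fut.result() for fut in pending]


if __name__ == "__main__":
    # Toy usage: 20 batches, each "insert" just sleeps for 50 ms.
    rate_limited_insert(lambda batch: time.sleep(0.05), ([i] * 8 for i in range(20)))

Reaping with a very small timeout (0.001 s here, matching the wait_interval used in the patch) keeps the pacing loop responsive while still surfacing insert errors promptly.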