diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py index ac7081c4..47e72069 100644 --- a/vectordb_bench/backend/runner/rate_runner.py +++ b/vectordb_bench/backend/runner/rate_runner.py @@ -72,8 +72,6 @@ def check_and_send_signal(wait_interval: float, finished: bool = False): _ = [fut.result() for fut in done] if len(not_done) > 0: self.executing_futures = list(not_done) - if len(not_done) > 100: - log.warning(f"[{len(not_done)}] tasks are not done, trying to wait in the next round") else: self.executing_futures = [] @@ -94,28 +92,33 @@ def check_and_send_signal(wait_interval: float, finished: bool = False): time_per_batch = config.TIME_PER_BATCH with self.db.init(): start_time = time.perf_counter() - inserted_batch_cnt = 0 + round_idx = 0 while True: - finished, elapsed_time = submit_by_rate() - if finished is True: - log.info(f"End of dataset, left unfinished={len(self.executing_futures)}") - break - if elapsed_time >= 1.5: - log.warning( - f"Submit insert tasks took {elapsed_time}s, expected 1s, " - f"indicating potential resource limitations on the client machine.", - ) - - wait_interval = 0.001 - check_and_send_signal(wait_interval, finished=False) - - dur = time.perf_counter() - start_time - inserted_batch_cnt * time_per_batch + if len(self.executing_futures) > 200: + log.warning("Skip data insertion this round. There are 200+ unfinished insertion tasks.") + else: + finished, elapsed_time = submit_by_rate() + if finished is True: + log.info( + f"End of dataset, left unfinished={len(self.executing_futures)}, num_round={round_idx}" + ) + break + if elapsed_time >= 1.5: + log.warning( + f"Submit insert tasks took {elapsed_time}s, expected 1s, " + f"indicating potential resource limitations on the client machine.", + ) + + check_and_send_signal(wait_interval=0.001, finished=False) + dur = time.perf_counter() - start_time - round_idx * time_per_batch if dur < time_per_batch: time.sleep(time_per_batch - dur) - inserted_batch_cnt += 1 + round_idx += 1 # wait for all tasks in executing_futures to complete - wait_interval = 1 while len(self.executing_futures) > 0: - check_and_send_signal(wait_interval, finished=True) + check_and_send_signal(wait_interval=1, finished=True) + round_idx += 1 + + log.info(f"Finish all streaming insertion, num_round={round_idx}")