Skip to content

Commit

Permalink
fix rate_runner memory insufficiency
Browse files Browse the repository at this point in the history
Addressed the memory insufficiency problem when the vectordb can not handle too high insert_rate and vdbbench will hold too many insert tasks.

Now, when the vdbbench accumulates a certain number of insert tasks, it will stop fetching data from the dataset until the number of insert tasks falls below 200. It ensures that the vdbbench client does not consume excessive memory.

Signed-off-by: min.tian <[email protected]>
  • Loading branch information
alwayslove2013 authored and XuanYang-cn committed Jan 24, 2025
1 parent 78d6906 commit 0192c1a
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions vectordb_bench/backend/runner/rate_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ def check_and_send_signal(wait_interval: float, finished: bool = False):
_ = [fut.result() for fut in done]
if len(not_done) > 0:
self.executing_futures = list(not_done)
if len(not_done) > 100:
log.warning(f"[{len(not_done)}] tasks are not done, trying to wait in the next round")
else:
self.executing_futures = []

Expand All @@ -94,28 +92,33 @@ def check_and_send_signal(wait_interval: float, finished: bool = False):
time_per_batch = config.TIME_PER_BATCH
with self.db.init():
start_time = time.perf_counter()
inserted_batch_cnt = 0
round_idx = 0

while True:
finished, elapsed_time = submit_by_rate()
if finished is True:
log.info(f"End of dataset, left unfinished={len(self.executing_futures)}")
break
if elapsed_time >= 1.5:
log.warning(
f"Submit insert tasks took {elapsed_time}s, expected 1s, "
f"indicating potential resource limitations on the client machine.",
)

wait_interval = 0.001
check_and_send_signal(wait_interval, finished=False)

dur = time.perf_counter() - start_time - inserted_batch_cnt * time_per_batch
if len(self.executing_futures) > 200:
log.warning("Skip data insertion this round. There are 200+ unfinished insertion tasks.")
else:
finished, elapsed_time = submit_by_rate()
if finished is True:
log.info(
f"End of dataset, left unfinished={len(self.executing_futures)}, num_round={round_idx}"
)
break
if elapsed_time >= 1.5:
log.warning(
f"Submit insert tasks took {elapsed_time}s, expected 1s, "
f"indicating potential resource limitations on the client machine.",
)

check_and_send_signal(wait_interval=0.001, finished=False)
dur = time.perf_counter() - start_time - round_idx * time_per_batch
if dur < time_per_batch:
time.sleep(time_per_batch - dur)
inserted_batch_cnt += 1
round_idx += 1

# wait for all tasks in executing_futures to complete
wait_interval = 1
while len(self.executing_futures) > 0:
check_and_send_signal(wait_interval, finished=True)
check_and_send_signal(wait_interval=1, finished=True)
round_idx += 1

log.info(f"Finish all streaming insertion, num_round={round_idx}")

0 comments on commit 0192c1a

Please sign in to comment.