Skip to content

Commit

Permalink
Sync all process before timing start
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored and alwayslove2013 committed Jul 4, 2023
1 parent 9353f96 commit b060486
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions vectordb_bench/backend/runner/mp_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ def __init__(
self.test_data = utils.SharedNumpyArray(test_data)
log.debug(f"test dataset columns: {len(test_data)}")

def search(self, test_np: utils.SharedNumpyArray) -> tuple[int, float]:
def search(self, test_np: utils.SharedNumpyArray, q: mp.Queue, cond: mp.Condition) -> tuple[int, float]:
# sync all process
q.put(1)
with cond:
cond.wait()

with self.db.init():
test_data = test_np.read().tolist()
num, idx = len(test_data), 0
Expand Down Expand Up @@ -77,23 +82,34 @@ def search(self, test_np: utils.SharedNumpyArray) -> tuple[int, float]:

@staticmethod
def get_mp_context():
mp_start_method = "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn"
mp_start_method = "spawn"
log.debug(f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}")
return mp.get_context(mp_start_method)

def _run_all_concurrencies_mem_efficient(self) -> float:
max_qps = 0
try:
for conc in self.concurrencies:
with concurrent.futures.ProcessPoolExecutor(mp_context=self.get_mp_context(), max_workers=conc) as executor:
start = time.perf_counter()
log.info(f"start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
future_iter = executor.map(self.search, [self.test_data for i in range(conc)])
all_count = sum([r[0] for r in future_iter])

cost = time.perf_counter() - start
qps = round(all_count / cost, 4)
log.info(f"end search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")
with mp.Manager() as m:
q, cond = m.Queue(), m.Condition()
with concurrent.futures.ProcessPoolExecutor(mp_context=self.get_mp_context(), max_workers=conc) as executor:
log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)]
# Sync all processes
while q.qsize() < conc:
sleep_t = conc if conc < 10 else 10
time.sleep(sleep_t)

with cond:
cond.notify_all()
log.info(f"Syncing all process and start concurency search, concurency={conc}")

start = time.perf_counter()
all_count = sum([r.result()[0] for r in future_iter])
cost = time.perf_counter() - start

qps = round(all_count / cost, 4)
log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

if qps > max_qps:
max_qps = qps
Expand Down

0 comments on commit b060486

Please sign in to comment.